HTTP transport implementation
This commit is contained in:
parent
0bb26f245f
commit
823b6aa7a7
@ -1,12 +1,12 @@
|
|||||||
/**
|
/**
|
||||||
* Copyright © 2016-2018 The Thingsboard Authors
|
* Copyright © 2016-2018 The Thingsboard Authors
|
||||||
*
|
* <p>
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
* You may obtain a copy of the License at
|
* You may obtain a copy of the License at
|
||||||
*
|
* <p>
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
*
|
* <p>
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
@ -26,6 +26,7 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress;
|
|||||||
import org.thingsboard.server.common.transport.SessionMsgListener;
|
import org.thingsboard.server.common.transport.SessionMsgListener;
|
||||||
import org.thingsboard.server.common.transport.TransportService;
|
import org.thingsboard.server.common.transport.TransportService;
|
||||||
import org.thingsboard.server.common.transport.TransportServiceCallback;
|
import org.thingsboard.server.common.transport.TransportServiceCallback;
|
||||||
|
import org.thingsboard.server.common.transport.service.AbstractTransportService;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
|
import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
|
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
|
||||||
@ -56,6 +57,8 @@ import java.util.UUID;
|
|||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.SynchronousQueue;
|
import java.util.concurrent.SynchronousQueue;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
@ -67,11 +70,7 @@ import java.util.function.Consumer;
|
|||||||
@Slf4j
|
@Slf4j
|
||||||
@Service
|
@Service
|
||||||
@ConditionalOnProperty(prefix = "transport", value = "type", havingValue = "local")
|
@ConditionalOnProperty(prefix = "transport", value = "type", havingValue = "local")
|
||||||
public class LocalTransportService implements TransportService, RuleEngineTransportService {
|
public class LocalTransportService extends AbstractTransportService implements RuleEngineTransportService {
|
||||||
|
|
||||||
private ConcurrentMap<UUID, SessionMsgListener> sessions = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
private ExecutorService transportCallbackExecutor;
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private TransportApiService transportApiService;
|
private TransportApiService transportApiService;
|
||||||
@ -89,14 +88,12 @@ public class LocalTransportService implements TransportService, RuleEngineTransp
|
|||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void init() {
|
public void init() {
|
||||||
this.transportCallbackExecutor = new ThreadPoolExecutor(0, 100, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
|
super.init();
|
||||||
}
|
}
|
||||||
|
|
||||||
@PreDestroy
|
@PreDestroy
|
||||||
public void destroy() {
|
public void destroy() {
|
||||||
if (transportCallbackExecutor != null) {
|
super.destroy();
|
||||||
transportCallbackExecutor.shutdownNow();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -175,21 +172,6 @@ public class LocalTransportService implements TransportService, RuleEngineTransp
|
|||||||
forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToServerRPCCallRequest(msg).build(), callback);
|
forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToServerRPCCallRequest(msg).build(), callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void registerSession(SessionInfoProto sessionInfo, TransportProtos.SessionType sessionType, SessionMsgListener listener) {
|
|
||||||
sessions.putIfAbsent(toId(sessionInfo), listener);
|
|
||||||
//TODO: monitor sessions periodically: PING REQ/RESP, etc.
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void deregisterSession(SessionInfoProto sessionInfo) {
|
|
||||||
sessions.remove(toId(sessionInfo));
|
|
||||||
}
|
|
||||||
|
|
||||||
private UUID toId(SessionInfoProto sessionInfo) {
|
|
||||||
return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void process(String nodeId, DeviceActorToTransportMsg msg) {
|
public void process(String nodeId, DeviceActorToTransportMsg msg) {
|
||||||
process(nodeId, msg, null, null);
|
process(nodeId, msg, null, null);
|
||||||
@ -197,30 +179,7 @@ public class LocalTransportService implements TransportService, RuleEngineTransp
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure) {
|
public void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure) {
|
||||||
UUID sessionId = new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB());
|
processToTransportMsg(msg);
|
||||||
SessionMsgListener listener = sessions.get(sessionId);
|
|
||||||
if (listener != null) {
|
|
||||||
transportCallbackExecutor.submit(() -> {
|
|
||||||
if (msg.hasGetAttributesResponse()) {
|
|
||||||
listener.onGetAttributesResponse(msg.getGetAttributesResponse());
|
|
||||||
}
|
|
||||||
if (msg.hasAttributeUpdateNotification()) {
|
|
||||||
listener.onAttributeUpdate(msg.getAttributeUpdateNotification());
|
|
||||||
}
|
|
||||||
if (msg.hasSessionCloseNotification()) {
|
|
||||||
listener.onRemoteSessionCloseCommand(msg.getSessionCloseNotification());
|
|
||||||
}
|
|
||||||
if (msg.hasToDeviceRequest()) {
|
|
||||||
listener.onToDeviceRpcRequest(msg.getToDeviceRequest());
|
|
||||||
}
|
|
||||||
if (msg.hasToServerResponse()) {
|
|
||||||
listener.onToServerRpcResponse(msg.getToServerResponse());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
//TODO: should we notify the device actor about missed session?
|
|
||||||
log.debug("[{}] Missing session.", sessionId);
|
|
||||||
}
|
|
||||||
if (onSuccess != null) {
|
if (onSuccess != null) {
|
||||||
onSuccess.run();
|
onSuccess.run();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -439,7 +439,7 @@ transport:
|
|||||||
# Local MQTT transport parameters
|
# Local MQTT transport parameters
|
||||||
mqtt:
|
mqtt:
|
||||||
# Enable/disable mqtt transport protocol.
|
# Enable/disable mqtt transport protocol.
|
||||||
enabled: "${MQTT_ENABLED:true}"
|
enabled: "${MQTT_ENABLED:false}"
|
||||||
bind_address: "${MQTT_BIND_ADDRESS:0.0.0.0}"
|
bind_address: "${MQTT_BIND_ADDRESS:0.0.0.0}"
|
||||||
bind_port: "${MQTT_BIND_PORT:1883}"
|
bind_port: "${MQTT_BIND_PORT:1883}"
|
||||||
adaptor: "${MQTT_ADAPTOR_NAME:JsonMqttAdaptor}"
|
adaptor: "${MQTT_ADAPTOR_NAME:JsonMqttAdaptor}"
|
||||||
|
|||||||
@ -15,6 +15,8 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.transport.http;
|
package org.thingsboard.server.transport.http;
|
||||||
|
|
||||||
|
import com.google.gson.JsonObject;
|
||||||
|
import com.google.gson.JsonParser;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
@ -33,9 +35,20 @@ import org.thingsboard.server.common.transport.TransportContext;
|
|||||||
import org.thingsboard.server.common.transport.TransportService;
|
import org.thingsboard.server.common.transport.TransportService;
|
||||||
import org.thingsboard.server.common.transport.TransportServiceCallback;
|
import org.thingsboard.server.common.transport.TransportServiceCallback;
|
||||||
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
|
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.*;
|
import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
|
||||||
import org.thingsboard.server.transport.http.session.HttpSessionCtx;
|
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos.SessionCloseNotificationProto;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
|
||||||
|
|
||||||
import javax.servlet.http.HttpServletRequest;
|
import javax.servlet.http.HttpServletRequest;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
@ -76,7 +89,7 @@ public class DeviceApiController {
|
|||||||
request.addAllSharedAttributeNames(sharedKeySet);
|
request.addAllSharedAttributeNames(sharedKeySet);
|
||||||
}
|
}
|
||||||
TransportService transportService = transportContext.getTransportService();
|
TransportService transportService = transportContext.getTransportService();
|
||||||
transportService.registerSession(sessionInfo, TransportProtos.SessionType.SYNC, new HttpSessionListener(transportContext, responseWriter));
|
transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), transportContext.getDefaultTimeout());
|
||||||
transportService.process(sessionInfo, request.build(), new SessionCloseOnErrorCallback(transportService, sessionInfo));
|
transportService.process(sessionInfo, request.build(), new SessionCloseOnErrorCallback(transportService, sessionInfo));
|
||||||
}));
|
}));
|
||||||
return responseWriter;
|
return responseWriter;
|
||||||
@ -85,20 +98,16 @@ public class DeviceApiController {
|
|||||||
@RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.POST)
|
@RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.POST)
|
||||||
public DeferredResult<ResponseEntity> postDeviceAttributes(@PathVariable("deviceToken") String deviceToken,
|
public DeferredResult<ResponseEntity> postDeviceAttributes(@PathVariable("deviceToken") String deviceToken,
|
||||||
@RequestBody String json, HttpServletRequest request) {
|
@RequestBody String json, HttpServletRequest request) {
|
||||||
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
|
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<>();
|
||||||
if (quotaExceeded(request, responseWriter)) {
|
if (quotaExceeded(request, responseWriter)) {
|
||||||
return responseWriter;
|
return responseWriter;
|
||||||
}
|
}
|
||||||
// HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
|
transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
|
||||||
// if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
|
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
|
||||||
// try {
|
TransportService transportService = transportContext.getTransportService();
|
||||||
// process(ctx, JsonConverter.convertToAttributes(new JsonParser().parse(json)));
|
transportService.process(sessionInfo, JsonConverter.convertToAttributesProto(new JsonParser().parse(json)),
|
||||||
// } catch (IllegalStateException | JsonSyntaxException ex) {
|
new HttpOkCallback(responseWriter));
|
||||||
// responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
|
}));
|
||||||
// }
|
|
||||||
// } else {
|
|
||||||
// responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
|
|
||||||
// }
|
|
||||||
return responseWriter;
|
return responseWriter;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -109,26 +118,33 @@ public class DeviceApiController {
|
|||||||
if (quotaExceeded(request, responseWriter)) {
|
if (quotaExceeded(request, responseWriter)) {
|
||||||
return responseWriter;
|
return responseWriter;
|
||||||
}
|
}
|
||||||
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
|
transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
|
||||||
// if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
|
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
|
||||||
// try {
|
TransportService transportService = transportContext.getTransportService();
|
||||||
// process(ctx, JsonConverter.convertToTelemetry(new JsonParser().parse(json)));
|
transportService.process(sessionInfo, JsonConverter.convertToTelemetryProto(new JsonParser().parse(json)),
|
||||||
// } catch (IllegalStateException | JsonSyntaxException ex) {
|
new HttpOkCallback(responseWriter));
|
||||||
// responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
|
}));
|
||||||
// }
|
|
||||||
// } else {
|
|
||||||
// responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
|
|
||||||
// }
|
|
||||||
return responseWriter;
|
return responseWriter;
|
||||||
}
|
}
|
||||||
|
|
||||||
@RequestMapping(value = "/{deviceToken}/rpc", method = RequestMethod.GET, produces = "application/json")
|
@RequestMapping(value = "/{deviceToken}/rpc", method = RequestMethod.GET, produces = "application/json")
|
||||||
public DeferredResult<ResponseEntity> subscribeToCommands(@PathVariable("deviceToken") String deviceToken,
|
public DeferredResult<ResponseEntity> subscribeToCommands(@PathVariable("deviceToken") String deviceToken,
|
||||||
@RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout,
|
@RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout,
|
||||||
HttpServletRequest request) {
|
HttpServletRequest httpRequest) {
|
||||||
|
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<>();
|
||||||
|
if (quotaExceeded(httpRequest, responseWriter)) {
|
||||||
|
return responseWriter;
|
||||||
|
}
|
||||||
|
transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
|
||||||
|
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
|
||||||
|
TransportService transportService = transportContext.getTransportService();
|
||||||
|
transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter),
|
||||||
|
timeout == 0 ? transportContext.getDefaultTimeout() : timeout);
|
||||||
|
transportService.process(sessionInfo, SubscribeToRPCMsg.getDefaultInstance(),
|
||||||
|
new SessionCloseOnErrorCallback(transportService, sessionInfo));
|
||||||
|
|
||||||
// return subscribe(deviceToken, timeout, new RpcSubscribeMsg(), request);
|
}));
|
||||||
return null;
|
return responseWriter;
|
||||||
}
|
}
|
||||||
|
|
||||||
@RequestMapping(value = "/{deviceToken}/rpc/{requestId}", method = RequestMethod.POST)
|
@RequestMapping(value = "/{deviceToken}/rpc/{requestId}", method = RequestMethod.POST)
|
||||||
@ -139,17 +155,11 @@ public class DeviceApiController {
|
|||||||
if (quotaExceeded(request, responseWriter)) {
|
if (quotaExceeded(request, responseWriter)) {
|
||||||
return responseWriter;
|
return responseWriter;
|
||||||
}
|
}
|
||||||
// HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
|
transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
|
||||||
// if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
|
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
|
||||||
// try {
|
TransportService transportService = transportContext.getTransportService();
|
||||||
// JsonObject response = new JsonParser().parse(json).getAsJsonObject();
|
transportService.process(sessionInfo, ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setPayload(json).build(), new HttpOkCallback(responseWriter));
|
||||||
// process(ctx, new ToDeviceRpcResponseMsg(requestId, response.toString()));
|
}));
|
||||||
// } catch (IllegalStateException | JsonSyntaxException ex) {
|
|
||||||
// responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
|
|
||||||
// }
|
|
||||||
// } else {
|
|
||||||
// responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
|
|
||||||
// }
|
|
||||||
return responseWriter;
|
return responseWriter;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -160,19 +170,16 @@ public class DeviceApiController {
|
|||||||
if (quotaExceeded(httpRequest, responseWriter)) {
|
if (quotaExceeded(httpRequest, responseWriter)) {
|
||||||
return responseWriter;
|
return responseWriter;
|
||||||
}
|
}
|
||||||
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
|
transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
|
||||||
// if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
|
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
|
||||||
// try {
|
JsonObject request = new JsonParser().parse(json).getAsJsonObject();
|
||||||
// JsonObject request = new JsonParser().parse(json).getAsJsonObject();
|
TransportService transportService = transportContext.getTransportService();
|
||||||
// process(ctx, new ToServerRpcRequestMsg(0,
|
transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), transportContext.getDefaultTimeout());
|
||||||
// request.get("method").getAsString(),
|
transportService.process(sessionInfo, ToServerRpcRequestMsg.newBuilder().setRequestId(0)
|
||||||
// request.get("params").toString()));
|
.setMethodName(request.get("method").getAsString())
|
||||||
// } catch (IllegalStateException | JsonSyntaxException ex) {
|
.setParams(request.get("params").toString()).build(),
|
||||||
// responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
|
new SessionCloseOnErrorCallback(transportService, sessionInfo));
|
||||||
// }
|
}));
|
||||||
// } else {
|
|
||||||
// responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
|
|
||||||
// }
|
|
||||||
return responseWriter;
|
return responseWriter;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -180,49 +187,30 @@ public class DeviceApiController {
|
|||||||
public DeferredResult<ResponseEntity> subscribeToAttributes(@PathVariable("deviceToken") String deviceToken,
|
public DeferredResult<ResponseEntity> subscribeToAttributes(@PathVariable("deviceToken") String deviceToken,
|
||||||
@RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout,
|
@RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout,
|
||||||
HttpServletRequest httpRequest) {
|
HttpServletRequest httpRequest) {
|
||||||
return null;
|
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<>();
|
||||||
// return subscribe(deviceToken, timeout, new AttributesSubscribeMsg(), httpRequest);
|
if (quotaExceeded(httpRequest, responseWriter)) {
|
||||||
|
return responseWriter;
|
||||||
|
}
|
||||||
|
transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
|
||||||
|
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
|
||||||
|
TransportService transportService = transportContext.getTransportService();
|
||||||
|
transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter),
|
||||||
|
timeout == 0 ? transportContext.getDefaultTimeout() : timeout);
|
||||||
|
transportService.process(sessionInfo, SubscribeToAttributeUpdatesMsg.getDefaultInstance(),
|
||||||
|
new SessionCloseOnErrorCallback(transportService, sessionInfo));
|
||||||
|
|
||||||
|
}));
|
||||||
|
return responseWriter;
|
||||||
}
|
}
|
||||||
|
|
||||||
// private DeferredResult<ResponseEntity> subscribe(String deviceToken, long timeout, FromDeviceMsg msg, HttpServletRequest httpRequest) {
|
|
||||||
// DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
|
|
||||||
// if (quotaExceeded(httpRequest, responseWriter)) {
|
|
||||||
// return responseWriter;
|
|
||||||
// }
|
|
||||||
// HttpSessionCtx ctx = getHttpSessionCtx(responseWriter, timeout);
|
|
||||||
// if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
|
|
||||||
// try {
|
|
||||||
// process(ctx, msg);
|
|
||||||
// } catch (IllegalStateException | JsonSyntaxException ex) {
|
|
||||||
// responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
|
|
||||||
// }
|
|
||||||
// } else {
|
|
||||||
// responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
|
|
||||||
// }
|
|
||||||
// return responseWriter;
|
|
||||||
// }
|
|
||||||
|
|
||||||
private HttpSessionCtx getHttpSessionCtx(DeferredResult<ResponseEntity> responseWriter) {
|
|
||||||
return getHttpSessionCtx(responseWriter, transportContext.getDefaultTimeout());
|
|
||||||
}
|
|
||||||
|
|
||||||
private HttpSessionCtx getHttpSessionCtx(DeferredResult<ResponseEntity> responseWriter, long timeout) {
|
|
||||||
return null;
|
|
||||||
// return new HttpSessionCtx(processor, authService, responseWriter, timeout != 0 ? timeout : defaultTimeout);
|
|
||||||
}
|
|
||||||
|
|
||||||
// private void process(HttpSessionCtx ctx, FromDeviceMsg request) {
|
|
||||||
// AdaptorToSessionActorMsg msg = new BasicAdaptorToSessionActorMsg(ctx, request);
|
|
||||||
//// processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg));
|
|
||||||
// }
|
|
||||||
|
|
||||||
private boolean quotaExceeded(HttpServletRequest request, DeferredResult<ResponseEntity> responseWriter) {
|
private boolean quotaExceeded(HttpServletRequest request, DeferredResult<ResponseEntity> responseWriter) {
|
||||||
if (transportContext.getQuotaService().isQuotaExceeded(request.getRemoteAddr())) {
|
if (transportContext.getQuotaService().isQuotaExceeded(request.getRemoteAddr())) {
|
||||||
log.warn("REST Quota exceeded for [{}] . Disconnect", request.getRemoteAddr());
|
log.warn("REST Quota exceeded for [{}] . Disconnect", request.getRemoteAddr());
|
||||||
responseWriter.setResult(new ResponseEntity<>(HttpStatus.BANDWIDTH_LIMIT_EXCEEDED));
|
responseWriter.setResult(new ResponseEntity<>(HttpStatus.BANDWIDTH_LIMIT_EXCEEDED));
|
||||||
return true;
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class DeviceAuthCallback implements TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> {
|
private static class DeviceAuthCallback implements TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> {
|
||||||
@ -274,7 +262,6 @@ public class DeviceApiController {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(Void msg) {
|
public void onSuccess(Void msg) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -283,13 +270,30 @@ public class DeviceApiController {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class HttpSessionListener implements SessionMsgListener {
|
private static class HttpOkCallback implements TransportServiceCallback<Void> {
|
||||||
|
|
||||||
private final TransportContext transportContext;
|
|
||||||
private final DeferredResult<ResponseEntity> responseWriter;
|
private final DeferredResult<ResponseEntity> responseWriter;
|
||||||
|
|
||||||
HttpSessionListener(TransportContext transportContext, DeferredResult<ResponseEntity> responseWriter) {
|
public HttpOkCallback(DeferredResult<ResponseEntity> responseWriter) {
|
||||||
this.transportContext = transportContext;
|
this.responseWriter = responseWriter;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSuccess(Void msg) {
|
||||||
|
responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable e) {
|
||||||
|
responseWriter.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static class HttpSessionListener implements SessionMsgListener {
|
||||||
|
|
||||||
|
private final DeferredResult<ResponseEntity> responseWriter;
|
||||||
|
|
||||||
|
HttpSessionListener(DeferredResult<ResponseEntity> responseWriter) {
|
||||||
this.responseWriter = responseWriter;
|
this.responseWriter = responseWriter;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -305,16 +309,17 @@ public class DeviceApiController {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onRemoteSessionCloseCommand(SessionCloseNotificationProto sessionCloseNotification) {
|
public void onRemoteSessionCloseCommand(SessionCloseNotificationProto sessionCloseNotification) {
|
||||||
|
responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest) {
|
public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg msg) {
|
||||||
|
responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onToServerRpcResponse(ToServerRpcResponseMsg toServerResponse) {
|
public void onToServerRpcResponse(ToServerRpcResponseMsg msg) {
|
||||||
|
responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -26,12 +26,12 @@ import org.thingsboard.server.common.transport.TransportContext;
|
|||||||
* Created by ashvayka on 04.10.18.
|
* Created by ashvayka on 04.10.18.
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@ConditionalOnProperty(prefix = "transport.mqtt", value = "enabled", havingValue = "true", matchIfMissing = true)
|
@ConditionalOnProperty(prefix = "transport.http", value = "enabled", havingValue = "true", matchIfMissing = true)
|
||||||
@Component
|
@Component
|
||||||
public class HttpTransportContext extends TransportContext {
|
public class HttpTransportContext extends TransportContext {
|
||||||
|
|
||||||
@Getter
|
@Getter
|
||||||
@Value("${http.request_timeout}")
|
@Value("${transport.http.request_timeout}")
|
||||||
private long defaultTimeout;
|
private long defaultTimeout;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,161 +0,0 @@
|
|||||||
/**
|
|
||||||
* Copyright © 2016-2018 The Thingsboard Authors
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.thingsboard.server.transport.http.session;
|
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
|
|
||||||
|
|
||||||
import java.util.UUID;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author Andrew Shvayka
|
|
||||||
*/
|
|
||||||
@Slf4j
|
|
||||||
public class HttpSessionCtx extends DeviceAwareSessionContext {
|
|
||||||
|
|
||||||
public HttpSessionCtx(UUID sessionId) {
|
|
||||||
super(sessionId);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int nextMsgId() {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
// private final SessionId sessionId;
|
|
||||||
// private final long timeout;
|
|
||||||
// private final DeferredResult<ResponseEntity> responseWriter;
|
|
||||||
//
|
|
||||||
// public HttpSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, DeferredResult<ResponseEntity> responseWriter, long timeout) {
|
|
||||||
// super();
|
|
||||||
// this.sessionId = new HttpSessionId();
|
|
||||||
// this.responseWriter = responseWriter;
|
|
||||||
// this.timeout = timeout;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// @Override
|
|
||||||
// public SessionType getSessionType() {
|
|
||||||
// return SessionType.SYNC;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// @Override
|
|
||||||
// public void onMsg(SessionActorToAdaptorMsg source) throws SessionException {
|
|
||||||
// ToDeviceMsg msg = source.getMsg();
|
|
||||||
// switch (msg.getSessionMsgType()) {
|
|
||||||
// case GET_ATTRIBUTES_RESPONSE:
|
|
||||||
// reply((GetAttributesResponse) msg);
|
|
||||||
// return;
|
|
||||||
// case STATUS_CODE_RESPONSE:
|
|
||||||
// reply((StatusCodeResponse) msg);
|
|
||||||
// return;
|
|
||||||
// case ATTRIBUTES_UPDATE_NOTIFICATION:
|
|
||||||
// reply((AttributesUpdateNotification) msg);
|
|
||||||
// return;
|
|
||||||
// case TO_DEVICE_RPC_REQUEST:
|
|
||||||
// reply((ToDeviceRpcRequestMsg) msg);
|
|
||||||
// return;
|
|
||||||
// case TO_SERVER_RPC_RESPONSE:
|
|
||||||
// reply((ToServerRpcResponseMsg) msg);
|
|
||||||
// return;
|
|
||||||
// case RULE_ENGINE_ERROR:
|
|
||||||
// reply((RuleEngineErrorMsg) msg);
|
|
||||||
// return;
|
|
||||||
// default:
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private void reply(RuleEngineErrorMsg msg) {
|
|
||||||
// HttpStatus status = HttpStatus.INTERNAL_SERVER_ERROR;
|
|
||||||
// switch (msg.getError()) {
|
|
||||||
// case QUEUE_PUT_TIMEOUT:
|
|
||||||
// status = HttpStatus.REQUEST_TIMEOUT;
|
|
||||||
// break;
|
|
||||||
// default:
|
|
||||||
// if (msg.getInSessionMsgType() == SessionMsgType.TO_SERVER_RPC_REQUEST) {
|
|
||||||
// status = HttpStatus.BAD_REQUEST;
|
|
||||||
// }
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
// responseWriter.setResult(new ResponseEntity<>(JsonConverter.toErrorJson(msg.getErrorMsg()).toString(), status));
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private <T> void reply(ResponseMsg<? extends T> msg, Consumer<T> f) {
|
|
||||||
// Optional<Exception> msgError = msg.getError();
|
|
||||||
// if (!msgError.isPresent()) {
|
|
||||||
// Optional<? extends T> msgData = msg.getData();
|
|
||||||
// if (msgData.isPresent()) {
|
|
||||||
// f.accept(msgData.get());
|
|
||||||
// }
|
|
||||||
// } else {
|
|
||||||
// Exception e = msgError.get();
|
|
||||||
// responseWriter.setResult(new ResponseEntity<>(e.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR));
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private void reply(ToDeviceRpcRequestMsg msg) {
|
|
||||||
// responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK));
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private void reply(ToServerRpcResponseMsg msg) {
|
|
||||||
//// responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK));
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private void reply(AttributesUpdateNotification msg) {
|
|
||||||
// responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg.getData(), false).toString(), HttpStatus.OK));
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private void reply(GetAttributesResponse msg) {
|
|
||||||
// reply(msg, payload -> {
|
|
||||||
// if (payload.getClientAttributes().isEmpty() && payload.getSharedAttributes().isEmpty()) {
|
|
||||||
// responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_FOUND));
|
|
||||||
// } else {
|
|
||||||
// JsonObject result = JsonConverter.toJson(payload, false);
|
|
||||||
// responseWriter.setResult(new ResponseEntity<>(result.toString(), HttpStatus.OK));
|
|
||||||
// }
|
|
||||||
// });
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private void reply(StatusCodeResponse msg) {
|
|
||||||
// reply(msg, payload -> {
|
|
||||||
// if (payload == 0) {
|
|
||||||
// responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK));
|
|
||||||
// } else {
|
|
||||||
// responseWriter.setResult(new ResponseEntity<>(HttpStatus.valueOf(payload)));
|
|
||||||
// }
|
|
||||||
// });
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// @Override
|
|
||||||
// public void onMsg(SessionCtrlMsg msg) throws SessionException {
|
|
||||||
// //Do nothing
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// @Override
|
|
||||||
// public boolean isClosed() {
|
|
||||||
// return false;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// @Override
|
|
||||||
// public long getTimeout() {
|
|
||||||
// return timeout;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// @Override
|
|
||||||
// public SessionId getSessionId() {
|
|
||||||
// return sessionId;
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||||||
import io.netty.handler.ssl.SslHandler;
|
import io.netty.handler.ssl.SslHandler;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
|
import lombok.Setter;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
@ -56,6 +57,7 @@ public class MqttTransportContext extends TransportContext {
|
|||||||
private Integer maxPayloadSize;
|
private Integer maxPayloadSize;
|
||||||
|
|
||||||
@Getter
|
@Getter
|
||||||
|
@Setter
|
||||||
private SslHandler sslHandler;
|
private SslHandler sslHandler;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,12 +1,12 @@
|
|||||||
/**
|
/**
|
||||||
* Copyright © 2016-2018 The Thingsboard Authors
|
* Copyright © 2016-2018 The Thingsboard Authors
|
||||||
*
|
* <p>
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
* You may obtain a copy of the License at
|
* You may obtain a copy of the License at
|
||||||
*
|
* <p>
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
*
|
* <p>
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
@ -506,7 +506,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
|||||||
.setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB())
|
.setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB())
|
||||||
.build();
|
.build();
|
||||||
transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.OPEN), null);
|
transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.OPEN), null);
|
||||||
transportService.registerSession(sessionInfo, TransportProtos.SessionType.ASYNC, this);
|
transportService.registerAsyncSession(sessionInfo, this);
|
||||||
checkGatewaySession();
|
checkGatewaySession();
|
||||||
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
|
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -121,7 +121,7 @@ public class GatewaySessionHandler {
|
|||||||
transportService.process(deviceSessionInfo, MqttTransportHandler.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
|
transportService.process(deviceSessionInfo, MqttTransportHandler.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
|
||||||
transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null);
|
transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null);
|
||||||
transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null);
|
transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null);
|
||||||
transportService.registerSession(deviceSessionInfo, TransportProtos.SessionType.ASYNC, deviceSessionCtx);
|
transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx);
|
||||||
}
|
}
|
||||||
future.set(devices.get(deviceName));
|
future.set(devices.get(deviceName));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,3 +1,18 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2018 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
package org.thingsboard.server.common.transport;
|
package org.thingsboard.server.common.transport;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|||||||
@ -1,12 +1,12 @@
|
|||||||
/**
|
/**
|
||||||
* Copyright © 2016-2018 The Thingsboard Authors
|
* Copyright © 2016-2018 The Thingsboard Authors
|
||||||
*
|
* <p>
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
* You may obtain a copy of the License at
|
* You may obtain a copy of the License at
|
||||||
*
|
* <p>
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
*
|
* <p>
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
@ -59,7 +59,9 @@ public interface TransportService {
|
|||||||
|
|
||||||
void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
|
void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
|
||||||
|
|
||||||
void registerSession(SessionInfoProto sessionInfo, TransportProtos.SessionType sessionType, SessionMsgListener listener);
|
void registerAsyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener);
|
||||||
|
|
||||||
|
void registerSyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout);
|
||||||
|
|
||||||
void deregisterSession(SessionInfoProto sessionInfo);
|
void deregisterSession(SessionInfoProto sessionInfo);
|
||||||
|
|
||||||
|
|||||||
@ -0,0 +1,101 @@
|
|||||||
|
package org.thingsboard.server.common.transport.service;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.thingsboard.server.common.transport.SessionMsgListener;
|
||||||
|
import org.thingsboard.server.common.transport.TransportService;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.SynchronousQueue;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by ashvayka on 17.10.18.
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public abstract class AbstractTransportService implements TransportService {
|
||||||
|
|
||||||
|
protected ScheduledExecutorService schedulerExecutor;
|
||||||
|
protected ExecutorService transportCallbackExecutor;
|
||||||
|
protected ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void registerAsyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener) {
|
||||||
|
sessions.putIfAbsent(toId(sessionInfo), new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, listener));
|
||||||
|
//TODO: monitor sessions periodically: PING REQ/RESP, etc.
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void registerSyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout) {
|
||||||
|
sessions.putIfAbsent(toId(sessionInfo), new SessionMetaData(sessionInfo, TransportProtos.SessionType.SYNC, listener));
|
||||||
|
schedulerExecutor.schedule(() -> {
|
||||||
|
listener.onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance());
|
||||||
|
deregisterSession(sessionInfo);
|
||||||
|
}, timeout, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deregisterSession(TransportProtos.SessionInfoProto sessionInfo) {
|
||||||
|
sessions.remove(toId(sessionInfo));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void processToTransportMsg(TransportProtos.DeviceActorToTransportMsg toSessionMsg) {
|
||||||
|
UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB());
|
||||||
|
SessionMetaData md = sessions.get(sessionId);
|
||||||
|
if (md != null) {
|
||||||
|
SessionMsgListener listener = md.getListener();
|
||||||
|
transportCallbackExecutor.submit(() -> {
|
||||||
|
if (toSessionMsg.hasGetAttributesResponse()) {
|
||||||
|
listener.onGetAttributesResponse(toSessionMsg.getGetAttributesResponse());
|
||||||
|
}
|
||||||
|
if (toSessionMsg.hasAttributeUpdateNotification()) {
|
||||||
|
listener.onAttributeUpdate(toSessionMsg.getAttributeUpdateNotification());
|
||||||
|
}
|
||||||
|
if (toSessionMsg.hasSessionCloseNotification()) {
|
||||||
|
listener.onRemoteSessionCloseCommand(toSessionMsg.getSessionCloseNotification());
|
||||||
|
}
|
||||||
|
if (toSessionMsg.hasToDeviceRequest()) {
|
||||||
|
listener.onToDeviceRpcRequest(toSessionMsg.getToDeviceRequest());
|
||||||
|
}
|
||||||
|
if (toSessionMsg.hasToServerResponse()) {
|
||||||
|
listener.onToServerRpcResponse(toSessionMsg.getToServerResponse());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
if (md.getSessionType() == TransportProtos.SessionType.SYNC) {
|
||||||
|
deregisterSession(md.getSessionInfo());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
//TODO: should we notify the device actor about missed session?
|
||||||
|
log.debug("[{}] Missing session.", sessionId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected UUID toId(TransportProtos.SessionInfoProto sessionInfo) {
|
||||||
|
return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
|
||||||
|
}
|
||||||
|
|
||||||
|
String getRoutingKey(TransportProtos.SessionInfoProto sessionInfo) {
|
||||||
|
return new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()).toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void init() {
|
||||||
|
this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||||
|
this.transportCallbackExecutor = new ThreadPoolExecutor(0, 20, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void destroy() {
|
||||||
|
if (schedulerExecutor != null) {
|
||||||
|
schedulerExecutor.shutdownNow();
|
||||||
|
}
|
||||||
|
if (transportCallbackExecutor != null) {
|
||||||
|
transportCallbackExecutor.shutdownNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,12 +1,12 @@
|
|||||||
/**
|
/**
|
||||||
* Copyright © 2016-2018 The Thingsboard Authors
|
* Copyright © 2016-2018 The Thingsboard Authors
|
||||||
* <p>
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
* You may obtain a copy of the License at
|
* You may obtain a copy of the License at
|
||||||
* <p>
|
*
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
* <p>
|
*
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
@ -26,6 +26,7 @@ import org.springframework.stereotype.Service;
|
|||||||
import org.thingsboard.server.common.transport.SessionMsgListener;
|
import org.thingsboard.server.common.transport.SessionMsgListener;
|
||||||
import org.thingsboard.server.common.transport.TransportService;
|
import org.thingsboard.server.common.transport.TransportService;
|
||||||
import org.thingsboard.server.common.transport.TransportServiceCallback;
|
import org.thingsboard.server.common.transport.TransportServiceCallback;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.*;
|
import org.thingsboard.server.gen.transport.TransportProtos.*;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
|
import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg;
|
import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg;
|
||||||
@ -62,7 +63,7 @@ import java.util.concurrent.Executors;
|
|||||||
@ConditionalOnProperty(prefix = "transport", value = "type", havingValue = "remote", matchIfMissing = true)
|
@ConditionalOnProperty(prefix = "transport", value = "type", havingValue = "remote", matchIfMissing = true)
|
||||||
@Service
|
@Service
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class RemoteTransportService implements TransportService {
|
public class RemoteTransportService extends AbstractTransportService {
|
||||||
|
|
||||||
@Value("${kafka.rule_engine.topic}")
|
@Value("${kafka.rule_engine.topic}")
|
||||||
private String ruleEngineTopic;
|
private String ruleEngineTopic;
|
||||||
@ -85,16 +86,12 @@ public class RemoteTransportService implements TransportService {
|
|||||||
@Value("${kafka.transport_api.response_auto_commit_interval}")
|
@Value("${kafka.transport_api.response_auto_commit_interval}")
|
||||||
private int autoCommitInterval;
|
private int autoCommitInterval;
|
||||||
|
|
||||||
private ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private TbKafkaSettings kafkaSettings;
|
private TbKafkaSettings kafkaSettings;
|
||||||
//We use this to get the node id. We should replace this with a component that provides the node id.
|
//We use this to get the node id. We should replace this with a component that provides the node id.
|
||||||
@Autowired
|
@Autowired
|
||||||
private TbNodeIdProvider nodeIdProvider;
|
private TbNodeIdProvider nodeIdProvider;
|
||||||
|
|
||||||
private ExecutorService transportCallbackExecutor;
|
|
||||||
|
|
||||||
private TbKafkaRequestTemplate<TransportApiRequestMsg, TransportApiResponseMsg> transportApiTemplate;
|
private TbKafkaRequestTemplate<TransportApiRequestMsg, TransportApiResponseMsg> transportApiTemplate;
|
||||||
private TBKafkaProducerTemplate<ToRuleEngineMsg> ruleEngineProducer;
|
private TBKafkaProducerTemplate<ToRuleEngineMsg> ruleEngineProducer;
|
||||||
private TBKafkaConsumerTemplate<ToTransportMsg> mainConsumer;
|
private TBKafkaConsumerTemplate<ToTransportMsg> mainConsumer;
|
||||||
@ -105,7 +102,7 @@ public class RemoteTransportService implements TransportService {
|
|||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void init() {
|
public void init() {
|
||||||
this.transportCallbackExecutor = Executors.newCachedThreadPool();
|
super.init();
|
||||||
|
|
||||||
TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiRequestMsg> requestBuilder = TBKafkaProducerTemplate.builder();
|
TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiRequestMsg> requestBuilder = TBKafkaProducerTemplate.builder();
|
||||||
requestBuilder.settings(kafkaSettings);
|
requestBuilder.settings(kafkaSettings);
|
||||||
@ -157,36 +154,7 @@ public class RemoteTransportService implements TransportService {
|
|||||||
try {
|
try {
|
||||||
ToTransportMsg toTransportMsg = mainConsumer.decode(record);
|
ToTransportMsg toTransportMsg = mainConsumer.decode(record);
|
||||||
if (toTransportMsg.hasToDeviceSessionMsg()) {
|
if (toTransportMsg.hasToDeviceSessionMsg()) {
|
||||||
DeviceActorToTransportMsg toSessionMsg = toTransportMsg.getToDeviceSessionMsg();
|
processToTransportMsg(toTransportMsg.getToDeviceSessionMsg());
|
||||||
UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB());
|
|
||||||
SessionMetaData md = sessions.get(sessionId);
|
|
||||||
if (md != null) {
|
|
||||||
SessionMsgListener listener = md.getListener();
|
|
||||||
transportCallbackExecutor.submit(() -> {
|
|
||||||
if (toSessionMsg.hasGetAttributesResponse()) {
|
|
||||||
listener.onGetAttributesResponse(toSessionMsg.getGetAttributesResponse());
|
|
||||||
}
|
|
||||||
if (toSessionMsg.hasAttributeUpdateNotification()) {
|
|
||||||
listener.onAttributeUpdate(toSessionMsg.getAttributeUpdateNotification());
|
|
||||||
}
|
|
||||||
if (toSessionMsg.hasSessionCloseNotification()) {
|
|
||||||
listener.onRemoteSessionCloseCommand(toSessionMsg.getSessionCloseNotification());
|
|
||||||
}
|
|
||||||
if (toSessionMsg.hasToDeviceRequest()) {
|
|
||||||
listener.onToDeviceRpcRequest(toSessionMsg.getToDeviceRequest());
|
|
||||||
}
|
|
||||||
if (toSessionMsg.hasToServerResponse()) {
|
|
||||||
listener.onToServerRpcResponse(toSessionMsg.getToServerResponse());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
if (md.getSessionType() == SessionType.SYNC) {
|
|
||||||
deregisterSession(md.getSessionInfo());
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
//TODO: should we notify the device actor about missed session?
|
|
||||||
log.debug("[{}] Missing session.", sessionId);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
log.warn("Failed to process the notification.", e);
|
log.warn("Failed to process the notification.", e);
|
||||||
@ -206,13 +174,11 @@ public class RemoteTransportService implements TransportService {
|
|||||||
|
|
||||||
@PreDestroy
|
@PreDestroy
|
||||||
public void destroy() {
|
public void destroy() {
|
||||||
|
super.destroy();
|
||||||
stopped = true;
|
stopped = true;
|
||||||
if (transportApiTemplate != null) {
|
if (transportApiTemplate != null) {
|
||||||
transportApiTemplate.stop();
|
transportApiTemplate.stop();
|
||||||
}
|
}
|
||||||
if (transportCallbackExecutor != null) {
|
|
||||||
transportCallbackExecutor.shutdownNow();
|
|
||||||
}
|
|
||||||
if (mainConsumer != null) {
|
if (mainConsumer != null) {
|
||||||
mainConsumer.unsubscribe();
|
mainConsumer.unsubscribe();
|
||||||
}
|
}
|
||||||
@ -314,25 +280,6 @@ public class RemoteTransportService implements TransportService {
|
|||||||
send(sessionInfo, toRuleEngineMsg, callback);
|
send(sessionInfo, toRuleEngineMsg, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void registerSession(SessionInfoProto sessionInfo, SessionType sessionType, SessionMsgListener listener) {
|
|
||||||
sessions.putIfAbsent(toId(sessionInfo), new SessionMetaData(sessionInfo, sessionType, listener));
|
|
||||||
//TODO: monitor sessions periodically: PING REQ/RESP, etc.
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void deregisterSession(SessionInfoProto sessionInfo) {
|
|
||||||
sessions.remove(toId(sessionInfo));
|
|
||||||
}
|
|
||||||
|
|
||||||
private UUID toId(SessionInfoProto sessionInfo) {
|
|
||||||
return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
|
|
||||||
}
|
|
||||||
|
|
||||||
private String getRoutingKey(SessionInfoProto sessionInfo) {
|
|
||||||
return new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()).toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class TransportCallbackAdaptor implements Callback {
|
private static class TransportCallbackAdaptor implements Callback {
|
||||||
private final TransportServiceCallback<Void> callback;
|
private final TransportServiceCallback<Void> callback;
|
||||||
|
|
||||||
|
|||||||
@ -1,3 +1,18 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2018 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
package org.thingsboard.server.common.transport.service;
|
package org.thingsboard.server.common.transport.service;
|
||||||
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user