diff --git a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java index a7590c9610..b8001725d7 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -26,6 +26,7 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; +import org.thingsboard.server.common.transport.service.AbstractTransportService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; @@ -56,6 +57,8 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -67,11 +70,7 @@ import java.util.function.Consumer; @Slf4j @Service @ConditionalOnProperty(prefix = "transport", value = "type", havingValue = "local") -public class LocalTransportService implements TransportService, RuleEngineTransportService { - - private ConcurrentMap sessions = new ConcurrentHashMap<>(); - - private ExecutorService transportCallbackExecutor; +public class LocalTransportService extends AbstractTransportService implements RuleEngineTransportService { @Autowired private TransportApiService transportApiService; @@ -89,14 +88,12 @@ public class LocalTransportService implements TransportService, RuleEngineTransp @PostConstruct public void init() { - this.transportCallbackExecutor = new ThreadPoolExecutor(0, 100, 60L, TimeUnit.SECONDS, new SynchronousQueue<>()); + super.init(); } @PreDestroy public void destroy() { - if (transportCallbackExecutor != null) { - transportCallbackExecutor.shutdownNow(); - } + super.destroy(); } @Override @@ -175,21 +172,6 @@ public class LocalTransportService implements TransportService, RuleEngineTransp forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToServerRPCCallRequest(msg).build(), callback); } - @Override - public void registerSession(SessionInfoProto sessionInfo, TransportProtos.SessionType sessionType, SessionMsgListener listener) { - sessions.putIfAbsent(toId(sessionInfo), listener); - //TODO: monitor sessions periodically: PING REQ/RESP, etc. - } - - @Override - public void deregisterSession(SessionInfoProto sessionInfo) { - sessions.remove(toId(sessionInfo)); - } - - private UUID toId(SessionInfoProto sessionInfo) { - return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); - } - @Override public void process(String nodeId, DeviceActorToTransportMsg msg) { process(nodeId, msg, null, null); @@ -197,30 +179,7 @@ public class LocalTransportService implements TransportService, RuleEngineTransp @Override public void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer onFailure) { - UUID sessionId = new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB()); - SessionMsgListener listener = sessions.get(sessionId); - if (listener != null) { - transportCallbackExecutor.submit(() -> { - if (msg.hasGetAttributesResponse()) { - listener.onGetAttributesResponse(msg.getGetAttributesResponse()); - } - if (msg.hasAttributeUpdateNotification()) { - listener.onAttributeUpdate(msg.getAttributeUpdateNotification()); - } - if (msg.hasSessionCloseNotification()) { - listener.onRemoteSessionCloseCommand(msg.getSessionCloseNotification()); - } - if (msg.hasToDeviceRequest()) { - listener.onToDeviceRpcRequest(msg.getToDeviceRequest()); - } - if (msg.hasToServerResponse()) { - listener.onToServerRpcResponse(msg.getToServerResponse()); - } - }); - } else { - //TODO: should we notify the device actor about missed session? - log.debug("[{}] Missing session.", sessionId); - } + processToTransportMsg(msg); if (onSuccess != null) { onSuccess.run(); } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index ca509443bf..0eb624b181 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -439,7 +439,7 @@ transport: # Local MQTT transport parameters mqtt: # Enable/disable mqtt transport protocol. - enabled: "${MQTT_ENABLED:true}" + enabled: "${MQTT_ENABLED:false}" bind_address: "${MQTT_BIND_ADDRESS:0.0.0.0}" bind_port: "${MQTT_BIND_PORT:1883}" adaptor: "${MQTT_ADAPTOR_NAME:JsonMqttAdaptor}" diff --git a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java index bca05c0459..8d579d4eb2 100644 --- a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java +++ b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java @@ -15,6 +15,8 @@ */ package org.thingsboard.server.transport.http; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -33,9 +35,20 @@ import org.thingsboard.server.common.transport.TransportContext; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.adaptor.JsonConverter; -import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.gen.transport.TransportProtos.*; -import org.thingsboard.server.transport.http.session.HttpSessionCtx; +import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto; +import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.SessionCloseNotificationProto; +import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; +import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg; +import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; import javax.servlet.http.HttpServletRequest; import java.util.Arrays; @@ -76,7 +89,7 @@ public class DeviceApiController { request.addAllSharedAttributeNames(sharedKeySet); } TransportService transportService = transportContext.getTransportService(); - transportService.registerSession(sessionInfo, TransportProtos.SessionType.SYNC, new HttpSessionListener(transportContext, responseWriter)); + transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), transportContext.getDefaultTimeout()); transportService.process(sessionInfo, request.build(), new SessionCloseOnErrorCallback(transportService, sessionInfo)); })); return responseWriter; @@ -85,20 +98,16 @@ public class DeviceApiController { @RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.POST) public DeferredResult postDeviceAttributes(@PathVariable("deviceToken") String deviceToken, @RequestBody String json, HttpServletRequest request) { - DeferredResult responseWriter = new DeferredResult(); + DeferredResult responseWriter = new DeferredResult<>(); if (quotaExceeded(request, responseWriter)) { return responseWriter; } -// HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); -// if (ctx.login(new DeviceTokenCredentials(deviceToken))) { -// try { -// process(ctx, JsonConverter.convertToAttributes(new JsonParser().parse(json))); -// } catch (IllegalStateException | JsonSyntaxException ex) { -// responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); -// } -// } else { -// responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED)); -// } + transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(), + new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> { + TransportService transportService = transportContext.getTransportService(); + transportService.process(sessionInfo, JsonConverter.convertToAttributesProto(new JsonParser().parse(json)), + new HttpOkCallback(responseWriter)); + })); return responseWriter; } @@ -109,26 +118,33 @@ public class DeviceApiController { if (quotaExceeded(request, responseWriter)) { return responseWriter; } - HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); -// if (ctx.login(new DeviceTokenCredentials(deviceToken))) { -// try { -// process(ctx, JsonConverter.convertToTelemetry(new JsonParser().parse(json))); -// } catch (IllegalStateException | JsonSyntaxException ex) { -// responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); -// } -// } else { -// responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED)); -// } + transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(), + new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> { + TransportService transportService = transportContext.getTransportService(); + transportService.process(sessionInfo, JsonConverter.convertToTelemetryProto(new JsonParser().parse(json)), + new HttpOkCallback(responseWriter)); + })); return responseWriter; } @RequestMapping(value = "/{deviceToken}/rpc", method = RequestMethod.GET, produces = "application/json") public DeferredResult subscribeToCommands(@PathVariable("deviceToken") String deviceToken, @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout, - HttpServletRequest request) { + HttpServletRequest httpRequest) { + DeferredResult responseWriter = new DeferredResult<>(); + if (quotaExceeded(httpRequest, responseWriter)) { + return responseWriter; + } + transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(), + new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> { + TransportService transportService = transportContext.getTransportService(); + transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), + timeout == 0 ? transportContext.getDefaultTimeout() : timeout); + transportService.process(sessionInfo, SubscribeToRPCMsg.getDefaultInstance(), + new SessionCloseOnErrorCallback(transportService, sessionInfo)); -// return subscribe(deviceToken, timeout, new RpcSubscribeMsg(), request); - return null; + })); + return responseWriter; } @RequestMapping(value = "/{deviceToken}/rpc/{requestId}", method = RequestMethod.POST) @@ -139,17 +155,11 @@ public class DeviceApiController { if (quotaExceeded(request, responseWriter)) { return responseWriter; } -// HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); -// if (ctx.login(new DeviceTokenCredentials(deviceToken))) { -// try { -// JsonObject response = new JsonParser().parse(json).getAsJsonObject(); -// process(ctx, new ToDeviceRpcResponseMsg(requestId, response.toString())); -// } catch (IllegalStateException | JsonSyntaxException ex) { -// responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); -// } -// } else { -// responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED)); -// } + transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(), + new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> { + TransportService transportService = transportContext.getTransportService(); + transportService.process(sessionInfo, ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setPayload(json).build(), new HttpOkCallback(responseWriter)); + })); return responseWriter; } @@ -160,19 +170,16 @@ public class DeviceApiController { if (quotaExceeded(httpRequest, responseWriter)) { return responseWriter; } - HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); -// if (ctx.login(new DeviceTokenCredentials(deviceToken))) { -// try { -// JsonObject request = new JsonParser().parse(json).getAsJsonObject(); -// process(ctx, new ToServerRpcRequestMsg(0, -// request.get("method").getAsString(), -// request.get("params").toString())); -// } catch (IllegalStateException | JsonSyntaxException ex) { -// responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); -// } -// } else { -// responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED)); -// } + transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(), + new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> { + JsonObject request = new JsonParser().parse(json).getAsJsonObject(); + TransportService transportService = transportContext.getTransportService(); + transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), transportContext.getDefaultTimeout()); + transportService.process(sessionInfo, ToServerRpcRequestMsg.newBuilder().setRequestId(0) + .setMethodName(request.get("method").getAsString()) + .setParams(request.get("params").toString()).build(), + new SessionCloseOnErrorCallback(transportService, sessionInfo)); + })); return responseWriter; } @@ -180,49 +187,30 @@ public class DeviceApiController { public DeferredResult subscribeToAttributes(@PathVariable("deviceToken") String deviceToken, @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout, HttpServletRequest httpRequest) { - return null; -// return subscribe(deviceToken, timeout, new AttributesSubscribeMsg(), httpRequest); + DeferredResult responseWriter = new DeferredResult<>(); + if (quotaExceeded(httpRequest, responseWriter)) { + return responseWriter; + } + transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(), + new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> { + TransportService transportService = transportContext.getTransportService(); + transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), + timeout == 0 ? transportContext.getDefaultTimeout() : timeout); + transportService.process(sessionInfo, SubscribeToAttributeUpdatesMsg.getDefaultInstance(), + new SessionCloseOnErrorCallback(transportService, sessionInfo)); + + })); + return responseWriter; } -// private DeferredResult subscribe(String deviceToken, long timeout, FromDeviceMsg msg, HttpServletRequest httpRequest) { -// DeferredResult responseWriter = new DeferredResult(); -// if (quotaExceeded(httpRequest, responseWriter)) { -// return responseWriter; -// } -// HttpSessionCtx ctx = getHttpSessionCtx(responseWriter, timeout); -// if (ctx.login(new DeviceTokenCredentials(deviceToken))) { -// try { -// process(ctx, msg); -// } catch (IllegalStateException | JsonSyntaxException ex) { -// responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); -// } -// } else { -// responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED)); -// } -// return responseWriter; -// } - - private HttpSessionCtx getHttpSessionCtx(DeferredResult responseWriter) { - return getHttpSessionCtx(responseWriter, transportContext.getDefaultTimeout()); - } - - private HttpSessionCtx getHttpSessionCtx(DeferredResult responseWriter, long timeout) { - return null; -// return new HttpSessionCtx(processor, authService, responseWriter, timeout != 0 ? timeout : defaultTimeout); - } - -// private void process(HttpSessionCtx ctx, FromDeviceMsg request) { -// AdaptorToSessionActorMsg msg = new BasicAdaptorToSessionActorMsg(ctx, request); -//// processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg)); -// } - private boolean quotaExceeded(HttpServletRequest request, DeferredResult responseWriter) { if (transportContext.getQuotaService().isQuotaExceeded(request.getRemoteAddr())) { log.warn("REST Quota exceeded for [{}] . Disconnect", request.getRemoteAddr()); responseWriter.setResult(new ResponseEntity<>(HttpStatus.BANDWIDTH_LIMIT_EXCEEDED)); return true; + } else { + return false; } - return false; } private static class DeviceAuthCallback implements TransportServiceCallback { @@ -274,7 +262,6 @@ public class DeviceApiController { @Override public void onSuccess(Void msg) { - } @Override @@ -283,13 +270,30 @@ public class DeviceApiController { } } - private static class HttpSessionListener implements SessionMsgListener { - - private final TransportContext transportContext; + private static class HttpOkCallback implements TransportServiceCallback { private final DeferredResult responseWriter; - HttpSessionListener(TransportContext transportContext, DeferredResult responseWriter) { - this.transportContext = transportContext; + public HttpOkCallback(DeferredResult responseWriter) { + this.responseWriter = responseWriter; + } + + @Override + public void onSuccess(Void msg) { + responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK)); + } + + @Override + public void onError(Throwable e) { + responseWriter.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR)); + } + } + + + private static class HttpSessionListener implements SessionMsgListener { + + private final DeferredResult responseWriter; + + HttpSessionListener(DeferredResult responseWriter) { this.responseWriter = responseWriter; } @@ -305,16 +309,17 @@ public class DeviceApiController { @Override public void onRemoteSessionCloseCommand(SessionCloseNotificationProto sessionCloseNotification) { + responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT)); } @Override - public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest) { - + public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg msg) { + responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK)); } @Override - public void onToServerRpcResponse(ToServerRpcResponseMsg toServerResponse) { - + public void onToServerRpcResponse(ToServerRpcResponseMsg msg) { + responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK)); } } } diff --git a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/HttpTransportContext.java b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/HttpTransportContext.java index 87db06e66b..66654c1bed 100644 --- a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/HttpTransportContext.java +++ b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/HttpTransportContext.java @@ -26,12 +26,12 @@ import org.thingsboard.server.common.transport.TransportContext; * Created by ashvayka on 04.10.18. */ @Slf4j -@ConditionalOnProperty(prefix = "transport.mqtt", value = "enabled", havingValue = "true", matchIfMissing = true) +@ConditionalOnProperty(prefix = "transport.http", value = "enabled", havingValue = "true", matchIfMissing = true) @Component public class HttpTransportContext extends TransportContext { @Getter - @Value("${http.request_timeout}") + @Value("${transport.http.request_timeout}") private long defaultTimeout; } diff --git a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java deleted file mode 100644 index 27cd4f1dd4..0000000000 --- a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java +++ /dev/null @@ -1,161 +0,0 @@ -/** - * Copyright © 2016-2018 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.transport.http.session; - -import lombok.extern.slf4j.Slf4j; -import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; - -import java.util.UUID; - -/** - * @author Andrew Shvayka - */ -@Slf4j -public class HttpSessionCtx extends DeviceAwareSessionContext { - - public HttpSessionCtx(UUID sessionId) { - super(sessionId); - } - - @Override - public int nextMsgId() { - return 0; - } - - // private final SessionId sessionId; -// private final long timeout; -// private final DeferredResult responseWriter; -// -// public HttpSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, DeferredResult responseWriter, long timeout) { -// super(); -// this.sessionId = new HttpSessionId(); -// this.responseWriter = responseWriter; -// this.timeout = timeout; -// } -// -// @Override -// public SessionType getSessionType() { -// return SessionType.SYNC; -// } -// -// @Override -// public void onMsg(SessionActorToAdaptorMsg source) throws SessionException { -// ToDeviceMsg msg = source.getMsg(); -// switch (msg.getSessionMsgType()) { -// case GET_ATTRIBUTES_RESPONSE: -// reply((GetAttributesResponse) msg); -// return; -// case STATUS_CODE_RESPONSE: -// reply((StatusCodeResponse) msg); -// return; -// case ATTRIBUTES_UPDATE_NOTIFICATION: -// reply((AttributesUpdateNotification) msg); -// return; -// case TO_DEVICE_RPC_REQUEST: -// reply((ToDeviceRpcRequestMsg) msg); -// return; -// case TO_SERVER_RPC_RESPONSE: -// reply((ToServerRpcResponseMsg) msg); -// return; -// case RULE_ENGINE_ERROR: -// reply((RuleEngineErrorMsg) msg); -// return; -// default: -// break; -// } -// } -// -// private void reply(RuleEngineErrorMsg msg) { -// HttpStatus status = HttpStatus.INTERNAL_SERVER_ERROR; -// switch (msg.getError()) { -// case QUEUE_PUT_TIMEOUT: -// status = HttpStatus.REQUEST_TIMEOUT; -// break; -// default: -// if (msg.getInSessionMsgType() == SessionMsgType.TO_SERVER_RPC_REQUEST) { -// status = HttpStatus.BAD_REQUEST; -// } -// break; -// } -// responseWriter.setResult(new ResponseEntity<>(JsonConverter.toErrorJson(msg.getErrorMsg()).toString(), status)); -// } -// -// private void reply(ResponseMsg msg, Consumer f) { -// Optional msgError = msg.getError(); -// if (!msgError.isPresent()) { -// Optional msgData = msg.getData(); -// if (msgData.isPresent()) { -// f.accept(msgData.get()); -// } -// } else { -// Exception e = msgError.get(); -// responseWriter.setResult(new ResponseEntity<>(e.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR)); -// } -// } -// -// private void reply(ToDeviceRpcRequestMsg msg) { -// responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK)); -// } -// -// private void reply(ToServerRpcResponseMsg msg) { -//// responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK)); -// } -// -// private void reply(AttributesUpdateNotification msg) { -// responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg.getData(), false).toString(), HttpStatus.OK)); -// } -// -// private void reply(GetAttributesResponse msg) { -// reply(msg, payload -> { -// if (payload.getClientAttributes().isEmpty() && payload.getSharedAttributes().isEmpty()) { -// responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_FOUND)); -// } else { -// JsonObject result = JsonConverter.toJson(payload, false); -// responseWriter.setResult(new ResponseEntity<>(result.toString(), HttpStatus.OK)); -// } -// }); -// } -// -// private void reply(StatusCodeResponse msg) { -// reply(msg, payload -> { -// if (payload == 0) { -// responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK)); -// } else { -// responseWriter.setResult(new ResponseEntity<>(HttpStatus.valueOf(payload))); -// } -// }); -// } -// -// @Override -// public void onMsg(SessionCtrlMsg msg) throws SessionException { -// //Do nothing -// } -// -// @Override -// public boolean isClosed() { -// return false; -// } -// -// @Override -// public long getTimeout() { -// return timeout; -// } -// -// @Override -// public SessionId getSessionId() { -// return sessionId; -// } -} diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java index 16f8981d57..91580bff7d 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.handler.ssl.SslHandler; import lombok.Data; import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -56,6 +57,7 @@ public class MqttTransportContext extends TransportContext { private Integer maxPayloadSize; @Getter + @Setter private SslHandler sslHandler; } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 30cc9c75a2..d739824aa5 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -506,7 +506,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB()) .build(); transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.OPEN), null); - transportService.registerSession(sessionInfo, TransportProtos.SessionType.ASYNC, this); + transportService.registerAsyncSession(sessionInfo, this); checkGatewaySession(); ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED)); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java index 43f997830f..d6000591a8 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java @@ -121,7 +121,7 @@ public class GatewaySessionHandler { transportService.process(deviceSessionInfo, MqttTransportHandler.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null); transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null); - transportService.registerSession(deviceSessionInfo, TransportProtos.SessionType.ASYNC, deviceSessionCtx); + transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx); } future.set(devices.get(deviceName)); } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java index f846cf0c38..ab429829e4 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.common.transport; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java index 9bbc40c1b6..5d0c14d78c 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -59,7 +59,9 @@ public interface TransportService { void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback callback); - void registerSession(SessionInfoProto sessionInfo, TransportProtos.SessionType sessionType, SessionMsgListener listener); + void registerAsyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener); + + void registerSyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout); void deregisterSession(SessionInfoProto sessionInfo); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java new file mode 100644 index 0000000000..1bf4dd1cbf --- /dev/null +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java @@ -0,0 +1,101 @@ +package org.thingsboard.server.common.transport.service; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.transport.SessionMsgListener; +import org.thingsboard.server.common.transport.TransportService; +import org.thingsboard.server.gen.transport.TransportProtos; + +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Created by ashvayka on 17.10.18. + */ +@Slf4j +public abstract class AbstractTransportService implements TransportService { + + protected ScheduledExecutorService schedulerExecutor; + protected ExecutorService transportCallbackExecutor; + protected ConcurrentMap sessions = new ConcurrentHashMap<>(); + + @Override + public void registerAsyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener) { + sessions.putIfAbsent(toId(sessionInfo), new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, listener)); + //TODO: monitor sessions periodically: PING REQ/RESP, etc. + } + + @Override + public void registerSyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout) { + sessions.putIfAbsent(toId(sessionInfo), new SessionMetaData(sessionInfo, TransportProtos.SessionType.SYNC, listener)); + schedulerExecutor.schedule(() -> { + listener.onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance()); + deregisterSession(sessionInfo); + }, timeout, TimeUnit.MILLISECONDS); + + } + + @Override + public void deregisterSession(TransportProtos.SessionInfoProto sessionInfo) { + sessions.remove(toId(sessionInfo)); + } + + protected void processToTransportMsg(TransportProtos.DeviceActorToTransportMsg toSessionMsg) { + UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB()); + SessionMetaData md = sessions.get(sessionId); + if (md != null) { + SessionMsgListener listener = md.getListener(); + transportCallbackExecutor.submit(() -> { + if (toSessionMsg.hasGetAttributesResponse()) { + listener.onGetAttributesResponse(toSessionMsg.getGetAttributesResponse()); + } + if (toSessionMsg.hasAttributeUpdateNotification()) { + listener.onAttributeUpdate(toSessionMsg.getAttributeUpdateNotification()); + } + if (toSessionMsg.hasSessionCloseNotification()) { + listener.onRemoteSessionCloseCommand(toSessionMsg.getSessionCloseNotification()); + } + if (toSessionMsg.hasToDeviceRequest()) { + listener.onToDeviceRpcRequest(toSessionMsg.getToDeviceRequest()); + } + if (toSessionMsg.hasToServerResponse()) { + listener.onToServerRpcResponse(toSessionMsg.getToServerResponse()); + } + }); + if (md.getSessionType() == TransportProtos.SessionType.SYNC) { + deregisterSession(md.getSessionInfo()); + } + } else { + //TODO: should we notify the device actor about missed session? + log.debug("[{}] Missing session.", sessionId); + } + } + + protected UUID toId(TransportProtos.SessionInfoProto sessionInfo) { + return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); + } + + String getRoutingKey(TransportProtos.SessionInfoProto sessionInfo) { + return new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()).toString(); + } + + public void init() { + this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(); + this.transportCallbackExecutor = new ThreadPoolExecutor(0, 20, 60L, TimeUnit.SECONDS, new SynchronousQueue<>()); + } + + public void destroy() { + if (schedulerExecutor != null) { + schedulerExecutor.shutdownNow(); + } + if (transportCallbackExecutor != null) { + transportCallbackExecutor.shutdownNow(); + } + } +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java index eb6dbc5cb2..1442cc5f21 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -26,6 +26,7 @@ import org.springframework.stereotype.Service; import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.*; import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg; @@ -62,7 +63,7 @@ import java.util.concurrent.Executors; @ConditionalOnProperty(prefix = "transport", value = "type", havingValue = "remote", matchIfMissing = true) @Service @Slf4j -public class RemoteTransportService implements TransportService { +public class RemoteTransportService extends AbstractTransportService { @Value("${kafka.rule_engine.topic}") private String ruleEngineTopic; @@ -85,16 +86,12 @@ public class RemoteTransportService implements TransportService { @Value("${kafka.transport_api.response_auto_commit_interval}") private int autoCommitInterval; - private ConcurrentMap sessions = new ConcurrentHashMap<>(); - @Autowired private TbKafkaSettings kafkaSettings; //We use this to get the node id. We should replace this with a component that provides the node id. @Autowired private TbNodeIdProvider nodeIdProvider; - private ExecutorService transportCallbackExecutor; - private TbKafkaRequestTemplate transportApiTemplate; private TBKafkaProducerTemplate ruleEngineProducer; private TBKafkaConsumerTemplate mainConsumer; @@ -105,7 +102,7 @@ public class RemoteTransportService implements TransportService { @PostConstruct public void init() { - this.transportCallbackExecutor = Executors.newCachedThreadPool(); + super.init(); TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder requestBuilder = TBKafkaProducerTemplate.builder(); requestBuilder.settings(kafkaSettings); @@ -157,36 +154,7 @@ public class RemoteTransportService implements TransportService { try { ToTransportMsg toTransportMsg = mainConsumer.decode(record); if (toTransportMsg.hasToDeviceSessionMsg()) { - DeviceActorToTransportMsg toSessionMsg = toTransportMsg.getToDeviceSessionMsg(); - UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB()); - SessionMetaData md = sessions.get(sessionId); - if (md != null) { - SessionMsgListener listener = md.getListener(); - transportCallbackExecutor.submit(() -> { - if (toSessionMsg.hasGetAttributesResponse()) { - listener.onGetAttributesResponse(toSessionMsg.getGetAttributesResponse()); - } - if (toSessionMsg.hasAttributeUpdateNotification()) { - listener.onAttributeUpdate(toSessionMsg.getAttributeUpdateNotification()); - } - if (toSessionMsg.hasSessionCloseNotification()) { - listener.onRemoteSessionCloseCommand(toSessionMsg.getSessionCloseNotification()); - } - if (toSessionMsg.hasToDeviceRequest()) { - listener.onToDeviceRpcRequest(toSessionMsg.getToDeviceRequest()); - } - if (toSessionMsg.hasToServerResponse()) { - listener.onToServerRpcResponse(toSessionMsg.getToServerResponse()); - } - }); - if (md.getSessionType() == SessionType.SYNC) { - deregisterSession(md.getSessionInfo()); - } - } else { - //TODO: should we notify the device actor about missed session? - log.debug("[{}] Missing session.", sessionId); - } - + processToTransportMsg(toTransportMsg.getToDeviceSessionMsg()); } } catch (Throwable e) { log.warn("Failed to process the notification.", e); @@ -206,13 +174,11 @@ public class RemoteTransportService implements TransportService { @PreDestroy public void destroy() { + super.destroy(); stopped = true; if (transportApiTemplate != null) { transportApiTemplate.stop(); } - if (transportCallbackExecutor != null) { - transportCallbackExecutor.shutdownNow(); - } if (mainConsumer != null) { mainConsumer.unsubscribe(); } @@ -314,25 +280,6 @@ public class RemoteTransportService implements TransportService { send(sessionInfo, toRuleEngineMsg, callback); } - @Override - public void registerSession(SessionInfoProto sessionInfo, SessionType sessionType, SessionMsgListener listener) { - sessions.putIfAbsent(toId(sessionInfo), new SessionMetaData(sessionInfo, sessionType, listener)); - //TODO: monitor sessions periodically: PING REQ/RESP, etc. - } - - @Override - public void deregisterSession(SessionInfoProto sessionInfo) { - sessions.remove(toId(sessionInfo)); - } - - private UUID toId(SessionInfoProto sessionInfo) { - return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); - } - - private String getRoutingKey(SessionInfoProto sessionInfo) { - return new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()).toString(); - } - private static class TransportCallbackAdaptor implements Callback { private final TransportServiceCallback callback; diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java index dd52f77ca6..1de57114e4 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.common.transport.service; import lombok.Data;