From 68eabb9ec20c9e0abae1398e5998b6ccd7e88a40 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Tue, 16 Oct 2018 13:20:40 +0300 Subject: [PATCH] TMP commit for HTTP transport --- application/pom.xml | 15 +- .../transport/LocalTransportService.java | 3 +- {transport => common/transport}/http/pom.xml | 10 +- .../transport/http/DeviceApiController.java | 174 ++++++++++++++---- .../transport/http/HttpTransportContext.java | 37 ++++ .../http/session/HttpSessionCtx.java | 0 .../transport/mqtt/MqttTransportContext.java | 38 +--- .../transport/mqtt/MqttTransportHandler.java | 2 +- .../mqtt/session/GatewaySessionHandler.java | 2 +- common/transport/pom.xml | 2 +- .../common/transport/TransportContext.java | 53 ++++++ .../common/transport/TransportService.java | 2 +- .../service/RemoteTransportService.java | 22 ++- .../transport/service/SessionMetaData.java | 17 ++ pom.xml | 2 +- transport/pom.xml | 1 - 16 files changed, 281 insertions(+), 99 deletions(-) rename {transport => common/transport}/http/pom.xml (90%) rename {transport => common/transport}/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java (60%) create mode 100644 common/transport/http/src/main/java/org/thingsboard/server/transport/http/HttpTransportContext.java rename {transport => common/transport}/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java (100%) create mode 100644 common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java create mode 100644 common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java diff --git a/application/pom.xml b/application/pom.xml index 915a4ab5e1..f8c3219559 100644 --- a/application/pom.xml +++ b/application/pom.xml @@ -61,17 +61,17 @@ transport - - - - - - + + org.thingsboard.common.transport mqtt + + org.thingsboard.common.transport + http + org.thingsboard dao @@ -542,7 +542,8 @@ -PprojectBuildDir=${project.build.directory} -PprojectVersion=${project.version} - -PmainJar=${project.build.directory}/${project.build.finalName}-boot.${project.packaging} + -PmainJar=${project.build.directory}/${project.build.finalName}-boot.${project.packaging} + -PpkgName=${pkg.name} -PpkgInstallFolder=${pkg.installFolder} -PpkgLogFolder=${pkg.unixLogFolder} diff --git a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java index ba5cc074d4..a7590c9610 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java @@ -26,6 +26,7 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg; @@ -175,7 +176,7 @@ public class LocalTransportService implements TransportService, RuleEngineTransp } @Override - public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) { + public void registerSession(SessionInfoProto sessionInfo, TransportProtos.SessionType sessionType, SessionMsgListener listener) { sessions.putIfAbsent(toId(sessionInfo), listener); //TODO: monitor sessions periodically: PING REQ/RESP, etc. } diff --git a/transport/http/pom.xml b/common/transport/http/pom.xml similarity index 90% rename from transport/http/pom.xml rename to common/transport/http/pom.xml index 996470832c..2ee11426a9 100644 --- a/transport/http/pom.xml +++ b/common/transport/http/pom.xml @@ -19,15 +19,15 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - org.thingsboard + org.thingsboard.common 2.2.0-SNAPSHOT transport - org.thingsboard.transport + org.thingsboard.common.transport http jar - Thingsboard HTTP Transport + Thingsboard HTTP Transport Common https://thingsboard.io @@ -37,8 +37,8 @@ - org.thingsboard.common - transport + org.thingsboard.common.transport + transport-api org.springframework.boot diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java similarity index 60% rename from transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java rename to common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java index cd6d89998c..bca05c0459 100644 --- a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java +++ b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,64 +17,68 @@ package org.thingsboard.server.transport.http; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.*; +import org.springframework.util.StringUtils; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; -import org.thingsboard.server.common.transport.SessionMsgProcessor; -import org.thingsboard.server.common.transport.auth.DeviceAuthService; -import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService; +import org.thingsboard.server.common.transport.SessionMsgListener; +import org.thingsboard.server.common.transport.TransportContext; +import org.thingsboard.server.common.transport.TransportService; +import org.thingsboard.server.common.transport.TransportServiceCallback; +import org.thingsboard.server.common.transport.adaptor.JsonConverter; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.*; import org.thingsboard.server.transport.http.session.HttpSessionCtx; import javax.servlet.http.HttpServletRequest; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.function.Consumer; /** * @author Andrew Shvayka */ @RestController -@ConditionalOnProperty(prefix = "transport.http", value = "enabled", havingValue = "true") +@ConditionalOnProperty(prefix = "transport.http", value = "enabled", havingValue = "true", matchIfMissing = true) @RequestMapping("/api/v1") @Slf4j public class DeviceApiController { - @Value("${http.request_timeout}") - private long defaultTimeout; - - @Autowired(required = false) - private SessionMsgProcessor processor; - - @Autowired(required = false) - private DeviceAuthService authService; - - @Autowired(required = false) - private HostRequestsQuotaService quotaService; + @Autowired + private HttpTransportContext transportContext; @RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.GET, produces = "application/json") public DeferredResult getDeviceAttributes(@PathVariable("deviceToken") String deviceToken, @RequestParam(value = "clientKeys", required = false, defaultValue = "") String clientKeys, @RequestParam(value = "sharedKeys", required = false, defaultValue = "") String sharedKeys, HttpServletRequest httpRequest) { - DeferredResult responseWriter = new DeferredResult(); + DeferredResult responseWriter = new DeferredResult<>(); if (quotaExceeded(httpRequest, responseWriter)) { return responseWriter; } -// HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); -// if (ctx.login(new DeviceTokenCredentials(deviceToken))) { -// GetAttributesRequest request; -// if (StringUtils.isEmpty(clientKeys) && StringUtils.isEmpty(sharedKeys)) { -// request = new BasicGetAttributesRequest(0); -// } else { -// Set clientKeySet = !StringUtils.isEmpty(clientKeys) ? new HashSet<>(Arrays.asList(clientKeys.split(","))) : null; -// Set sharedKeySet = !StringUtils.isEmpty(sharedKeys) ? new HashSet<>(Arrays.asList(sharedKeys.split(","))) : null; -// request = new BasicGetAttributesRequest(0, clientKeySet, sharedKeySet); -// } -// process(ctx, request); -// } else { -// responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED)); -// } - + transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(), + new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> { + GetAttributeRequestMsg.Builder request = GetAttributeRequestMsg.newBuilder().setRequestId(0); + List clientKeySet = !StringUtils.isEmpty(clientKeys) ? Arrays.asList(clientKeys.split(",")) : null; + List sharedKeySet = !StringUtils.isEmpty(sharedKeys) ? Arrays.asList(sharedKeys.split(",")) : null; + if (clientKeySet != null) { + request.addAllClientAttributeNames(clientKeySet); + } + if (sharedKeySet != null) { + request.addAllSharedAttributeNames(sharedKeySet); + } + TransportService transportService = transportContext.getTransportService(); + transportService.registerSession(sessionInfo, TransportProtos.SessionType.SYNC, new HttpSessionListener(transportContext, responseWriter)); + transportService.process(sessionInfo, request.build(), new SessionCloseOnErrorCallback(transportService, sessionInfo)); + })); return responseWriter; } @@ -199,7 +203,7 @@ public class DeviceApiController { // } private HttpSessionCtx getHttpSessionCtx(DeferredResult responseWriter) { - return getHttpSessionCtx(responseWriter, defaultTimeout); + return getHttpSessionCtx(responseWriter, transportContext.getDefaultTimeout()); } private HttpSessionCtx getHttpSessionCtx(DeferredResult responseWriter, long timeout) { @@ -213,7 +217,7 @@ public class DeviceApiController { // } private boolean quotaExceeded(HttpServletRequest request, DeferredResult responseWriter) { - if (quotaService.isQuotaExceeded(request.getRemoteAddr())) { + if (transportContext.getQuotaService().isQuotaExceeded(request.getRemoteAddr())) { log.warn("REST Quota exceeded for [{}] . Disconnect", request.getRemoteAddr()); responseWriter.setResult(new ResponseEntity<>(HttpStatus.BANDWIDTH_LIMIT_EXCEEDED)); return true; @@ -221,4 +225,96 @@ public class DeviceApiController { return false; } + private static class DeviceAuthCallback implements TransportServiceCallback { + private final TransportContext transportContext; + private final DeferredResult responseWriter; + private final Consumer onSuccess; + + DeviceAuthCallback(TransportContext transportContext, DeferredResult responseWriter, Consumer onSuccess) { + this.transportContext = transportContext; + this.responseWriter = responseWriter; + this.onSuccess = onSuccess; + } + + @Override + public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) { + if (msg.hasDeviceInfo()) { + UUID sessionId = UUID.randomUUID(); + DeviceInfoProto deviceInfoProto = msg.getDeviceInfo(); + SessionInfoProto sessionInfo = SessionInfoProto.newBuilder() + .setNodeId(transportContext.getNodeId()) + .setTenantIdMSB(deviceInfoProto.getTenantIdMSB()) + .setTenantIdLSB(deviceInfoProto.getTenantIdLSB()) + .setDeviceIdMSB(deviceInfoProto.getDeviceIdMSB()) + .setDeviceIdLSB(deviceInfoProto.getDeviceIdLSB()) + .setSessionIdMSB(sessionId.getMostSignificantBits()) + .setSessionIdLSB(sessionId.getLeastSignificantBits()) + .build(); + onSuccess.accept(sessionInfo); + } else { + responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED)); + } + } + + @Override + public void onError(Throwable e) { + log.warn("Failed to process request", e); + responseWriter.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR)); + } + } + + private static class SessionCloseOnErrorCallback implements TransportServiceCallback { + private final TransportService transportService; + private final SessionInfoProto sessionInfo; + + SessionCloseOnErrorCallback(TransportService transportService, SessionInfoProto sessionInfo) { + this.transportService = transportService; + this.sessionInfo = sessionInfo; + } + + @Override + public void onSuccess(Void msg) { + + } + + @Override + public void onError(Throwable e) { + transportService.deregisterSession(sessionInfo); + } + } + + private static class HttpSessionListener implements SessionMsgListener { + + private final TransportContext transportContext; + private final DeferredResult responseWriter; + + HttpSessionListener(TransportContext transportContext, DeferredResult responseWriter) { + this.transportContext = transportContext; + this.responseWriter = responseWriter; + } + + @Override + public void onGetAttributesResponse(GetAttributeResponseMsg msg) { + responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK)); + } + + @Override + public void onAttributeUpdate(AttributeUpdateNotificationMsg msg) { + responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK)); + } + + @Override + public void onRemoteSessionCloseCommand(SessionCloseNotificationProto sessionCloseNotification) { + } + + @Override + public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest) { + + } + + @Override + public void onToServerRpcResponse(ToServerRpcResponseMsg toServerResponse) { + + } + } } diff --git a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/HttpTransportContext.java b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/HttpTransportContext.java new file mode 100644 index 0000000000..87db06e66b --- /dev/null +++ b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/HttpTransportContext.java @@ -0,0 +1,37 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.http; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.transport.TransportContext; + +/** + * Created by ashvayka on 04.10.18. + */ +@Slf4j +@ConditionalOnProperty(prefix = "transport.mqtt", value = "enabled", havingValue = "true", matchIfMissing = true) +@Component +public class HttpTransportContext extends TransportContext { + + @Getter + @Value("${http.request_timeout}") + private long defaultTimeout; + +} diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java similarity index 100% rename from transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java rename to common/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java index 9ae42e8ca3..16f8981d57 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java @@ -24,6 +24,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; +import org.thingsboard.server.common.transport.TransportContext; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService; import org.thingsboard.server.kafka.TbNodeIdProvider; @@ -40,48 +41,21 @@ import java.util.concurrent.Executors; @Slf4j @ConditionalOnProperty(prefix = "transport.mqtt", value = "enabled", havingValue = "true", matchIfMissing = true) @Component -@Data -public class MqttTransportContext { - - private final ObjectMapper mapper = new ObjectMapper(); - - @Autowired - private TransportService transportService; +public class MqttTransportContext extends TransportContext { + @Getter @Autowired(required = false) private MqttSslHandlerProvider sslHandlerProvider; - @Autowired(required = false) - private HostRequestsQuotaService quotaService; - + @Getter @Autowired private MqttTransportAdaptor adaptor; - @Autowired - private TbNodeIdProvider nodeIdProvider; - + @Getter @Value("${transport.mqtt.netty.max_payload_size}") private Integer maxPayloadSize; - + @Getter private SslHandler sslHandler; - @Getter - private ExecutorService executor; - - @PostConstruct - public void init() { - executor = Executors.newCachedThreadPool(); - } - - @PreDestroy - public void stop() { - if (executor != null) { - executor.shutdownNow(); - } - } - - public String getNodeId() { - return nodeIdProvider.getNodeId(); - } } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 6a5597bb11..30cc9c75a2 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -506,7 +506,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB()) .build(); transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.OPEN), null); - transportService.registerSession(sessionInfo, this); + transportService.registerSession(sessionInfo, TransportProtos.SessionType.ASYNC, this); checkGatewaySession(); ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED)); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java index f233695fb1..43f997830f 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java @@ -121,7 +121,7 @@ public class GatewaySessionHandler { transportService.process(deviceSessionInfo, MqttTransportHandler.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null); transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null); - transportService.registerSession(deviceSessionInfo, deviceSessionCtx); + transportService.registerSession(deviceSessionInfo, TransportProtos.SessionType.ASYNC, deviceSessionCtx); } future.set(devices.get(deviceName)); } diff --git a/common/transport/pom.xml b/common/transport/pom.xml index 69f64e6307..c9b47e876f 100644 --- a/common/transport/pom.xml +++ b/common/transport/pom.xml @@ -37,7 +37,7 @@ transport-api mqtt - + http diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java new file mode 100644 index 0000000000..f846cf0c38 --- /dev/null +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java @@ -0,0 +1,53 @@ +package org.thingsboard.server.common.transport; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.Data; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService; +import org.thingsboard.server.kafka.TbNodeIdProvider; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Created by ashvayka on 15.10.18. + */ +@Slf4j +@Data +public class TransportContext { + + protected final ObjectMapper mapper = new ObjectMapper(); + + @Autowired + private TransportService transportService; + + @Autowired + private TbNodeIdProvider nodeIdProvider; + + @Autowired(required = false) + private HostRequestsQuotaService quotaService; + + @Getter + private ExecutorService executor; + + @PostConstruct + public void init() { + executor = Executors.newCachedThreadPool(); + } + + @PreDestroy + public void stop() { + if (executor != null) { + executor.shutdownNow(); + } + } + + public String getNodeId() { + return nodeIdProvider.getNodeId(); + } + +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java index c9a04c4daf..9bbc40c1b6 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java @@ -59,7 +59,7 @@ public interface TransportService { void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback callback); - void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener); + void registerSession(SessionInfoProto sessionInfo, TransportProtos.SessionType sessionType, SessionMsgListener listener); void deregisterSession(SessionInfoProto sessionInfo); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java index ab52d41366..eb6dbc5cb2 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -85,7 +85,7 @@ public class RemoteTransportService implements TransportService { @Value("${kafka.transport_api.response_auto_commit_interval}") private int autoCommitInterval; - private ConcurrentMap sessions = new ConcurrentHashMap<>(); + private ConcurrentMap sessions = new ConcurrentHashMap<>(); @Autowired private TbKafkaSettings kafkaSettings; @@ -159,8 +159,9 @@ public class RemoteTransportService implements TransportService { if (toTransportMsg.hasToDeviceSessionMsg()) { DeviceActorToTransportMsg toSessionMsg = toTransportMsg.getToDeviceSessionMsg(); UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB()); - SessionMsgListener listener = sessions.get(sessionId); - if (listener != null) { + SessionMetaData md = sessions.get(sessionId); + if (md != null) { + SessionMsgListener listener = md.getListener(); transportCallbackExecutor.submit(() -> { if (toSessionMsg.hasGetAttributesResponse()) { listener.onGetAttributesResponse(toSessionMsg.getGetAttributesResponse()); @@ -178,6 +179,9 @@ public class RemoteTransportService implements TransportService { listener.onToServerRpcResponse(toSessionMsg.getToServerResponse()); } }); + if (md.getSessionType() == SessionType.SYNC) { + deregisterSession(md.getSessionInfo()); + } } else { //TODO: should we notify the device actor about missed session? log.debug("[{}] Missing session.", sessionId); @@ -311,8 +315,8 @@ public class RemoteTransportService implements TransportService { } @Override - public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) { - sessions.putIfAbsent(toId(sessionInfo), listener); + public void registerSession(SessionInfoProto sessionInfo, SessionType sessionType, SessionMsgListener listener) { + sessions.putIfAbsent(toId(sessionInfo), new SessionMetaData(sessionInfo, sessionType, listener)); //TODO: monitor sessions periodically: PING REQ/RESP, etc. } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java new file mode 100644 index 0000000000..dd52f77ca6 --- /dev/null +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java @@ -0,0 +1,17 @@ +package org.thingsboard.server.common.transport.service; + +import lombok.Data; +import org.thingsboard.server.common.transport.SessionMsgListener; +import org.thingsboard.server.gen.transport.TransportProtos; + +/** + * Created by ashvayka on 15.10.18. + */ +@Data +public class SessionMetaData { + + private final TransportProtos.SessionInfoProto sessionInfo; + private final TransportProtos.SessionType sessionType; + private final SessionMsgListener listener; + +} diff --git a/pom.xml b/pom.xml index e999a4c3f8..f4c2957b08 100755 --- a/pom.xml +++ b/pom.xml @@ -365,7 +365,7 @@ ${project.version} - org.thingsboard.transport + org.thingsboard.common.transport http ${project.version} diff --git a/transport/pom.xml b/transport/pom.xml index 9a7c2e4690..96a5a08304 100644 --- a/transport/pom.xml +++ b/transport/pom.xml @@ -34,7 +34,6 @@ - http mqtt-transport