From c6800dbd032b3666ae6eb2ab5c061dc6e5a2e39c Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Fri, 12 Oct 2018 13:35:26 +0300 Subject: [PATCH] BugFix for Gateway API and concurrent device creation --- application/pom.xml | 16 +- .../device/DeviceActorMessageProcessor.java | 4 + .../transport/RemoteTransportApiService.java | 28 +- .../transport/http/DeviceApiController.java | 32 +-- .../http/session/HttpSessionCtx.java | 246 +++++++++--------- .../transport/http/session/HttpSessionId.java | 35 --- .../transport/mqtt/MqttTransportHandler.java | 24 +- .../mqtt/session/GatewayDeviceSessionCtx.java | 4 +- ...ionCtx.java => GatewaySessionHandler.java} | 15 +- transport/pom.xml | 2 +- 10 files changed, 197 insertions(+), 209 deletions(-) delete mode 100644 transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionId.java rename transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/{GatewaySessionCtx.java => GatewaySessionHandler.java} (96%) diff --git a/application/pom.xml b/application/pom.xml index bff96ec6fd..dd1bade336 100644 --- a/application/pom.xml +++ b/application/pom.xml @@ -60,14 +60,14 @@ org.thingsboard.common transport - - org.thingsboard.transport - http - - - org.thingsboard.transport - coap - + + + + + + + + org.thingsboard.transport mqtt-common diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 64480863f5..05902a6146 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -455,6 +455,10 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) { UUID sessionId = getSessionId(sessionInfo); if (msg.getEvent() == SessionEvent.OPEN) { + if(sessions.containsKey(sessionId)){ + logger.debug("[{}] Received duplicate session open event [{}]", deviceId, sessionId); + return; + } logger.debug("[{}] Processing new session [{}]", deviceId, sessionId); if (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) { UUID sessionIdToRemove = sessions.keySet().stream().findFirst().orElse(null); diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java index 945317f768..66a8226a98 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java @@ -51,6 +51,7 @@ import javax.annotation.PostConstruct; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.locks.ReentrantLock; /** * Created by ashvayka on 05.10.18. @@ -97,6 +98,8 @@ public class RemoteTransportApiService implements TransportApiService { private TbKafkaResponseTemplate transportApiTemplate; + private ReentrantLock deviceCreationLock = new ReentrantLock(); + @PostConstruct public void init() { this.transportCallbackExecutor = Executors.newCachedThreadPool(); @@ -156,23 +159,26 @@ public class RemoteTransportApiService implements TransportApiService { DeviceId gatewayId = new DeviceId(new UUID(requestMsg.getGatewayIdMSB(), requestMsg.getGatewayIdLSB())); ListenableFuture gatewayFuture = deviceService.findDeviceByIdAsync(gatewayId); return Futures.transform(gatewayFuture, gateway -> { - Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), gateway.getName()); - if (device == null) { - device = new Device(); - device.setTenantId(gateway.getTenantId()); - device.setName(requestMsg.getDeviceName()); - device.setType(requestMsg.getDeviceType()); - device.setCustomerId(gateway.getCustomerId()); - device = deviceService.saveDevice(device); - relationService.saveRelationAsync(new EntityRelation(gateway.getId(), device.getId(), "Created")); - deviceStateService.onDeviceAdded(device); - } + deviceCreationLock.lock(); try { + Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), requestMsg.getDeviceName()); + if (device == null) { + device = new Device(); + device.setTenantId(gateway.getTenantId()); + device.setName(requestMsg.getDeviceName()); + device.setType(requestMsg.getDeviceType()); + device.setCustomerId(gateway.getCustomerId()); + device = deviceService.saveDevice(device); + relationService.saveRelationAsync(new EntityRelation(gateway.getId(), device.getId(), "Created")); + deviceStateService.onDeviceAdded(device); + } return TransportApiResponseMsg.newBuilder() .setGetOrCreateDeviceResponseMsg(GetOrCreateDeviceFromGatewayResponseMsg.newBuilder().setDeviceInfo(getDeviceInfoProto(device)).build()).build(); } catch (JsonProcessingException e) { log.warn("[{}] Failed to lookup device by gateway id and name", gatewayId, requestMsg.getDeviceName(), e); throw new RuntimeException(e); + } finally { + deviceCreationLock.unlock(); } }, transportCallbackExecutor); } diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java index bc7eeff6f7..3c7c521b55 100644 --- a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java +++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java @@ -122,7 +122,8 @@ public class DeviceApiController { @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout, HttpServletRequest request) { - return subscribe(deviceToken, timeout, new RpcSubscribeMsg(), request); +// return subscribe(deviceToken, timeout, new RpcSubscribeMsg(), request); + return null; } @RequestMapping(value = "/{deviceToken}/rpc/{requestId}", method = RequestMethod.POST) @@ -174,15 +175,15 @@ public class DeviceApiController { public DeferredResult subscribeToAttributes(@PathVariable("deviceToken") String deviceToken, @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout, HttpServletRequest httpRequest) { - - return subscribe(deviceToken, timeout, new AttributesSubscribeMsg(), httpRequest); + return null; +// return subscribe(deviceToken, timeout, new AttributesSubscribeMsg(), httpRequest); } - private DeferredResult subscribe(String deviceToken, long timeout, FromDeviceMsg msg, HttpServletRequest httpRequest) { - DeferredResult responseWriter = new DeferredResult(); - if (quotaExceeded(httpRequest, responseWriter)) { - return responseWriter; - } +// private DeferredResult subscribe(String deviceToken, long timeout, FromDeviceMsg msg, HttpServletRequest httpRequest) { +// DeferredResult responseWriter = new DeferredResult(); +// if (quotaExceeded(httpRequest, responseWriter)) { +// return responseWriter; +// } // HttpSessionCtx ctx = getHttpSessionCtx(responseWriter, timeout); // if (ctx.login(new DeviceTokenCredentials(deviceToken))) { // try { @@ -193,21 +194,22 @@ public class DeviceApiController { // } else { // responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED)); // } - return responseWriter; - } +// return responseWriter; +// } private HttpSessionCtx getHttpSessionCtx(DeferredResult responseWriter) { return getHttpSessionCtx(responseWriter, defaultTimeout); } private HttpSessionCtx getHttpSessionCtx(DeferredResult responseWriter, long timeout) { - return new HttpSessionCtx(processor, authService, responseWriter, timeout != 0 ? timeout : defaultTimeout); + return null; +// return new HttpSessionCtx(processor, authService, responseWriter, timeout != 0 ? timeout : defaultTimeout); } - private void process(HttpSessionCtx ctx, FromDeviceMsg request) { - AdaptorToSessionActorMsg msg = new BasicAdaptorToSessionActorMsg(ctx, request); -// processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg)); - } +// private void process(HttpSessionCtx ctx, FromDeviceMsg request) { +// AdaptorToSessionActorMsg msg = new BasicAdaptorToSessionActorMsg(ctx, request); +//// processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg)); +// } private boolean quotaExceeded(HttpServletRequest request, DeferredResult responseWriter) { if (quotaService.isQuotaExceeded(request.getRemoteAddr())) { diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java index 62999cc64b..0e087d4966 100644 --- a/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java +++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java @@ -29,6 +29,7 @@ import org.thingsboard.server.common.transport.auth.DeviceAuthService; import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; import java.util.Optional; +import java.util.UUID; import java.util.function.Consumer; /** @@ -37,127 +38,136 @@ import java.util.function.Consumer; @Slf4j public class HttpSessionCtx extends DeviceAwareSessionContext { - private final SessionId sessionId; - private final long timeout; - private final DeferredResult responseWriter; - - public HttpSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, DeferredResult responseWriter, long timeout) { - super(); - this.sessionId = new HttpSessionId(); - this.responseWriter = responseWriter; - this.timeout = timeout; + public HttpSessionCtx(UUID sessionId) { + super(sessionId); } @Override - public SessionType getSessionType() { - return SessionType.SYNC; + public int nextMsgId() { + return 0; } - @Override - public void onMsg(SessionActorToAdaptorMsg source) throws SessionException { - ToDeviceMsg msg = source.getMsg(); - switch (msg.getSessionMsgType()) { - case GET_ATTRIBUTES_RESPONSE: - reply((GetAttributesResponse) msg); - return; - case STATUS_CODE_RESPONSE: - reply((StatusCodeResponse) msg); - return; - case ATTRIBUTES_UPDATE_NOTIFICATION: - reply((AttributesUpdateNotification) msg); - return; - case TO_DEVICE_RPC_REQUEST: - reply((ToDeviceRpcRequestMsg) msg); - return; - case TO_SERVER_RPC_RESPONSE: - reply((ToServerRpcResponseMsg) msg); - return; - case RULE_ENGINE_ERROR: - reply((RuleEngineErrorMsg) msg); - return; - default: - break; - } - } - - private void reply(RuleEngineErrorMsg msg) { - HttpStatus status = HttpStatus.INTERNAL_SERVER_ERROR; - switch (msg.getError()) { - case QUEUE_PUT_TIMEOUT: - status = HttpStatus.REQUEST_TIMEOUT; - break; - default: - if (msg.getInSessionMsgType() == SessionMsgType.TO_SERVER_RPC_REQUEST) { - status = HttpStatus.BAD_REQUEST; - } - break; - } - responseWriter.setResult(new ResponseEntity<>(JsonConverter.toErrorJson(msg.getErrorMsg()).toString(), status)); - } - - private void reply(ResponseMsg msg, Consumer f) { - Optional msgError = msg.getError(); - if (!msgError.isPresent()) { - Optional msgData = msg.getData(); - if (msgData.isPresent()) { - f.accept(msgData.get()); - } - } else { - Exception e = msgError.get(); - responseWriter.setResult(new ResponseEntity<>(e.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR)); - } - } - - private void reply(ToDeviceRpcRequestMsg msg) { - responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK)); - } - - private void reply(ToServerRpcResponseMsg msg) { -// responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK)); - } - - private void reply(AttributesUpdateNotification msg) { - responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg.getData(), false).toString(), HttpStatus.OK)); - } - - private void reply(GetAttributesResponse msg) { - reply(msg, payload -> { - if (payload.getClientAttributes().isEmpty() && payload.getSharedAttributes().isEmpty()) { - responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_FOUND)); - } else { - JsonObject result = JsonConverter.toJson(payload, false); - responseWriter.setResult(new ResponseEntity<>(result.toString(), HttpStatus.OK)); - } - }); - } - - private void reply(StatusCodeResponse msg) { - reply(msg, payload -> { - if (payload == 0) { - responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK)); - } else { - responseWriter.setResult(new ResponseEntity<>(HttpStatus.valueOf(payload))); - } - }); - } - - @Override - public void onMsg(SessionCtrlMsg msg) throws SessionException { - //Do nothing - } - - @Override - public boolean isClosed() { - return false; - } - - @Override - public long getTimeout() { - return timeout; - } - - @Override - public SessionId getSessionId() { - return sessionId; - } + // private final SessionId sessionId; +// private final long timeout; +// private final DeferredResult responseWriter; +// +// public HttpSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, DeferredResult responseWriter, long timeout) { +// super(); +// this.sessionId = new HttpSessionId(); +// this.responseWriter = responseWriter; +// this.timeout = timeout; +// } +// +// @Override +// public SessionType getSessionType() { +// return SessionType.SYNC; +// } +// +// @Override +// public void onMsg(SessionActorToAdaptorMsg source) throws SessionException { +// ToDeviceMsg msg = source.getMsg(); +// switch (msg.getSessionMsgType()) { +// case GET_ATTRIBUTES_RESPONSE: +// reply((GetAttributesResponse) msg); +// return; +// case STATUS_CODE_RESPONSE: +// reply((StatusCodeResponse) msg); +// return; +// case ATTRIBUTES_UPDATE_NOTIFICATION: +// reply((AttributesUpdateNotification) msg); +// return; +// case TO_DEVICE_RPC_REQUEST: +// reply((ToDeviceRpcRequestMsg) msg); +// return; +// case TO_SERVER_RPC_RESPONSE: +// reply((ToServerRpcResponseMsg) msg); +// return; +// case RULE_ENGINE_ERROR: +// reply((RuleEngineErrorMsg) msg); +// return; +// default: +// break; +// } +// } +// +// private void reply(RuleEngineErrorMsg msg) { +// HttpStatus status = HttpStatus.INTERNAL_SERVER_ERROR; +// switch (msg.getError()) { +// case QUEUE_PUT_TIMEOUT: +// status = HttpStatus.REQUEST_TIMEOUT; +// break; +// default: +// if (msg.getInSessionMsgType() == SessionMsgType.TO_SERVER_RPC_REQUEST) { +// status = HttpStatus.BAD_REQUEST; +// } +// break; +// } +// responseWriter.setResult(new ResponseEntity<>(JsonConverter.toErrorJson(msg.getErrorMsg()).toString(), status)); +// } +// +// private void reply(ResponseMsg msg, Consumer f) { +// Optional msgError = msg.getError(); +// if (!msgError.isPresent()) { +// Optional msgData = msg.getData(); +// if (msgData.isPresent()) { +// f.accept(msgData.get()); +// } +// } else { +// Exception e = msgError.get(); +// responseWriter.setResult(new ResponseEntity<>(e.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR)); +// } +// } +// +// private void reply(ToDeviceRpcRequestMsg msg) { +// responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK)); +// } +// +// private void reply(ToServerRpcResponseMsg msg) { +//// responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK)); +// } +// +// private void reply(AttributesUpdateNotification msg) { +// responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg.getData(), false).toString(), HttpStatus.OK)); +// } +// +// private void reply(GetAttributesResponse msg) { +// reply(msg, payload -> { +// if (payload.getClientAttributes().isEmpty() && payload.getSharedAttributes().isEmpty()) { +// responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_FOUND)); +// } else { +// JsonObject result = JsonConverter.toJson(payload, false); +// responseWriter.setResult(new ResponseEntity<>(result.toString(), HttpStatus.OK)); +// } +// }); +// } +// +// private void reply(StatusCodeResponse msg) { +// reply(msg, payload -> { +// if (payload == 0) { +// responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK)); +// } else { +// responseWriter.setResult(new ResponseEntity<>(HttpStatus.valueOf(payload))); +// } +// }); +// } +// +// @Override +// public void onMsg(SessionCtrlMsg msg) throws SessionException { +// //Do nothing +// } +// +// @Override +// public boolean isClosed() { +// return false; +// } +// +// @Override +// public long getTimeout() { +// return timeout; +// } +// +// @Override +// public SessionId getSessionId() { +// return sessionId; +// } } diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionId.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionId.java deleted file mode 100644 index a5426ca051..0000000000 --- a/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionId.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Copyright © 2016-2018 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.transport.http.session; - -import java.util.UUID; - -/** - * @author Andrew Shvayka - */ -public class HttpSessionId implements SessionId { - - private final UUID id; - - public HttpSessionId() { - this.id = UUID.randomUUID(); - } - - @Override - public String toUidStr() { - return id.toString(); - } -} diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index b570c74ec4..b1b176d032 100644 --- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -54,7 +54,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenR import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; -import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx; +import org.thingsboard.server.transport.mqtt.session.GatewaySessionHandler; import org.thingsboard.server.transport.mqtt.util.SslUtil; import javax.net.ssl.SSLPeerUnverifiedException; @@ -98,7 +98,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private volatile SessionInfoProto sessionInfo; private volatile InetSocketAddress address; private volatile DeviceSessionCtx deviceSessionCtx; - private volatile GatewaySessionCtx gatewaySessionCtx; + private volatile GatewaySessionHandler gatewaySessionHandler; MqttTransportHandler(MqttTransportContext context) { this.sessionId = UUID.randomUUID(); @@ -175,7 +175,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId); if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) { - if (gatewaySessionCtx != null) { + if (gatewaySessionHandler != null) { handleGatewayPublishMsg(topicName, msgId, mqttMsg); } } else { @@ -187,22 +187,22 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement try { switch (topicName) { case MqttTopics.GATEWAY_TELEMETRY_TOPIC: - gatewaySessionCtx.onDeviceTelemetry(mqttMsg); + gatewaySessionHandler.onDeviceTelemetry(mqttMsg); break; case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC: - gatewaySessionCtx.onDeviceAttributes(mqttMsg); + gatewaySessionHandler.onDeviceAttributes(mqttMsg); break; case MqttTopics.GATEWAY_ATTRIBUTES_REQUEST_TOPIC: - gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg); + gatewaySessionHandler.onDeviceAttributesRequest(mqttMsg); break; case MqttTopics.GATEWAY_RPC_TOPIC: - gatewaySessionCtx.onDeviceRpcResponse(mqttMsg); + gatewaySessionHandler.onDeviceRpcResponse(mqttMsg); break; case MqttTopics.GATEWAY_CONNECT_TOPIC: - gatewaySessionCtx.onDeviceConnect(mqttMsg); + gatewaySessionHandler.onDeviceConnect(mqttMsg); break; case MqttTopics.GATEWAY_DISCONNECT_TOPIC: - gatewaySessionCtx.onDeviceDisconnect(mqttMsg); + gatewaySessionHandler.onDeviceDisconnect(mqttMsg); break; } } catch (RuntimeException | AdaptorException e) { @@ -405,8 +405,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement if (deviceSessionCtx.isConnected()) { transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.CLOSED), null); transportService.deregisterSession(sessionInfo); - if (gatewaySessionCtx != null) { - gatewaySessionCtx.onGatewayDisconnect(); + if (gatewaySessionHandler != null) { + gatewaySessionHandler.onGatewayDisconnect(); } } } @@ -467,7 +467,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement if (infoNode != null) { JsonNode gatewayNode = infoNode.get("gateway"); if (gatewayNode != null && gatewayNode.asBoolean()) { - gatewaySessionCtx = new GatewaySessionCtx(context, deviceSessionCtx, sessionId); + gatewaySessionHandler = new GatewaySessionHandler(context, deviceSessionCtx, sessionId); } } } catch (IOException e) { diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java index 8560b35acf..1e2c2ca71d 100644 --- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java +++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java @@ -30,10 +30,10 @@ import java.util.concurrent.ConcurrentMap; @Slf4j public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener { - private final GatewaySessionCtx parent; + private final GatewaySessionHandler parent; private final SessionInfoProto sessionInfo; - public GatewayDeviceSessionCtx(GatewaySessionCtx parent, DeviceInfoProto deviceInfo, ConcurrentMap mqttQoSMap) { + public GatewayDeviceSessionCtx(GatewaySessionHandler parent, DeviceInfoProto deviceInfo, ConcurrentMap mqttQoSMap) { super(UUID.randomUUID(), mqttQoSMap); this.parent = parent; this.sessionInfo = SessionInfoProto.newBuilder() diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java similarity index 96% rename from transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java rename to transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java index fd5eeb7876..b1504b1246 100644 --- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java +++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java @@ -52,13 +52,12 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; /** * Created by ashvayka on 19.01.17. */ @Slf4j -public class GatewaySessionCtx { +public class GatewaySessionHandler { private static final String DEFAULT_DEVICE_TYPE = "default"; private static final String CAN_T_PARSE_VALUE = "Can't parse value: "; @@ -73,7 +72,7 @@ public class GatewaySessionCtx { private final ChannelHandlerContext channel; private final DeviceSessionCtx deviceSessionCtx; - public GatewaySessionCtx(MqttTransportContext context, DeviceSessionCtx deviceSessionCtx, UUID sessionId) { + public GatewaySessionHandler(MqttTransportContext context, DeviceSessionCtx deviceSessionCtx, UUID sessionId) { this.context = context; this.transportService = context.getTransportService(); this.deviceSessionCtx = deviceSessionCtx; @@ -114,10 +113,12 @@ public class GatewaySessionCtx { new TransportServiceCallback() { @Override public void onSuccess(GetOrCreateDeviceFromGatewayResponseMsg msg) { - GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionCtx.this, msg.getDeviceInfo(), mqttQoSMap); + GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), mqttQoSMap); if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) { SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); transportService.process(deviceSessionInfo, MqttTransportHandler.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); + transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null); + transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null); transportService.registerSession(deviceSessionInfo, deviceSessionCtx); } future.set(devices.get(deviceName)); @@ -203,7 +204,7 @@ public class GatewaySessionCtx { @Override public void onFailure(Throwable t) { - log.debug("[{}] Failed to process device teleemtry command: {}", sessionId, deviceName, t); + log.debug("[{}] Failed to process device attributes command: {}", sessionId, deviceName, t); } }, context.getExecutor()); } @@ -264,8 +265,8 @@ public class GatewaySessionCtx { } else { result.addAllSharedAttributeNames(keys); } - int msgId = msg.variableHeader().packetId(); TransportProtos.GetAttributeRequestMsg requestMsg = result.build(); + int msgId = msg.variableHeader().packetId(); Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback() { @Override @@ -275,7 +276,7 @@ public class GatewaySessionCtx { @Override public void onFailure(Throwable t) { - log.debug("[{}] Failed to process device teleemtry command: {}", sessionId, deviceName, t); + log.debug("[{}] Failed to process device attributes request command: {}", sessionId, deviceName, t); } }, context.getExecutor()); ack(msg); diff --git a/transport/pom.xml b/transport/pom.xml index 671e3c1d63..79159885d2 100644 --- a/transport/pom.xml +++ b/transport/pom.xml @@ -35,7 +35,7 @@ - + http mqtt-common mqtt-transport