diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 0822766b4b..7abcca2cab 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -216,12 +216,16 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso void process(ActorContext context, TransportToDeviceActorMsgWrapper wrapper) { TransportToDeviceActorMsg msg = wrapper.getMsg(); -// processSubscriptionCommands(context, msg); // processRpcResponses(context, msg); if (msg.hasSessionEvent()) { processSessionStateMsgs(msg.getSessionInfo(), msg.getSessionEvent()); } - + if (msg.hasSubscribeToAttributes()) { + processSubscriptionCommands(context, msg.getSessionInfo(), msg.getSubscribeToAttributes()); + } + if (msg.hasSubscribeToRPC()) { + processSubscriptionCommands(context, msg.getSessionInfo(), msg.getSubscribeToRPC()); + } if (msg.hasPostAttributes()) { handlePostAttributesRequest(context, msg.getSessionInfo(), msg.getPostAttributes()); reportActivity(); @@ -236,9 +240,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso // SessionMsgType sessionMsgType = msg.getPayload().getMsgType(); // if (sessionMsgType.requiresRulesProcessing()) { // switch (sessionMsgType) { -// case GET_ATTRIBUTES_REQUEST: -// handleGetAttributesRequest(msg); -// break; // case TO_SERVER_RPC_REQUEST: // handleClientSideRPCRequest(context, msg); // reportActivity(); @@ -262,7 +263,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso private void handleGetAttributesRequest(ActorContext context, SessionInfoProto sessionInfo, GetAttributeRequestMsg request) { ListenableFuture> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, toOptionalSet(request.getClientAttributeNamesList())); ListenableFuture> sharedAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.SHARED_SCOPE, toOptionalSet(request.getSharedAttributeNamesList())); - UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); int requestId = request.getRequestId(); Futures.addCallback(Futures.allAsList(Arrays.asList(clientAttributesFuture, sharedAttributesFuture)), new FutureCallback>>() { @Override @@ -272,7 +272,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso .addAllClientAttributeList(toTsKvProtos(result.get(0))) .addAllSharedAttributeList(toTsKvProtos(result.get(1))) .build(); - sendToTransport(responseMsg, sessionId, sessionInfo); + sendToTransport(responseMsg, sessionInfo); } @Override @@ -280,7 +280,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso GetAttributeResponseMsg responseMsg = GetAttributeResponseMsg.newBuilder() .setError(t.getMessage()) .build(); - sendToTransport(responseMsg, sessionId, sessionInfo); + sendToTransport(responseMsg, sessionInfo); } }); } @@ -353,28 +353,37 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso void processAttributesUpdate(ActorContext context, DeviceAttributesEventNotificationMsg msg) { if (attributeSubscriptions.size() > 0) { - ToDeviceMsg notification = null; + boolean hasNotificationData = false; + AttributeUpdateNotificationMsg.Builder notification = AttributeUpdateNotificationMsg.newBuilder(); if (msg.isDeleted()) { - List sharedKeys = msg.getDeletedKeys().stream() + List sharedKeys = msg.getDeletedKeys().stream() .filter(key -> DataConstants.SHARED_SCOPE.equals(key.getScope())) + .map(AttributeKey::getAttributeKey) .collect(Collectors.toList()); - notification = new AttributesUpdateNotification(BasicAttributeKVMsg.fromDeleted(sharedKeys)); + if (!sharedKeys.isEmpty()) { + notification.addAllSharedDeleted(sharedKeys); + hasNotificationData = true; + } } else { if (DataConstants.SHARED_SCOPE.equals(msg.getScope())) { List attributes = new ArrayList<>(msg.getValues()); if (attributes.size() > 0) { - notification = new AttributesUpdateNotification(BasicAttributeKVMsg.fromShared(attributes)); + List sharedUpdated = msg.getValues().stream().map(this::toTsKvProto) + .collect(Collectors.toList()); + if (!sharedUpdated.isEmpty()) { + notification.addAllSharedUpdated(sharedUpdated); + hasNotificationData = true; + } } else { logger.debug("[{}] No public server side attributes changed!", deviceId); } } } - if (notification != null) { - ToDeviceMsg finalNotification = notification; -// attributeSubscriptions.entrySet().forEach(sub -> { -// ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(finalNotification, sub.getKey()); -// sendMsgToSessionActor(response, sub.getValue().getServer()); -// }); + if (hasNotificationData) { + AttributeUpdateNotificationMsg finalNotification = notification.build(); + attributeSubscriptions.entrySet().forEach(sub -> { + sendToTransport(finalNotification, sub.getKey(), sub.getValue()); + }); } } else { logger.debug("[{}] No registered attributes subscriptions to process!", deviceId); @@ -414,25 +423,35 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso // } } -// private void processSubscriptionCommands(ActorContext context, DeviceToDeviceActorMsg msg) { -// SessionId sessionId = msg.getSessionId(); -// SessionType sessionType = msg.getSessionType(); -// FromDeviceMsg inMsg = msg.getPayload(); -// if (inMsg.getMsgType() == SessionMsgType.SUBSCRIBE_ATTRIBUTES_REQUEST) { -// logger.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId); -// attributeSubscriptions.put(sessionId, new SessionInfo(sessionType, msg.getServerAddress())); -// } else if (inMsg.getMsgType() == SessionMsgType.UNSUBSCRIBE_ATTRIBUTES_REQUEST) { -// logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId); -// attributeSubscriptions.remove(sessionId); -// } else if (inMsg.getMsgType() == SessionMsgType.SUBSCRIBE_RPC_COMMANDS_REQUEST) { -// logger.debug("[{}] Registering rpc subscription for session [{}][{}]", deviceId, sessionId, sessionType); -// rpcSubscriptions.put(sessionId, new SessionInfo(sessionType, msg.getServerAddress())); -// sendPendingRequests(context, sessionId, sessionType, msg.getServerAddress()); -// } else if (inMsg.getMsgType() == SessionMsgType.UNSUBSCRIBE_RPC_COMMANDS_REQUEST) { -// logger.debug("[{}] Canceling rpc subscription for session [{}][{}]", deviceId, sessionId, sessionType); -// rpcSubscriptions.remove(sessionId); -// } -// } + private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) { + UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); + if (subscribeCmd.getUnsubscribe()) { + logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId); + attributeSubscriptions.remove(sessionId); + } else { + SessionInfo session = sessions.get(sessionId); + if (session == null) { + session = new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId()); + } + logger.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId); + attributeSubscriptions.put(sessionId, session); + } + } + + private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToRPCMsg subscribeCmd) { + UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); + if (subscribeCmd.getUnsubscribe()) { + logger.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId); + rpcSubscriptions.remove(sessionId); + } else { + SessionInfo session = sessions.get(sessionId); + if (session == null) { + session = new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId()); + } + logger.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId); + rpcSubscriptions.put(sessionId, session); + } + } private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) { UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); @@ -521,11 +540,19 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } } - private void sendToTransport(GetAttributeResponseMsg responseMsg, UUID sessionId, SessionInfoProto sessionInfo) { + private void sendToTransport(GetAttributeResponseMsg responseMsg, SessionInfoProto sessionInfo) { + DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder() + .setSessionIdMSB(sessionInfo.getSessionIdMSB()) + .setSessionIdLSB(sessionInfo.getSessionIdLSB()) + .setGetAttributesResponse(responseMsg).build(); + systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg); + } + + private void sendToTransport(AttributeUpdateNotificationMsg notificationMsg, UUID sessionId, SessionInfo sessionInfo) { DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder() .setSessionIdMSB(sessionId.getMostSignificantBits()) .setSessionIdLSB(sessionId.getLeastSignificantBits()) - .setGetAttributesResponse(responseMsg).build(); + .setAttributeUpdateNotification(notificationMsg).build(); systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg); } diff --git a/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java index 32cb60da5f..43ae592cee 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java @@ -25,5 +25,4 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionType; public class SessionInfo { private final SessionType type; private final String nodeId; - } diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java index 44fcd2a39d..17d4a1f42c 100644 --- a/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,6 +15,7 @@ */ package org.thingsboard.server.common.transport; +import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg; /** @@ -23,4 +24,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponse public interface SessionMsgListener { void onGetAttributesResponse(GetAttributeResponseMsg getAttributesResponse); + + void onAttributeUpdate(AttributeUpdateNotificationMsg attributeUpdateNotification); } diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java index c3817a9ffa..bf33b1d8a5 100644 --- a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java @@ -15,6 +15,8 @@ */ package org.thingsboard.server.common.transport; +import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg; +import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg; import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg; import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg; @@ -43,6 +45,10 @@ public interface TransportService { void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback callback); + void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback callback); + + void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback callback); + void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener); void deregisterSession(SessionInfoProto sessionInfo); diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java index 6bb8eff1be..872dfaa03e 100644 --- a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -39,6 +39,7 @@ import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg; import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg; import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg; import org.thingsboard.server.common.msg.kv.AttributesKVMsg; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.*; import java.util.ArrayList; @@ -153,20 +154,6 @@ public class JsonConverter { return result; } - private static void parseNumericProto(List result, Entry valueEntry, JsonPrimitive value) { - if (value.getAsString().contains(".")) { - result.add(new DoubleDataEntry(valueEntry.getKey(), value.getAsDouble())); - } else { - try { - long longValue = Long.parseLong(value.getAsString()); - result.add(new LongDataEntry(valueEntry.getKey(), longValue)); - } catch (NumberFormatException e) { - throw new JsonSyntaxException("Big integer values are not supported!"); - } - } - } - - private static TelemetryUploadRequest convertToTelemetry(JsonElement jsonObject, long systemTs, int requestId) throws JsonSyntaxException { BasicTelemetryUploadRequest request = new BasicTelemetryUploadRequest(requestId); if (jsonObject.isJsonObject()) { @@ -283,6 +270,19 @@ public class JsonConverter { return result; } + public static JsonElement toJson(AttributeUpdateNotificationMsg payload) { + JsonObject result = new JsonObject(); + if (payload.getSharedUpdatedCount() > 0) { + payload.getSharedUpdatedList().forEach(addToObjectFromProto(result)); + } + if (payload.getSharedDeletedCount() > 0) { + JsonArray attrObject = new JsonArray(); + payload.getSharedDeletedList().forEach(attrObject::add); + result.add("deleted", attrObject); + } + return result; + } + public static JsonObject toJson(AttributesKVMsg payload, boolean asMap) { JsonObject result = new JsonObject(); if (asMap) { @@ -377,4 +377,5 @@ public class JsonConverter { error.addProperty("error", errorMsg); return error; } + } diff --git a/common/transport/src/main/proto/transport.proto b/common/transport/src/main/proto/transport.proto index d32bac99a7..66c0d8183a 100644 --- a/common/transport/src/main/proto/transport.proto +++ b/common/transport/src/main/proto/transport.proto @@ -108,6 +108,11 @@ message GetAttributeResponseMsg { string error = 5; } +message AttributeUpdateNotificationMsg { + repeated TsKvProto sharedUpdated = 1; + repeated string sharedDeleted = 2; +} + message ValidateDeviceTokenRequestMsg { string token = 1; } @@ -124,12 +129,22 @@ message SessionCloseNotificationProto { string message = 1; } +message SubscribeToAttributeUpdatesMsg { + bool unsubscribe = 1; +} + +message SubscribeToRPCMsg { + bool unsubscribe = 1; +} + message TransportToDeviceActorMsg { SessionInfoProto sessionInfo = 1; SessionEventMsg sessionEvent = 2; PostTelemetryMsg postTelemetry = 3; PostAttributeMsg postAttributes = 4; GetAttributeRequestMsg getAttributes = 5; + SubscribeToAttributeUpdatesMsg subscribeToAttributes = 6; + SubscribeToRPCMsg subscribeToRPC= 7; } message DeviceActorToTransportMsg { @@ -137,6 +152,7 @@ message DeviceActorToTransportMsg { int64 sessionIdLSB = 2; SessionCloseNotificationProto sessionCloseNotification = 3; GetAttributeResponseMsg getAttributesResponse = 4; + AttributeUpdateNotificationMsg attributeUpdateNotification = 5; } /** diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index c64ab5d559..b1ee037dc3 100644 --- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -71,6 +71,7 @@ import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEP import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD; import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED; import static io.netty.handler.codec.mqtt.MqttMessageType.CONNACK; +import static io.netty.handler.codec.mqtt.MqttMessageType.PINGRESP; import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK; import static io.netty.handler.codec.mqtt.MqttMessageType.SUBACK; import static io.netty.handler.codec.mqtt.MqttMessageType.UNSUBACK; @@ -148,16 +149,17 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement case UNSUBSCRIBE: processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg); break; -// case PINGREQ: -// if (checkConnected(ctx)) { -// ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); -// } -// break; -// case DISCONNECT: -// if (checkConnected(ctx)) { -// processDisconnect(ctx); -// } -// break; + case PINGREQ: + //TODO: should we push the notification to the rule engine? + if (checkConnected(ctx)) { + ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); + } + break; + case DISCONNECT: + if (checkConnected(ctx)) { + processDisconnect(ctx); + } + break; default: break; } @@ -289,25 +291,20 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement MqttQoS reqQoS = subscription.qualityOfService(); try { switch (topic) { -// case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: { -// AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); -// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); -// registerSubQoS(topic, grantedQoSList, reqQoS); -// break; -// } -// case DEVICE_RPC_REQUESTS_SUB_TOPIC: { -// AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg); -// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); -// registerSubQoS(topic, grantedQoSList, reqQoS); -// break; -// } -// case DEVICE_RPC_RESPONSE_SUB_TOPIC: -// case GATEWAY_ATTRIBUTES_TOPIC: -// case GATEWAY_RPC_TOPIC: -// registerSubQoS(topic, grantedQoSList, reqQoS); -// break; + case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: { + transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null); + registerSubQoS(topic, grantedQoSList, reqQoS); + break; + } + case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: { + transportService.process(sessionInfo, TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null); + registerSubQoS(topic, grantedQoSList, reqQoS); + break; + } + case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC: + case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC: + case MqttTopics.GATEWAY_RPC_TOPIC: case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC: - deviceSessionCtx.setAllowAttributeResponses(); registerSubQoS(topic, grantedQoSList, reqQoS); break; default: @@ -337,19 +334,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement mqttQoSMap.remove(topicName); try { switch (topicName) { -// case DEVICE_ATTRIBUTES_TOPIC: { -// AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); -// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); -// break; -// } -// case DEVICE_RPC_REQUESTS_SUB_TOPIC: { -// AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg); -// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); -// break; -// } - case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC: - deviceSessionCtx.setDisallowAttributeResponses(); + case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: { + transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(), null); break; + } + case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: { + transportService.process(sessionInfo, TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(), null); + break; + } } } catch (Exception e) { log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName); @@ -551,7 +543,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement try { adaptor.convertToPublish(deviceSessionCtx, response).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); } catch (Exception e) { - log.trace("[{}] Failed to convert device attributes to MQTT msg", sessionId, e); + log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e); + } + } + + @Override + public void onAttributeUpdate(TransportProtos.AttributeUpdateNotificationMsg notification) { + try { + adaptor.convertToPublish(deviceSessionCtx, notification).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); + } catch (Exception e) { + log.trace("[{}] Failed to convert device attributes update to MQTT msg", sessionId, e); } } } diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java index ee41d2c3d6..d4ab033cee 100644 --- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java +++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -113,6 +113,13 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { } } + @Override + public Optional convertToPublish(DeviceSessionCtx ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException { + return Optional.of(createMqttPublishMsg(ctx, + MqttTopics.DEVICE_ATTRIBUTES_TOPIC, + JsonConverter.toJson(notificationMsg))); + } + @Override public AdaptorToSessionActorMsg convertToActorMsg(DeviceSessionCtx ctx, SessionMsgType type, MqttMessage inbound) throws AdaptorException { FromDeviceMsg msg; diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java index 54e7c5ca3c..602fde12f6 100644 --- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java +++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java @@ -36,4 +36,8 @@ public interface MqttTransportAdaptor extends TransportAdaptor convertToPublish(DeviceSessionCtx ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException; + + Optional convertToPublish(DeviceSessionCtx ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException; + + } diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java index d704634ded..6bbb9fa6fd 100644 --- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java +++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java @@ -44,7 +44,6 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { private final MqttSessionId sessionId; @Getter private ChannelHandlerContext channel; - private volatile boolean allowAttributeResponses; private AtomicInteger msgIdSeq = new AtomicInteger(0); public DeviceSessionCtx(ConcurrentMap mqttQoSMap) { @@ -103,14 +102,6 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { this.channel = channel; } - public void setAllowAttributeResponses() { - allowAttributeResponses = true; - } - - public void setDisallowAttributeResponses() { - allowAttributeResponses = false; - } - public int nextMsgId() { return msgIdSeq.incrementAndGet(); } diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java index 7a2f698647..648b01c9c3 100644 --- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java +++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -161,6 +161,9 @@ public class MqttTransportService implements TransportService { if (toSessionMsg.hasGetAttributesResponse()) { listener.onGetAttributesResponse(toSessionMsg.getGetAttributesResponse()); } + if (toSessionMsg.hasAttributeUpdateNotification()) { + listener.onAttributeUpdate(toSessionMsg.getAttributeUpdateNotification()); + } }); } else { //TODO: should we notify the device actor about missed session? @@ -251,6 +254,24 @@ public class MqttTransportService implements TransportService { send(sessionInfo, toRuleEngineMsg, callback); } + @Override + public void process(SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback callback) { + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( + TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setSubscribeToAttributes(msg).build() + ).build(); + send(sessionInfo, toRuleEngineMsg, callback); + } + + @Override + public void process(SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback callback) { + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( + TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setSubscribeToRPC(msg).build() + ).build(); + send(sessionInfo, toRuleEngineMsg, callback); + } + @Override public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) { sessions.putIfAbsent(toId(sessionInfo), listener);