diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index d033774e93..49f2f1cc63 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -33,6 +33,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; +import jakarta.annotation.Nullable; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -62,7 +63,6 @@ import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor; import org.thingsboard.server.transport.mqtt.util.ReturnCode; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState; -import jakarta.annotation.Nullable; import java.util.ArrayList; import java.util.Collections; import java.util.Date; @@ -99,6 +99,7 @@ public abstract class AbstractGatewaySessionHandler deviceCreationLockMap; private final ConcurrentMap devices; @@ -193,10 +194,6 @@ public abstract class AbstractGatewaySessionHandler deviceEntry : jsonObj.entrySet()) { - String deviceName = deviceEntry.getKey(); + if (!json.isJsonObject()) { + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); + } + + for (Map.Entry deviceEntry : json.getAsJsonObject().entrySet()) { + if (!deviceEntry.getValue().isJsonArray()) { + log.warn("{}[{}]", CAN_T_PARSE_VALUE, json); + continue; + } + + String deviceName = deviceEntry.getKey(); + T deviceCtx = devices.get(deviceName); + if (deviceCtx != null) { + processPostTelemetryMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId); + } else { Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() { @Override public void onSuccess(@Nullable T deviceCtx) { - if (!deviceEntry.getValue().isJsonArray()) { - throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); - } - try { - TransportProtos.PostTelemetryMsg postTelemetryMsg = JsonConverter.convertToTelemetryProto(deviceEntry.getValue().getAsJsonArray()); - processPostTelemetryMsg(deviceCtx, postTelemetryMsg, deviceName, msgId); - } catch (Throwable e) { - log.warn("[{}][{}][{}] Failed to convert telemetry: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, deviceEntry.getValue(), e); - channel.close(); - } + processPostTelemetryMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId); } @Override @@ -394,8 +393,16 @@ public abstract class AbstractGatewaySessionHandler deviceMsgList = telemetryMsgProto.getMsgList(); - if (!CollectionUtils.isEmpty(deviceMsgList)) { - deviceMsgList.forEach(telemetryMsg -> { - String deviceName = checkDeviceName(telemetryMsg.getDeviceName()); - Futures.addCallback(checkDeviceConnected(deviceName), - new FutureCallback<>() { - @Override - public void onSuccess(@Nullable T deviceCtx) { - TransportProtos.PostTelemetryMsg msg = telemetryMsg.getMsg(); - try { - TransportProtos.PostTelemetryMsg postTelemetryMsg = ProtoConverter.validatePostTelemetryMsg(msg.toByteArray()); - processPostTelemetryMsg(deviceCtx, postTelemetryMsg, deviceName, msgId); - } catch (Throwable e) { - log.warn("[{}][{}][{}] Failed to convert telemetry: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e); - channel.close(); - } - } - - @Override - public void onFailure(Throwable t) { - log.debug("[{}][{}][{}] Failed to process device telemetry command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); - } - }, context.getExecutor()); - }); - } else { + if (CollectionUtils.isEmpty(deviceMsgList)) { log.debug("[{}][{}][{}] Devices telemetry messages is empty", gateway.getTenantId(), gateway.getDeviceId(), sessionId); throw new IllegalArgumentException("[" + sessionId + "] Devices telemetry messages is empty for [" + gateway.getDeviceId() + "]"); } + + deviceMsgList.forEach(telemetryMsg -> { + String deviceName = checkDeviceName(telemetryMsg.getDeviceName()); + T deviceCtx = devices.get(deviceName); + if (deviceCtx != null) { + processPostTelemetryMsg(deviceCtx, telemetryMsg.getMsg(), deviceName, msgId); + } else { + Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() { + @Override + public void onSuccess(@Nullable T deviceCtx) { + processPostTelemetryMsg(deviceCtx, telemetryMsg.getMsg(), deviceName, msgId); + } + + @Override + public void onFailure(Throwable t) { + log.debug("[{}][{}][{}] Failed to process device telemetry command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); + } + }, context.getExecutor()); + } + }); } catch (RuntimeException | InvalidProtocolBufferException e) { throw new AdaptorException(e); } } - public void processPostTelemetryMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostTelemetryMsg postTelemetryMsg, String deviceName, int msgId) { - transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg)); + protected void processPostTelemetryMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostTelemetryMsg msg, String deviceName, int msgId) { + try { + TransportProtos.PostTelemetryMsg postTelemetryMsg = ProtoConverter.validatePostTelemetryMsg(msg.toByteArray()); + transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg)); + } catch (Throwable e) { + log.warn("[{}][{}][{}] Failed to convert telemetry: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e); + channel.close(); + } } public TransportProtos.PostTelemetryMsg postTelemetryMsgCreated(TransportProtos.KeyValueProto keyValueProto, long ts) { @@ -452,148 +462,165 @@ public abstract class AbstractGatewaySessionHandler deviceEntry : jsonObj.entrySet()) { - String deviceName = deviceEntry.getKey(); - Futures.addCallback(checkDeviceConnected(deviceName), - new FutureCallback<>() { - @Override - public void onSuccess(@Nullable T deviceCtx) { - if (!deviceEntry.getValue().isJsonObject()) { - throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); - } - try { - DeviceId deviceId = deviceCtx.getDeviceId(); - TransportProtos.ClaimDeviceMsg claimDeviceMsg = JsonConverter.convertToClaimDeviceProto(deviceId, deviceEntry.getValue()); - processClaimDeviceMsg(deviceCtx, claimDeviceMsg, deviceName, msgId); - } catch (Throwable e) { - log.warn("[{}][{}][{}] Failed to convert claim message: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, deviceEntry.getValue(), e); - } - } - - @Override - public void onFailure(Throwable t) { - log.debug("[{}][{}][{}] Failed to process device claiming command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); - } - }, context.getExecutor()); - } - } else { + if (!json.isJsonObject()) { throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); } + JsonObject jsonObj = json.getAsJsonObject(); + for (Map.Entry deviceEntry : jsonObj.entrySet()) { + if (!deviceEntry.getValue().isJsonObject()) { + log.warn("{}[{}]", CAN_T_PARSE_VALUE, json); + continue; + } + + String deviceName = deviceEntry.getKey(); + T deviceCtx = devices.get(deviceEntry.getKey()); + + if (deviceCtx != null) { + processClaimDeviceMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId); + } else { + Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() { + @Override + public void onSuccess(@Nullable T deviceCtx) { + processClaimDeviceMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId); + } + + @Override + public void onFailure(Throwable t) { + log.debug("[{}][{}][{}] Failed to process device claiming command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); + } + }, context.getExecutor()); + } + } + } + + private void processClaimDeviceMsg(MqttDeviceAwareSessionContext deviceCtx, JsonElement claimRequest, String deviceName, int msgId) { + try { + DeviceId deviceId = deviceCtx.getDeviceId(); + TransportProtos.ClaimDeviceMsg claimDeviceMsg = JsonConverter.convertToClaimDeviceProto(deviceId, claimRequest); + transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(channel, deviceName, msgId, claimDeviceMsg)); + } catch (Throwable e) { + log.warn("[{}][{}][{}] Failed to convert claim message: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, claimRequest, e); + } } private void onDeviceClaimProto(int msgId, ByteBuf payload) throws AdaptorException { try { TransportApiProtos.GatewayClaimMsg claimMsgProto = TransportApiProtos.GatewayClaimMsg.parseFrom(getBytes(payload)); List claimMsgList = claimMsgProto.getMsgList(); - if (!CollectionUtils.isEmpty(claimMsgList)) { - claimMsgList.forEach(claimDeviceMsg -> { - String deviceName = checkDeviceName(claimDeviceMsg.getDeviceName()); - Futures.addCallback(checkDeviceConnected(deviceName), - new FutureCallback<>() { - @Override - public void onSuccess(@Nullable T deviceCtx) { - TransportApiProtos.ClaimDevice claimRequest = claimDeviceMsg.getClaimRequest(); - if (claimRequest == null) { - throw new IllegalArgumentException("Claim request for device: " + deviceName + " is null!"); - } - try { - DeviceId deviceId = deviceCtx.getDeviceId(); - TransportProtos.ClaimDeviceMsg claimDeviceMsg = ProtoConverter.convertToClaimDeviceProto(deviceId, claimRequest.toByteArray()); - processClaimDeviceMsg(deviceCtx, claimDeviceMsg, deviceName, msgId); - } catch (Throwable e) { - log.warn("[{}][{}][{}] Failed to convert claim message: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, claimRequest, e); - } - } - - @Override - public void onFailure(Throwable t) { - log.debug("[{}][{}][{}] Failed to process device claiming command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); - } - }, context.getExecutor()); - }); - } else { + if (CollectionUtils.isEmpty(claimMsgList)) { log.debug("[{}][{}][{}] Devices claim messages is empty", gateway.getTenantId(), gateway.getDeviceId(), sessionId); throw new IllegalArgumentException("[" + sessionId + "] Devices claim messages is empty for [" + gateway.getDeviceId() + "]"); } + + claimMsgList.forEach(claimDeviceMsg -> { + String deviceName = checkDeviceName(claimDeviceMsg.getDeviceName()); + T deviceCtx = devices.get(deviceName); + if (deviceCtx != null) { + processClaimDeviceMsg(deviceCtx, claimDeviceMsg.getClaimRequest(), deviceName, msgId); + } else { + Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() { + @Override + public void onSuccess(@Nullable T deviceCtx) { + processClaimDeviceMsg(deviceCtx, claimDeviceMsg.getClaimRequest(), deviceName, msgId); + } + + @Override + public void onFailure(Throwable t) { + log.debug("[{}][{}][{}] Failed to process device claiming command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); + } + }, context.getExecutor()); + } + }); } catch (RuntimeException | InvalidProtocolBufferException e) { throw new AdaptorException(e); } } - private void processClaimDeviceMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.ClaimDeviceMsg claimDeviceMsg, String deviceName, int msgId) { - transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(channel, deviceName, msgId, claimDeviceMsg)); + private void processClaimDeviceMsg(MqttDeviceAwareSessionContext deviceCtx, TransportApiProtos.ClaimDevice claimRequest, String deviceName, int msgId) { + try { + DeviceId deviceId = deviceCtx.getDeviceId(); + TransportProtos.ClaimDeviceMsg claimDeviceMsg = ProtoConverter.convertToClaimDeviceProto(deviceId, claimRequest.toByteArray()); + transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(channel, deviceName, msgId, claimDeviceMsg)); + } catch (Throwable e) { + log.warn("[{}][{}][{}] Failed to convert claim message: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, claimRequest, e); + } } private void onDeviceAttributesJson(int msgId, ByteBuf payload) throws AdaptorException { JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, payload); - if (json.isJsonObject()) { - JsonObject jsonObj = json.getAsJsonObject(); - for (Map.Entry deviceEntry : jsonObj.entrySet()) { - String deviceName = deviceEntry.getKey(); - Futures.addCallback(checkDeviceConnected(deviceName), - new FutureCallback<>() { - @Override - public void onSuccess(@Nullable T deviceCtx) { - if (!deviceEntry.getValue().isJsonObject()) { - throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); - } - TransportProtos.PostAttributeMsg postAttributeMsg = JsonConverter.convertToAttributesProto(deviceEntry.getValue().getAsJsonObject()); - processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId); - } - - @Override - public void onFailure(Throwable t) { - log.debug("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); - } - }, context.getExecutor()); - } - } else { + if (!json.isJsonObject()) { throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); } + JsonObject jsonObj = json.getAsJsonObject(); + for (Map.Entry deviceEntry : jsonObj.entrySet()) { + if (!deviceEntry.getValue().isJsonObject()) { + log.warn("{}[{}]", CAN_T_PARSE_VALUE, json); + continue; + } + String deviceName = deviceEntry.getKey(); + Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() { + @Override + public void onSuccess(@Nullable T deviceCtx) { + processPostAttributesMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId); + } + + @Override + public void onFailure(Throwable t) { + log.debug("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); + } + }, context.getExecutor()); + } } private void onDeviceAttributesProto(int msgId, ByteBuf payload) throws AdaptorException { try { TransportApiProtos.GatewayAttributesMsg attributesMsgProto = TransportApiProtos.GatewayAttributesMsg.parseFrom(getBytes(payload)); List attributesMsgList = attributesMsgProto.getMsgList(); - if (!CollectionUtils.isEmpty(attributesMsgList)) { - attributesMsgList.forEach(attributesMsg -> { - String deviceName = checkDeviceName(attributesMsg.getDeviceName()); - Futures.addCallback(checkDeviceConnected(deviceName), - new FutureCallback<>() { - @Override - public void onSuccess(@Nullable T deviceCtx) { - TransportProtos.PostAttributeMsg kvListProto = attributesMsg.getMsg(); - if (kvListProto == null) { - throw new IllegalArgumentException("Attributes List for device: " + deviceName + " is empty!"); - } - try { - TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto); - processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId); - } catch (Throwable e) { - log.warn("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, kvListProto, e); - } - } - - @Override - public void onFailure(Throwable t) { - log.debug("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); - } - }, context.getExecutor()); - }); - } else { + if (CollectionUtils.isEmpty(attributesMsgList)) { log.debug("[{}][{}][{}] Devices attributes keys list is empty", gateway.getTenantId(), gateway.getDeviceId(), sessionId); throw new IllegalArgumentException("[" + sessionId + "] Devices attributes keys list is empty for [" + gateway.getDeviceId() + "]"); } + + attributesMsgList.forEach(attributesMsg -> { + String deviceName = checkDeviceName(attributesMsg.getDeviceName()); + T deviceCtx = devices.get(deviceName); + if (deviceCtx != null) { + processPostAttributesMsg(deviceCtx, attributesMsg.getMsg(), deviceName, msgId); + } else { + Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() { + @Override + public void onSuccess(@Nullable T deviceCtx) { + processPostAttributesMsg(deviceCtx, attributesMsg.getMsg(), deviceName, msgId); + } + + @Override + public void onFailure(Throwable t) { + log.debug("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); + } + }, context.getExecutor()); + } + }); } catch (RuntimeException | InvalidProtocolBufferException e) { throw new AdaptorException(e); } } - protected void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostAttributeMsg postAttributeMsg, String deviceName, int msgId) { - transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg)); + protected void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, JsonElement msg, String deviceName, int msgId) { + try { + TransportProtos.PostAttributeMsg postAttributeMsg = JsonConverter.convertToAttributesProto(msg.getAsJsonObject()); + transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg)); + } catch (Throwable e) { + log.warn("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e); + } + } + + protected void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostAttributeMsg kvListProto, String deviceName, int msgId) { + try { + TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto); + transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg)); + } catch (Throwable e) { + log.warn("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, kvListProto, e); + } } private void onDeviceAttributesRequestJson(MqttPublishMessage msg) throws AdaptorException { @@ -637,74 +664,76 @@ public abstract class AbstractGatewaySessionHandler() { - @Override - public void onSuccess(@Nullable T deviceCtx) { - Integer requestId = jsonObj.get("id").getAsInt(); - String data = jsonObj.get("data").toString(); - TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder() - .setRequestId(requestId).setPayload(data).build(); - processRpcResponseMsg(deviceCtx, rpcResponseMsg, deviceName, msgId); - } - - @Override - public void onFailure(Throwable t) { - log.debug("[{}][{}][{}] Failed to process device Rpc response command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); - } - }, context.getExecutor()); - } else { + if (!json.isJsonObject()) { throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); } + JsonObject jsonObj = json.getAsJsonObject(); + String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString(); + Integer requestId = jsonObj.get("id").getAsInt(); + String data = jsonObj.get("data").toString(); + onDeviceRpcResponse(requestId, data, deviceName, msgId); } private void onDeviceRpcResponseProto(int msgId, ByteBuf payload) throws AdaptorException { try { TransportApiProtos.GatewayRpcResponseMsg gatewayRpcResponseMsg = TransportApiProtos.GatewayRpcResponseMsg.parseFrom(getBytes(payload)); String deviceName = checkDeviceName(gatewayRpcResponseMsg.getDeviceName()); - Futures.addCallback(checkDeviceConnected(deviceName), - new FutureCallback<>() { - @Override - public void onSuccess(@Nullable T deviceCtx) { - Integer requestId = gatewayRpcResponseMsg.getId(); - String data = gatewayRpcResponseMsg.getData(); - TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder() - .setRequestId(requestId).setPayload(data).build(); - processRpcResponseMsg(deviceCtx, rpcResponseMsg, deviceName, msgId); - } - - @Override - public void onFailure(Throwable t) { - log.debug("[{}][{}][{}] Failed to process device Rpc response command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); - } - }, context.getExecutor()); + Integer requestId = gatewayRpcResponseMsg.getId(); + String data = gatewayRpcResponseMsg.getData(); + onDeviceRpcResponse(requestId, data, deviceName, msgId); } catch (RuntimeException | InvalidProtocolBufferException e) { throw new AdaptorException(e); } } - private void processRpcResponseMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg, String deviceName, int msgId) { + private void onDeviceRpcResponse(Integer requestId, String data, String deviceName, int msgId) { + T deviceCtx = devices.get(deviceName); + if (deviceCtx != null) { + processRpcResponseMsg(deviceCtx, requestId, data, deviceName, msgId); + } else { + Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() { + @Override + public void onSuccess(@Nullable T deviceCtx) { + processRpcResponseMsg(deviceCtx, requestId, data, deviceName, msgId); + } + + @Override + public void onFailure(Throwable t) { + log.debug("[{}][{}][{}] Failed to process device Rpc response command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); + } + }, context.getExecutor()); + } + } + + private void processRpcResponseMsg(MqttDeviceAwareSessionContext deviceCtx, Integer requestId, String data, String deviceName, int msgId) { + TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder() + .setRequestId(requestId).setPayload(data).build(); transportService.process(deviceCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(channel, deviceName, msgId, rpcResponseMsg)); } private void processGetAttributeRequestMessage(MqttPublishMessage mqttMsg, String deviceName, TransportProtos.GetAttributeRequestMsg requestMsg) { int msgId = getMsgId(mqttMsg); - Futures.addCallback(checkDeviceConnected(deviceName), - new FutureCallback<>() { - @Override - public void onSuccess(@Nullable T deviceCtx) { - transportService.process(deviceCtx.getSessionInfo(), requestMsg, getPubAckCallback(channel, deviceName, msgId, requestMsg)); - } + T deviceCtx = devices.get(deviceName); + if (deviceCtx != null) { + processGetAttributeRequestMessage(deviceCtx, requestMsg, deviceName, msgId); + } else { + Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() { + @Override + public void onSuccess(@Nullable T deviceCtx) { + processGetAttributeRequestMessage(deviceCtx, requestMsg, deviceName, msgId); + } - @Override - public void onFailure(Throwable t) { - ack(mqttMsg, ReturnCode.IMPLEMENTATION_SPECIFIC); - log.debug("[{}][{}][{}] Failed to process device attributes request command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); - } - }, context.getExecutor()); + @Override + public void onFailure(Throwable t) { + ack(mqttMsg, ReturnCode.IMPLEMENTATION_SPECIFIC); + log.debug("[{}][{}][{}] Failed to process device attributes request command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); + } + }, context.getExecutor()); + } + } + + private void processGetAttributeRequestMessage(T deviceCtx, TransportProtos.GetAttributeRequestMsg requestMsg, String deviceName, int msgId) { + transportService.process(deviceCtx.getSessionInfo(), requestMsg, getPubAckCallback(channel, deviceName, msgId, requestMsg)); } private TransportProtos.GetAttributeRequestMsg toGetAttributeRequestMsg(int requestId, boolean clientScope, Set keys) { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java index 1f389fced1..cbc515c1d0 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java @@ -26,6 +26,7 @@ import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttTopicSubscription; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.eclipse.leshan.core.ResponseCode; import org.springframework.util.CollectionUtils; @@ -44,6 +45,7 @@ import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic; import jakarta.annotation.Nullable; + import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -68,6 +70,7 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopi @Slf4j public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { + @Getter private final SparkplugTopic sparkplugTopicNode; private final Map nodeBirthMetrics; private final MqttTransportHandler parent; @@ -136,41 +139,36 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler contextListenableFuture, int msgId, List postTelemetryMsgList, String deviceName) throws AdaptorException { - try { - int finalMsgId = msgId; - postTelemetryMsgList.forEach(telemetryMsg -> { - Futures.addCallback(contextListenableFuture, - new FutureCallback<>() { - @Override - public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) { - try { - processPostTelemetryMsg(deviceCtx, telemetryMsg, deviceName, finalMsgId); - } catch (Throwable e) { - log.warn("[{}][{}] Failed to convert telemetry: {}", gateway.getDeviceId(), deviceName, telemetryMsg, e); - channel.close(); - } - } + Futures.addCallback(contextListenableFuture, new FutureCallback<>() { + @Override + public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) { + for (TransportProtos.PostTelemetryMsg telemetryMsg : postTelemetryMsgList) { + try { + processPostTelemetryMsg(deviceCtx, telemetryMsg, deviceName, msgId); + } catch (Throwable e) { + log.warn("[{}][{}] Failed to convert telemetry: {}", gateway.getDeviceId(), deviceName, telemetryMsg, e); + channel.close(); + break; + } + } + } - @Override - public void onFailure(Throwable t) { - log.debug("[{}] Failed to process device telemetry command: {}", sessionId, deviceName, t); - } - }, context.getExecutor()); - }); - } catch (RuntimeException e) { - throw new AdaptorException(e); - } + @Override + public void onFailure(Throwable t) { + log.debug("[{}] Failed to process device telemetry command: {}", sessionId, deviceName, t); + } + }, context.getExecutor()); } private void onDeviceAttributesProto(ListenableFuture contextListenableFuture, int msgId, List attributesMsgList, String deviceName) throws AdaptorException { try { if (!CollectionUtils.isEmpty(attributesMsgList)) { - attributesMsgList.forEach(attributesMsg -> { - Futures.addCallback(contextListenableFuture, - new FutureCallback<>() { - @Override - public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) { + Futures.addCallback(contextListenableFuture, + new FutureCallback<>() { + @Override + public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) { + for (TransportApiProtos.AttributesMsg attributesMsg : attributesMsgList) { TransportProtos.PostAttributeMsg kvListProto = attributesMsg.getMsg(); try { TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto); @@ -179,13 +177,13 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler grantedQoSList, MqttTopicSubscription subscription, - MqttQoS reqQoS) throws ThingsboardException, AdaptorException, - ExecutionException, InterruptedException { - SparkplugTopic sparkplugTopic = parseTopicSubscribe(subscription.topicName()); + MqttQoS reqQoS) throws ThingsboardException { + SparkplugTopic sparkplugTopic = parseTopicSubscribe(subscription.topicFilter()); if (sparkplugTopic.getGroupId() == null) { // TODO SUBSCRIBE NameSpace } else if (sparkplugTopic.getType() == null) { @@ -304,10 +301,6 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler createSparkplugMqttPublishMsg(TransportProtos.TsKvProto tsKvProto, String sparkplugTopic, SparkplugBProto.Payload.Metric metricBirth) { @@ -328,7 +321,6 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler