diff --git a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java index ed73b54ea1..f7ff7ec1e9 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java @@ -283,13 +283,6 @@ public class DefaultTransportApiService implements TransportApiService { Lock deviceCreationLock = deviceCreationLocks.computeIfAbsent(requestMsg.getDeviceName(), id -> new ReentrantLock()); deviceCreationLock.lock(); try { - DeviceProfile deviceProfile = deviceProfileCache.findOrCreateDeviceProfile(gateway.getTenantId(), requestMsg.getDeviceType()); - DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration(); - boolean isSparkplug = false; - if (transportConfiguration instanceof MqttDeviceProfileTransportConfiguration && - ((MqttDeviceProfileTransportConfiguration) transportConfiguration).isSparkPlug()) { - isSparkplug = true; - } Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), requestMsg.getDeviceName()); if (device == null) { TenantId tenantId = gateway.getTenantId(); @@ -298,6 +291,7 @@ public class DefaultTransportApiService implements TransportApiService { device.setName(requestMsg.getDeviceName()); device.setType(requestMsg.getDeviceType()); device.setCustomerId(gateway.getCustomerId()); + DeviceProfile deviceProfile = deviceProfileCache.findOrCreateDeviceProfile(gateway.getTenantId(), requestMsg.getDeviceType()); device.setDeviceProfileId(deviceProfile.getId()); ObjectNode additionalInfo = JacksonUtil.newObjectNode(); @@ -314,7 +308,7 @@ public class DefaultTransportApiService implements TransportApiService { if (customerId != null && !customerId.isNullUid()) { metaData.putValue("customerId", customerId.toString()); } - String deviceIdStr = isSparkplug ? "sparkplugId" : "gatewayId"; + String deviceIdStr = requestMsg.getSparkplug() ? "sparkplugId" : "gatewayId"; metaData.putValue(deviceIdStr, gatewayId.toString()); DeviceId deviceId = device.getId(); @@ -326,7 +320,7 @@ public class DefaultTransportApiService implements TransportApiService { if (deviceAdditionalInfo == null) { deviceAdditionalInfo = JacksonUtil.newObjectNode(); } - String lastConnectedStr = isSparkplug ? DataConstants.LAST_CONNECTED_SPARKPLUG : DataConstants.LAST_CONNECTED_GATEWAY; + String lastConnectedStr = requestMsg.getSparkplug() ? DataConstants.LAST_CONNECTED_SPARKPLUG : DataConstants.LAST_CONNECTED_GATEWAY; if (deviceAdditionalInfo.isObject() && (!deviceAdditionalInfo.has(lastConnectedStr) || !gatewayId.toString().equals(deviceAdditionalInfo.get(lastConnectedStr).asText()))) { @@ -338,7 +332,12 @@ public class DefaultTransportApiService implements TransportApiService { } GetOrCreateDeviceFromGatewayResponseMsg.Builder builder = GetOrCreateDeviceFromGatewayResponseMsg.newBuilder() .setDeviceInfo(getDeviceInfoProto(device)); - builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile))); + DeviceProfile deviceProfile = deviceProfileCache.get(device.getTenantId(), device.getDeviceProfileId()); + if (deviceProfile != null) { + builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile))); + } else { + log.warn("[{}] Failed to find device profile [{}] for device. ", device.getId(), device.getDeviceProfileId()); + } return TransportApiResponseMsg.newBuilder() .setGetOrCreateDeviceResponseMsg(builder.build()) .build(); diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index 5a28d42546..309d812b41 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -186,6 +186,7 @@ message GetOrCreateDeviceFromGatewayRequestMsg { int64 gatewayIdLSB = 2; string deviceName = 3; string deviceType = 4; + bool sparkplug = 5; } message GetOrCreateDeviceFromGatewayResponseMsg { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index bf1a996170..4d98662be2 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -132,7 +132,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement final DeviceSessionCtx deviceSessionCtx; volatile InetSocketAddress address; volatile GatewaySessionHandler gatewaySessionHandler; - volatile SparkplugNodeSessionHandler sparkPlugSessionHandler; + volatile SparkplugNodeSessionHandler sparkplugSessionHandler; private final ConcurrentHashMap otaPackSessions; private final ConcurrentHashMap chunkSizes; @@ -325,14 +325,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement int msgId = mqttMsg.variableHeader().packetId(); log.trace("[{}][{}] Processing publish msg [{}][{}]!", sessionId, deviceSessionCtx.getDeviceId(), topicName, msgId); - if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) { + if (sparkplugSessionHandler != null) { + handleSparkplugPublishMsg(ctx, topicName, msgId, mqttMsg); + transportService.reportActivity(deviceSessionCtx.getSessionInfo()); + } else if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) { if (gatewaySessionHandler != null) { handleGatewayPublishMsg(ctx, topicName, msgId, mqttMsg); transportService.reportActivity(deviceSessionCtx.getSessionInfo()); } - } else if (sparkPlugSessionHandler != null) { - handleSparkplugPublishMsg(ctx, topicName, msgId, mqttMsg); - transportService.reportActivity(deviceSessionCtx.getSessionInfo()); } else { processDevicePublish(ctx, mqttMsg, topicName, msgId); } @@ -376,7 +376,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private void handleSparkplugPublishMsg(ChannelHandlerContext ctx, String topicName, int msgId, MqttPublishMessage mqttMsg) { try { - sparkPlugSessionHandler.onPublishMsg(ctx, topicName, msgId, mqttMsg); + sparkplugSessionHandler.onPublishMsg(ctx, topicName, msgId, mqttMsg); } catch (RuntimeException e) { log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); ctx.close(); @@ -643,9 +643,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement String topic = subscription.topicName(); MqttQoS reqQoS = subscription.qualityOfService(); try { - if (sparkPlugSessionHandler != null) { + if (sparkplugSessionHandler != null) { SparkplugTopic sparkplugTopic = parseTopic(mqttMsg.payload().topicSubscriptions().get(0).topicName()); - sparkPlugSessionHandler.handleSparkplugSubscribeMsg(grantedQoSList, sparkplugTopic, reqQoS); + sparkplugSessionHandler.handleSparkplugSubscribeMsg(grantedQoSList, sparkplugTopic, reqQoS); } else { switch (topic) { case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: { @@ -978,14 +978,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } - private void checkSparkPlugSession(MqttConnectMessage connectMessage) { + private void checkSparkplugSession(MqttConnectMessage connectMessage) { try { SparkplugTopic sparkplugTopic = parseTopic(connectMessage.payload().willTopic()); // Test proto SparkplugBProto.Payload payloadBProto = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes()); // - if (sparkPlugSessionHandler == null) { - sparkPlugSessionHandler = new SparkplugNodeSessionHandler(deviceSessionCtx, sessionId, sparkplugTopic.toString()); + if (sparkplugSessionHandler == null) { + sparkplugSessionHandler = new SparkplugNodeSessionHandler(deviceSessionCtx, sessionId, sparkplugTopic.toString()); } else { log.warn("SparkPlugNodeReConnected [{}] [{}]", sparkplugTopic.getDeviceId(), sparkplugTopic.getType()); } @@ -1029,7 +1029,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement public void onSuccess(Void msg) { SessionMetaData sessionMetaData = transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), MqttTransportHandler.this); if (deviceSessionCtx.isSparkplug()) { - checkSparkPlugSession(connectMessage); + checkSparkplugSession(connectMessage); } else { checkGatewaySession(sessionMetaData); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java index d2787581af..f63b899515 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java @@ -360,6 +360,7 @@ public class SparkplugNodeSessionHandler { .setDeviceType(deviceType) .setGatewayIdMSB(nodeSparkplugInfo.getDeviceId().getId().getMostSignificantBits()) .setGatewayIdLSB(nodeSparkplugInfo.getDeviceId().getId().getLeastSignificantBits()) + .setSparkplug(true) .build(), new TransportServiceCallback<>() { @Override diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java index e69248dd30..1ec98aac9f 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java @@ -87,11 +87,10 @@ public abstract class DeviceAwareSessionContext implements SessionContext { public boolean isSparkplug() { DeviceProfileTransportConfiguration transportConfiguration = this.deviceProfile.getProfileData().getTransportConfiguration(); if (transportConfiguration instanceof MqttDeviceProfileTransportConfiguration) { - if (((MqttDeviceProfileTransportConfiguration) transportConfiguration).isSparkPlug()) { - return true; - } + return ((MqttDeviceProfileTransportConfiguration) transportConfiguration).isSparkPlug(); + } else { + return false; } - return false; } }