Refactoring of the initial implementation
This commit is contained in:
		
							parent
							
								
									ca54838f23
								
							
						
					
					
						commit
						38ef27567c
					
				@ -283,13 +283,6 @@ public class DefaultTransportApiService implements TransportApiService {
 | 
			
		||||
            Lock deviceCreationLock = deviceCreationLocks.computeIfAbsent(requestMsg.getDeviceName(), id -> new ReentrantLock());
 | 
			
		||||
            deviceCreationLock.lock();
 | 
			
		||||
            try {
 | 
			
		||||
                DeviceProfile deviceProfile = deviceProfileCache.findOrCreateDeviceProfile(gateway.getTenantId(), requestMsg.getDeviceType());
 | 
			
		||||
                DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
 | 
			
		||||
                boolean isSparkplug = false;
 | 
			
		||||
                if (transportConfiguration instanceof MqttDeviceProfileTransportConfiguration &&
 | 
			
		||||
                        ((MqttDeviceProfileTransportConfiguration) transportConfiguration).isSparkPlug()) {
 | 
			
		||||
                    isSparkplug = true;
 | 
			
		||||
                }
 | 
			
		||||
                Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), requestMsg.getDeviceName());
 | 
			
		||||
                if (device == null) {
 | 
			
		||||
                    TenantId tenantId = gateway.getTenantId();
 | 
			
		||||
@ -298,6 +291,7 @@ public class DefaultTransportApiService implements TransportApiService {
 | 
			
		||||
                    device.setName(requestMsg.getDeviceName());
 | 
			
		||||
                    device.setType(requestMsg.getDeviceType());
 | 
			
		||||
                    device.setCustomerId(gateway.getCustomerId());
 | 
			
		||||
                    DeviceProfile deviceProfile = deviceProfileCache.findOrCreateDeviceProfile(gateway.getTenantId(), requestMsg.getDeviceType());
 | 
			
		||||
 | 
			
		||||
                    device.setDeviceProfileId(deviceProfile.getId());
 | 
			
		||||
                    ObjectNode additionalInfo = JacksonUtil.newObjectNode();
 | 
			
		||||
@ -314,7 +308,7 @@ public class DefaultTransportApiService implements TransportApiService {
 | 
			
		||||
                    if (customerId != null && !customerId.isNullUid()) {
 | 
			
		||||
                        metaData.putValue("customerId", customerId.toString());
 | 
			
		||||
                    }
 | 
			
		||||
                    String deviceIdStr = isSparkplug ? "sparkplugId" : "gatewayId";
 | 
			
		||||
                    String deviceIdStr = requestMsg.getSparkplug() ? "sparkplugId" : "gatewayId";
 | 
			
		||||
                    metaData.putValue(deviceIdStr, gatewayId.toString());
 | 
			
		||||
 | 
			
		||||
                    DeviceId deviceId = device.getId();
 | 
			
		||||
@ -326,7 +320,7 @@ public class DefaultTransportApiService implements TransportApiService {
 | 
			
		||||
                    if (deviceAdditionalInfo == null) {
 | 
			
		||||
                        deviceAdditionalInfo = JacksonUtil.newObjectNode();
 | 
			
		||||
                    }
 | 
			
		||||
                    String lastConnectedStr = isSparkplug ? DataConstants.LAST_CONNECTED_SPARKPLUG : DataConstants.LAST_CONNECTED_GATEWAY;
 | 
			
		||||
                    String lastConnectedStr = requestMsg.getSparkplug() ? DataConstants.LAST_CONNECTED_SPARKPLUG : DataConstants.LAST_CONNECTED_GATEWAY;
 | 
			
		||||
                    if (deviceAdditionalInfo.isObject() &&
 | 
			
		||||
                            (!deviceAdditionalInfo.has(lastConnectedStr)
 | 
			
		||||
                                    || !gatewayId.toString().equals(deviceAdditionalInfo.get(lastConnectedStr).asText()))) {
 | 
			
		||||
@ -338,7 +332,12 @@ public class DefaultTransportApiService implements TransportApiService {
 | 
			
		||||
                }
 | 
			
		||||
                GetOrCreateDeviceFromGatewayResponseMsg.Builder builder = GetOrCreateDeviceFromGatewayResponseMsg.newBuilder()
 | 
			
		||||
                        .setDeviceInfo(getDeviceInfoProto(device));
 | 
			
		||||
                builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile)));
 | 
			
		||||
                DeviceProfile deviceProfile = deviceProfileCache.get(device.getTenantId(), device.getDeviceProfileId());
 | 
			
		||||
                if (deviceProfile != null) {
 | 
			
		||||
                    builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile)));
 | 
			
		||||
                } else {
 | 
			
		||||
                    log.warn("[{}] Failed to find device profile [{}] for device. ", device.getId(), device.getDeviceProfileId());
 | 
			
		||||
                }
 | 
			
		||||
                return TransportApiResponseMsg.newBuilder()
 | 
			
		||||
                        .setGetOrCreateDeviceResponseMsg(builder.build())
 | 
			
		||||
                        .build();
 | 
			
		||||
 | 
			
		||||
@ -186,6 +186,7 @@ message GetOrCreateDeviceFromGatewayRequestMsg {
 | 
			
		||||
  int64 gatewayIdLSB = 2;
 | 
			
		||||
  string deviceName = 3;
 | 
			
		||||
  string deviceType = 4;
 | 
			
		||||
  bool sparkplug = 5;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
message GetOrCreateDeviceFromGatewayResponseMsg {
 | 
			
		||||
 | 
			
		||||
@ -132,7 +132,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 | 
			
		||||
    final DeviceSessionCtx deviceSessionCtx;
 | 
			
		||||
    volatile InetSocketAddress address;
 | 
			
		||||
    volatile GatewaySessionHandler gatewaySessionHandler;
 | 
			
		||||
    volatile SparkplugNodeSessionHandler sparkPlugSessionHandler;
 | 
			
		||||
    volatile SparkplugNodeSessionHandler sparkplugSessionHandler;
 | 
			
		||||
 | 
			
		||||
    private final ConcurrentHashMap<String, String> otaPackSessions;
 | 
			
		||||
    private final ConcurrentHashMap<String, Integer> chunkSizes;
 | 
			
		||||
@ -325,14 +325,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 | 
			
		||||
        int msgId = mqttMsg.variableHeader().packetId();
 | 
			
		||||
        log.trace("[{}][{}] Processing publish msg [{}][{}]!", sessionId, deviceSessionCtx.getDeviceId(), topicName, msgId);
 | 
			
		||||
 | 
			
		||||
        if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {
 | 
			
		||||
        if (sparkplugSessionHandler != null) {
 | 
			
		||||
            handleSparkplugPublishMsg(ctx, topicName, msgId, mqttMsg);
 | 
			
		||||
            transportService.reportActivity(deviceSessionCtx.getSessionInfo());
 | 
			
		||||
        } else if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {
 | 
			
		||||
            if (gatewaySessionHandler != null) {
 | 
			
		||||
                handleGatewayPublishMsg(ctx, topicName, msgId, mqttMsg);
 | 
			
		||||
                transportService.reportActivity(deviceSessionCtx.getSessionInfo());
 | 
			
		||||
            }
 | 
			
		||||
        } else if (sparkPlugSessionHandler != null) {
 | 
			
		||||
            handleSparkplugPublishMsg(ctx, topicName, msgId, mqttMsg);
 | 
			
		||||
            transportService.reportActivity(deviceSessionCtx.getSessionInfo());
 | 
			
		||||
        } else {
 | 
			
		||||
            processDevicePublish(ctx, mqttMsg, topicName, msgId);
 | 
			
		||||
        }
 | 
			
		||||
@ -376,7 +376,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 | 
			
		||||
 | 
			
		||||
    private void handleSparkplugPublishMsg(ChannelHandlerContext ctx, String topicName, int msgId, MqttPublishMessage mqttMsg) {
 | 
			
		||||
        try {
 | 
			
		||||
            sparkPlugSessionHandler.onPublishMsg(ctx, topicName, msgId, mqttMsg);
 | 
			
		||||
            sparkplugSessionHandler.onPublishMsg(ctx, topicName, msgId, mqttMsg);
 | 
			
		||||
        } catch (RuntimeException e) {
 | 
			
		||||
            log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
 | 
			
		||||
            ctx.close();
 | 
			
		||||
@ -643,9 +643,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 | 
			
		||||
            String topic = subscription.topicName();
 | 
			
		||||
            MqttQoS reqQoS = subscription.qualityOfService();
 | 
			
		||||
            try {
 | 
			
		||||
                if (sparkPlugSessionHandler != null) {
 | 
			
		||||
                if (sparkplugSessionHandler != null) {
 | 
			
		||||
                    SparkplugTopic sparkplugTopic = parseTopic(mqttMsg.payload().topicSubscriptions().get(0).topicName());
 | 
			
		||||
                    sparkPlugSessionHandler.handleSparkplugSubscribeMsg(grantedQoSList, sparkplugTopic, reqQoS);
 | 
			
		||||
                    sparkplugSessionHandler.handleSparkplugSubscribeMsg(grantedQoSList, sparkplugTopic, reqQoS);
 | 
			
		||||
                } else {
 | 
			
		||||
                    switch (topic) {
 | 
			
		||||
                        case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
 | 
			
		||||
@ -978,14 +978,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void checkSparkPlugSession(MqttConnectMessage connectMessage) {
 | 
			
		||||
    private void checkSparkplugSession(MqttConnectMessage connectMessage) {
 | 
			
		||||
        try {
 | 
			
		||||
            SparkplugTopic sparkplugTopic = parseTopic(connectMessage.payload().willTopic());
 | 
			
		||||
            // Test proto
 | 
			
		||||
            SparkplugBProto.Payload payloadBProto = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes());
 | 
			
		||||
            //
 | 
			
		||||
            if (sparkPlugSessionHandler == null) {
 | 
			
		||||
                sparkPlugSessionHandler = new SparkplugNodeSessionHandler(deviceSessionCtx, sessionId, sparkplugTopic.toString());
 | 
			
		||||
            if (sparkplugSessionHandler == null) {
 | 
			
		||||
                sparkplugSessionHandler = new SparkplugNodeSessionHandler(deviceSessionCtx, sessionId, sparkplugTopic.toString());
 | 
			
		||||
            } else {
 | 
			
		||||
                log.warn("SparkPlugNodeReConnected [{}] [{}]", sparkplugTopic.getDeviceId(), sparkplugTopic.getType());
 | 
			
		||||
            }
 | 
			
		||||
@ -1029,7 +1029,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 | 
			
		||||
                public void onSuccess(Void msg) {
 | 
			
		||||
                    SessionMetaData sessionMetaData = transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), MqttTransportHandler.this);
 | 
			
		||||
                    if (deviceSessionCtx.isSparkplug()) {
 | 
			
		||||
                        checkSparkPlugSession(connectMessage);
 | 
			
		||||
                        checkSparkplugSession(connectMessage);
 | 
			
		||||
                    } else {
 | 
			
		||||
                        checkGatewaySession(sessionMetaData);
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
@ -360,6 +360,7 @@ public class SparkplugNodeSessionHandler {
 | 
			
		||||
                            .setDeviceType(deviceType)
 | 
			
		||||
                            .setGatewayIdMSB(nodeSparkplugInfo.getDeviceId().getId().getMostSignificantBits())
 | 
			
		||||
                            .setGatewayIdLSB(nodeSparkplugInfo.getDeviceId().getId().getLeastSignificantBits())
 | 
			
		||||
                            .setSparkplug(true)
 | 
			
		||||
                            .build(),
 | 
			
		||||
                    new TransportServiceCallback<>() {
 | 
			
		||||
                        @Override
 | 
			
		||||
 | 
			
		||||
@ -87,11 +87,10 @@ public abstract class DeviceAwareSessionContext implements SessionContext {
 | 
			
		||||
    public boolean isSparkplug() {
 | 
			
		||||
        DeviceProfileTransportConfiguration transportConfiguration = this.deviceProfile.getProfileData().getTransportConfiguration();
 | 
			
		||||
        if (transportConfiguration instanceof MqttDeviceProfileTransportConfiguration) {
 | 
			
		||||
            if (((MqttDeviceProfileTransportConfiguration) transportConfiguration).isSparkPlug()) {
 | 
			
		||||
                return true;
 | 
			
		||||
            }
 | 
			
		||||
            return ((MqttDeviceProfileTransportConfiguration) transportConfiguration).isSparkPlug();
 | 
			
		||||
        } else {
 | 
			
		||||
            return false;
 | 
			
		||||
        }
 | 
			
		||||
        return false;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user