diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 36868c5906..9c4dd40417 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -37,6 +37,7 @@ import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.auth.DeviceAuthService; import org.thingsboard.server.dao.EncryptionUtil; import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx; import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; @@ -67,14 +68,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private final SessionMsgProcessor processor; private final DeviceService deviceService; private final DeviceAuthService authService; + private final RelationService relationService; private final SslHandler sslHandler; private volatile boolean connected; private volatile GatewaySessionCtx gatewaySessionCtx; - public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, + public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, MqttTransportAdaptor adaptor, SslHandler sslHandler) { this.processor = processor; this.deviceService = deviceService; + this.relationService = relationService; this.authService = authService; this.adaptor = adaptor; this.deviceSessionCtx = new DeviceSessionCtx(processor, authService, adaptor); @@ -371,7 +374,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement if (infoNode != null) { JsonNode gatewayNode = infoNode.get("gateway"); if (gatewayNode != null && gatewayNode.asBoolean()) { - gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, deviceSessionCtx); + gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, relationService, deviceSessionCtx); } } } diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java index af109f652b..1469290837 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java @@ -30,6 +30,7 @@ import org.springframework.beans.factory.annotation.Value; import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.auth.DeviceAuthService; import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; import javax.net.ssl.SSLException; @@ -45,14 +46,17 @@ public class MqttTransportServerInitializer extends ChannelInitializer devices; private ChannelHandlerContext channel; - public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, DeviceSessionCtx gatewaySessionCtx) { + public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, DeviceSessionCtx gatewaySessionCtx) { this.processor = processor; this.deviceService = deviceService; this.authService = authService; + this.relationService = relationService; this.gateway = gatewaySessionCtx.getDevice(); this.gatewaySessionId = gatewaySessionCtx.getSessionId(); this.devices = new HashMap<>(); } public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException { - String deviceName = checkDeviceName(getDeviceName(msg)); + JsonElement json = getJson(msg); + String deviceName = checkDeviceName(getDeviceName(json)); + String deviceType = getDeviceType(json); if (!devices.containsKey(deviceName)) { Optional deviceOpt = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), deviceName); Device device = deviceOpt.orElseGet(() -> { Device newDevice = new Device(); newDevice.setTenantId(gateway.getTenantId()); newDevice.setName(deviceName); - newDevice.setType("default"); - return deviceService.saveDevice(newDevice); + newDevice.setType(deviceType); + newDevice = deviceService.saveDevice(newDevice); + relationService.saveRelation(new EntityRelation(gateway.getId(), newDevice.getId(), "Created")); + return newDevice; }); GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device); devices.put(deviceName, ctx); @@ -91,7 +99,7 @@ public class GatewaySessionCtx { } public void onDeviceDisconnect(MqttPublishMessage msg) throws AdaptorException { - String deviceName = checkDeviceName(getDeviceName(msg)); + String deviceName = checkDeviceName(getDeviceName(getJson(msg))); GatewayDeviceSessionCtx deviceSessionCtx = devices.remove(deviceName); if (deviceSessionCtx != null) { processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId())); @@ -211,11 +219,19 @@ public class GatewaySessionCtx { } } - private String getDeviceName(MqttPublishMessage mqttMsg) throws AdaptorException { - JsonElement json = JsonMqttAdaptor.validateJsonPayload(gatewaySessionId, mqttMsg.payload()); + private String getDeviceName(JsonElement json) throws AdaptorException { return json.getAsJsonObject().get("device").getAsString(); } + private String getDeviceType(JsonElement json) throws AdaptorException { + JsonElement type = json.getAsJsonObject().get("type"); + return type == null ? "default" : type.getAsString(); + } + + private JsonElement getJson(MqttPublishMessage mqttMsg) throws AdaptorException { + return JsonMqttAdaptor.validateJsonPayload(gatewaySessionId, mqttMsg.payload()); + } + protected SessionMsgProcessor getProcessor() { return processor; } @@ -229,7 +245,9 @@ public class GatewaySessionCtx { } private void ack(MqttPublishMessage msg) { - writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msg.variableHeader().messageId())); + if(msg.variableHeader().messageId() > 0) { + writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msg.variableHeader().messageId())); + } } protected void writeAndFlush(MqttMessage mqttMessage) { diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/SslUtil.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/SslUtil.java index adda3440c5..590c289669 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/SslUtil.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/SslUtil.java @@ -18,9 +18,7 @@ package org.thingsboard.server.transport.mqtt.util; import lombok.extern.slf4j.Slf4j; import org.springframework.util.Base64Utils; import org.thingsboard.server.dao.EncryptionUtil; -import sun.misc.BASE64Encoder; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.security.cert.CertificateEncodingException; import java.security.cert.X509Certificate;