Merge pull request #180 from thingsboard/feature/TB-66

TB-66: Add support of device type mapping in the IoT Gateway
This commit is contained in:
Andrew Shvayka 2017-06-26 09:36:30 +03:00 committed by GitHub
commit 1a6826c07c
5 changed files with 42 additions and 15 deletions

View File

@ -37,6 +37,7 @@ import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.dao.EncryptionUtil;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx;
import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
@ -67,14 +68,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private final SessionMsgProcessor processor;
private final DeviceService deviceService;
private final DeviceAuthService authService;
private final RelationService relationService;
private final SslHandler sslHandler;
private volatile boolean connected;
private volatile GatewaySessionCtx gatewaySessionCtx;
public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService,
public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
MqttTransportAdaptor adaptor, SslHandler sslHandler) {
this.processor = processor;
this.deviceService = deviceService;
this.relationService = relationService;
this.authService = authService;
this.adaptor = adaptor;
this.deviceSessionCtx = new DeviceSessionCtx(processor, authService, adaptor);
@ -371,7 +374,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
if (infoNode != null) {
JsonNode gatewayNode = infoNode.get("gateway");
if (gatewayNode != null && gatewayNode.asBoolean()) {
gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, deviceSessionCtx);
gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, relationService, deviceSessionCtx);
}
}
}

View File

@ -30,6 +30,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
import javax.net.ssl.SSLException;
@ -45,14 +46,17 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha
private final SessionMsgProcessor processor;
private final DeviceService deviceService;
private final DeviceAuthService authService;
private final RelationService relationService;
private final MqttTransportAdaptor adaptor;
private final MqttSslHandlerProvider sslHandlerProvider;
public MqttTransportServerInitializer(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, MqttTransportAdaptor adaptor,
public MqttTransportServerInitializer(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
MqttTransportAdaptor adaptor,
MqttSslHandlerProvider sslHandlerProvider) {
this.processor = processor;
this.deviceService = deviceService;
this.authService = authService;
this.relationService = relationService;
this.adaptor = adaptor;
this.sslHandlerProvider = sslHandlerProvider;
}
@ -68,7 +72,7 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha
pipeline.addLast("decoder", new MqttDecoder(MAX_PAYLOAD_SIZE));
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
MqttTransportHandler handler = new MqttTransportHandler(processor, deviceService, authService, adaptor, sslHandler);
MqttTransportHandler handler = new MqttTransportHandler(processor, deviceService, authService, relationService, adaptor, sslHandler);
pipeline.addLast(handler);
ch.closeFuture().addListener(handler);
}

View File

@ -29,6 +29,7 @@ import org.springframework.stereotype.Service;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
import javax.annotation.PostConstruct;
@ -56,6 +57,9 @@ public class MqttTransportService {
@Autowired(required = false)
private DeviceAuthService authService;
@Autowired(required = false)
private RelationService relationService;
@Autowired(required = false)
private MqttSslHandlerProvider sslHandlerProvider;
@ -95,7 +99,7 @@ public class MqttTransportService {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, adaptor, sslHandlerProvider));
.childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, relationService, adaptor, sslHandlerProvider));
serverChannel = b.bind(host, port).sync().channel();
log.info("Mqtt transport started!");

View File

@ -27,6 +27,7 @@ import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.msg.core.*;
import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg;
@ -36,6 +37,7 @@ import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
@ -58,28 +60,34 @@ public class GatewaySessionCtx {
private final SessionMsgProcessor processor;
private final DeviceService deviceService;
private final DeviceAuthService authService;
private final RelationService relationService;
private final Map<String, GatewayDeviceSessionCtx> devices;
private ChannelHandlerContext channel;
public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, DeviceSessionCtx gatewaySessionCtx) {
public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, DeviceSessionCtx gatewaySessionCtx) {
this.processor = processor;
this.deviceService = deviceService;
this.authService = authService;
this.relationService = relationService;
this.gateway = gatewaySessionCtx.getDevice();
this.gatewaySessionId = gatewaySessionCtx.getSessionId();
this.devices = new HashMap<>();
}
public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException {
String deviceName = checkDeviceName(getDeviceName(msg));
JsonElement json = getJson(msg);
String deviceName = checkDeviceName(getDeviceName(json));
String deviceType = getDeviceType(json);
if (!devices.containsKey(deviceName)) {
Optional<Device> deviceOpt = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), deviceName);
Device device = deviceOpt.orElseGet(() -> {
Device newDevice = new Device();
newDevice.setTenantId(gateway.getTenantId());
newDevice.setName(deviceName);
newDevice.setType("default");
return deviceService.saveDevice(newDevice);
newDevice.setType(deviceType);
newDevice = deviceService.saveDevice(newDevice);
relationService.saveRelation(new EntityRelation(gateway.getId(), newDevice.getId(), "Created"));
return newDevice;
});
GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device);
devices.put(deviceName, ctx);
@ -91,7 +99,7 @@ public class GatewaySessionCtx {
}
public void onDeviceDisconnect(MqttPublishMessage msg) throws AdaptorException {
String deviceName = checkDeviceName(getDeviceName(msg));
String deviceName = checkDeviceName(getDeviceName(getJson(msg)));
GatewayDeviceSessionCtx deviceSessionCtx = devices.remove(deviceName);
if (deviceSessionCtx != null) {
processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId()));
@ -211,11 +219,19 @@ public class GatewaySessionCtx {
}
}
private String getDeviceName(MqttPublishMessage mqttMsg) throws AdaptorException {
JsonElement json = JsonMqttAdaptor.validateJsonPayload(gatewaySessionId, mqttMsg.payload());
private String getDeviceName(JsonElement json) throws AdaptorException {
return json.getAsJsonObject().get("device").getAsString();
}
private String getDeviceType(JsonElement json) throws AdaptorException {
JsonElement type = json.getAsJsonObject().get("type");
return type == null ? "default" : type.getAsString();
}
private JsonElement getJson(MqttPublishMessage mqttMsg) throws AdaptorException {
return JsonMqttAdaptor.validateJsonPayload(gatewaySessionId, mqttMsg.payload());
}
protected SessionMsgProcessor getProcessor() {
return processor;
}
@ -229,7 +245,9 @@ public class GatewaySessionCtx {
}
private void ack(MqttPublishMessage msg) {
writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msg.variableHeader().messageId()));
if(msg.variableHeader().messageId() > 0) {
writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msg.variableHeader().messageId()));
}
}
protected void writeAndFlush(MqttMessage mqttMessage) {

View File

@ -18,9 +18,7 @@ package org.thingsboard.server.transport.mqtt.util;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Base64Utils;
import org.thingsboard.server.dao.EncryptionUtil;
import sun.misc.BASE64Encoder;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.security.cert.CertificateEncodingException;
import java.security.cert.X509Certificate;