TB-33: Implementation
This commit is contained in:
parent
b650bf67a4
commit
0da7724c09
@ -76,10 +76,10 @@ mqtt:
|
||||
adaptor: "${MQTT_ADAPTOR_NAME:JsonMqttAdaptor}"
|
||||
timeout: "${MQTT_TIMEOUT:10000}"
|
||||
# Uncomment the following lines to enable ssl for MQTT
|
||||
ssl:
|
||||
key_store: keystore/mqttserver.jks
|
||||
key_store_password: password
|
||||
key_store_type: JKS
|
||||
# ssl:
|
||||
# key_store: keystore/mqttserver.jks
|
||||
# key_store_password: password
|
||||
# key_store_type: JKS
|
||||
|
||||
# CoAP server parameters
|
||||
coap:
|
||||
|
||||
@ -27,8 +27,6 @@ public interface SessionContext extends SessionAwareMsg {
|
||||
|
||||
void onMsg(SessionCtrlMsg msg) throws SessionException;
|
||||
|
||||
void onError(SessionException e);
|
||||
|
||||
boolean isClosed();
|
||||
|
||||
long getTimeout();
|
||||
|
||||
@ -24,6 +24,10 @@ public class SessionCloseMsg implements SessionCtrlMsg {
|
||||
private final boolean revoked;
|
||||
private final boolean timeout;
|
||||
|
||||
public static SessionCloseMsg onDisconnect(SessionId sessionId) {
|
||||
return new SessionCloseMsg(sessionId, false, false);
|
||||
}
|
||||
|
||||
public static SessionCloseMsg onError(SessionId sessionId) {
|
||||
return new SessionCloseMsg(sessionId, false, false);
|
||||
}
|
||||
|
||||
@ -74,7 +74,7 @@ public class JsonConverter {
|
||||
}
|
||||
}
|
||||
|
||||
private static void parseWithTs(BasicTelemetryUploadRequest request, JsonObject jo) {
|
||||
public static void parseWithTs(BasicTelemetryUploadRequest request, JsonObject jo) {
|
||||
long ts = jo.get("ts").getAsLong();
|
||||
JsonObject valuesObject = jo.get("values").getAsJsonObject();
|
||||
for (KvEntry entry : parseValues(valuesObject)) {
|
||||
@ -82,7 +82,7 @@ public class JsonConverter {
|
||||
}
|
||||
}
|
||||
|
||||
private static List<KvEntry> parseValues(JsonObject valuesObject) {
|
||||
public static List<KvEntry> parseValues(JsonObject valuesObject) {
|
||||
List<KvEntry> result = new ArrayList<>();
|
||||
for (Entry<String, JsonElement> valueEntry : valuesObject.entrySet()) {
|
||||
JsonElement element = valueEntry.getValue();
|
||||
|
||||
@ -95,17 +95,6 @@ public class CoapSessionCtx extends DeviceAwareSessionContext {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(SessionException e) {
|
||||
if (e instanceof SessionAuthException) {
|
||||
log.warn("[{}] onError: {}", sessionId, e.getMessage());
|
||||
exchange.respond(ResponseCode.UNAUTHORIZED);
|
||||
} else {
|
||||
log.warn("[{}] onError: {}", sessionId, e.getMessage(), e);
|
||||
exchange.respond(ResponseCode.BAD_REQUEST);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public SessionId getSessionId() {
|
||||
return sessionId;
|
||||
|
||||
@ -140,11 +140,6 @@ public class HttpSessionCtx extends DeviceAwareSessionContext {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(SessionException e) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClosed() {
|
||||
return false;
|
||||
|
||||
@ -16,7 +16,6 @@
|
||||
package org.thingsboard.server.transport.mqtt;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.gson.JsonElement;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.handler.codec.mqtt.*;
|
||||
@ -38,7 +37,6 @@ import org.thingsboard.server.common.transport.adaptor.AdaptorException;
|
||||
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
|
||||
import org.thingsboard.server.dao.EncryptionUtil;
|
||||
import org.thingsboard.server.dao.device.DeviceService;
|
||||
import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
|
||||
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
|
||||
import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx;
|
||||
import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
|
||||
@ -129,13 +127,17 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
||||
log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId);
|
||||
|
||||
if (topicName.startsWith(BASE_GATEWAY_API_TOPIC)) {
|
||||
AdaptorToSessionActorMsg msg = null;
|
||||
if (gatewaySessionCtx != null) {
|
||||
gatewaySessionCtx.setChannel(ctx);
|
||||
try {
|
||||
if (topicName.equals(GATEWAY_CONNECT_TOPIC)) {
|
||||
gatewaySessionCtx.connect(getDeviceName(mqttMsg));
|
||||
if (topicName.equals(GATEWAY_TELEMETRY_TOPIC)) {
|
||||
gatewaySessionCtx.onDeviceTelemetry(mqttMsg);
|
||||
} else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) {
|
||||
gatewaySessionCtx.onDeviceAttributes(mqttMsg);
|
||||
} else if (topicName.equals(GATEWAY_CONNECT_TOPIC)) {
|
||||
gatewaySessionCtx.onDeviceConnect(mqttMsg);
|
||||
} else if (topicName.equals(GATEWAY_DISCONNECT_TOPIC)) {
|
||||
gatewaySessionCtx.disconnect(getDeviceName(mqttMsg));
|
||||
gatewaySessionCtx.onDeviceDisconnect(mqttMsg);
|
||||
}
|
||||
} catch (RuntimeException | AdaptorException e) {
|
||||
log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
|
||||
@ -146,11 +148,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
||||
}
|
||||
}
|
||||
|
||||
private String getDeviceName(MqttPublishMessage mqttMsg) throws AdaptorException {
|
||||
JsonElement json = JsonMqttAdaptor.validateJsonPayload(deviceSessionCtx.getSessionId(), mqttMsg.payload());
|
||||
return json.getAsJsonObject().get("device").getAsString();
|
||||
}
|
||||
|
||||
private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
|
||||
AdaptorToSessionActorMsg msg = null;
|
||||
try {
|
||||
@ -309,6 +306,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
||||
|
||||
private void processDisconnect(ChannelHandlerContext ctx) {
|
||||
ctx.close();
|
||||
processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId()));
|
||||
if (gatewaySessionCtx != null) {
|
||||
gatewaySessionCtx.onGatewayDisconnect();
|
||||
}
|
||||
}
|
||||
|
||||
private MqttConnAckMessage createMqttConnAckMsg(MqttConnectReturnCode returnCode) {
|
||||
@ -362,9 +363,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
||||
|
||||
private void checkGatewaySession() {
|
||||
Device device = deviceSessionCtx.getDevice();
|
||||
JsonNode gatewayNode = device.getAdditionalInfo().get("gateway");
|
||||
if (gatewayNode != null && gatewayNode.asBoolean()) {
|
||||
gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, device);
|
||||
JsonNode infoNode = device.getAdditionalInfo();
|
||||
if (infoNode != null) {
|
||||
JsonNode gatewayNode = infoNode.get("gateway");
|
||||
if (gatewayNode != null && gatewayNode.asBoolean()) {
|
||||
gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, deviceSessionCtx);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -248,7 +248,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
|
||||
try {
|
||||
String payload = payloadData.toString(UTF8);
|
||||
if (payload == null) {
|
||||
log.warn("[{}] Payload is empty!", sessionId);
|
||||
log.warn("[{}] Payload is empty!", sessionId.toUidStr());
|
||||
throw new AdaptorException(new IllegalArgumentException("Payload is empty!"));
|
||||
}
|
||||
return payload;
|
||||
|
||||
@ -1,49 +0,0 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.transport.mqtt.adaptors;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.UnpooledByteBufAllocator;
|
||||
import io.netty.handler.codec.mqtt.MqttMessage;
|
||||
import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
|
||||
import org.thingsboard.server.common.msg.session.MsgType;
|
||||
import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
|
||||
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
|
||||
import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Created by ashvayka on 19.01.17.
|
||||
*/
|
||||
public class JsonMqttGatewayAdaptor implements MqttGatewayAdaptor {
|
||||
|
||||
private static final Gson GSON = new Gson();
|
||||
private static final Charset UTF8 = Charset.forName("UTF-8");
|
||||
private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);
|
||||
|
||||
@Override
|
||||
public AdaptorToSessionActorMsg convertToActorMsg(GatewaySessionCtx ctx, MsgType type, MqttMessage inbound) throws AdaptorException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<MqttMessage> convertToAdaptorMsg(GatewaySessionCtx ctx, SessionActorToAdaptorMsg msg) throws AdaptorException {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@ -82,11 +82,6 @@ public class DeviceSessionCtx extends DeviceAwareSessionContext {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(SessionException e) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClosed() {
|
||||
return false;
|
||||
|
||||
@ -15,26 +15,29 @@
|
||||
*/
|
||||
package org.thingsboard.server.transport.mqtt.session;
|
||||
|
||||
import io.netty.handler.codec.mqtt.MqttMessage;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.id.SessionId;
|
||||
import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
|
||||
import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
|
||||
import org.thingsboard.server.common.msg.session.SessionType;
|
||||
import org.thingsboard.server.common.msg.core.ResponseMsg;
|
||||
import org.thingsboard.server.common.msg.session.*;
|
||||
import org.thingsboard.server.common.msg.session.ex.SessionException;
|
||||
import org.thingsboard.server.common.transport.SessionMsgProcessor;
|
||||
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
|
||||
import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
|
||||
import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Created by ashvayka on 19.01.17.
|
||||
*/
|
||||
public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext {
|
||||
|
||||
private GatewaySessionCtx parent;
|
||||
private final MqttSessionId sessionId;
|
||||
private volatile boolean closed;
|
||||
|
||||
public GatewayDeviceSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, Device device) {
|
||||
super(processor, authService, device);
|
||||
public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device) {
|
||||
super(parent.getProcessor(), parent.getAuthService(), device);
|
||||
this.parent = parent;
|
||||
this.sessionId = new MqttSessionId();
|
||||
}
|
||||
|
||||
@ -49,8 +52,26 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMsg(SessionActorToAdaptorMsg msg) throws SessionException {
|
||||
public void onMsg(SessionActorToAdaptorMsg sessionMsg) throws SessionException {
|
||||
Optional<MqttMessage> message = getToDeviceMsg(sessionMsg);
|
||||
message.ifPresent(parent::writeAndFlush);
|
||||
}
|
||||
|
||||
private Optional<MqttMessage> getToDeviceMsg(SessionActorToAdaptorMsg sessionMsg) {
|
||||
ToDeviceMsg msg = sessionMsg.getMsg();
|
||||
switch (msg.getMsgType()) {
|
||||
case STATUS_CODE_RESPONSE:
|
||||
ResponseMsg<?> responseMsg = (ResponseMsg) msg;
|
||||
if (responseMsg.isSuccess()) {
|
||||
MsgType requestMsgType = responseMsg.getRequestMsgType();
|
||||
Integer requestId = responseMsg.getRequestId();
|
||||
if (requestMsgType == MsgType.POST_ATTRIBUTES_REQUEST || requestMsgType == MsgType.POST_TELEMETRY_REQUEST) {
|
||||
return Optional.of(MqttTransportHandler.createMqttPubAckMsg(requestId));
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -58,11 +79,6 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(SessionException e) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClosed() {
|
||||
return closed;
|
||||
|
||||
@ -15,55 +15,177 @@
|
||||
*/
|
||||
package org.thingsboard.server.transport.mqtt.session;
|
||||
|
||||
import com.google.gson.*;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.UnpooledByteBufAllocator;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.mqtt.MqttMessage;
|
||||
import io.netty.handler.codec.mqtt.MqttPublishMessage;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.id.SessionId;
|
||||
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
|
||||
import org.thingsboard.server.common.msg.core.BasicTelemetryUploadRequest;
|
||||
import org.thingsboard.server.common.msg.core.BasicUpdateAttributesRequest;
|
||||
import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
|
||||
import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
|
||||
import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg;
|
||||
import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
|
||||
import org.thingsboard.server.common.transport.SessionMsgProcessor;
|
||||
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
|
||||
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
|
||||
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
|
||||
import org.thingsboard.server.dao.device.DeviceService;
|
||||
import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
|
||||
import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.transport.SessionMsgProcessor;
|
||||
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
|
||||
import org.thingsboard.server.dao.device.DeviceService;
|
||||
import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.validateJsonPayload;
|
||||
|
||||
/**
|
||||
* Created by ashvayka on 19.01.17.
|
||||
*/
|
||||
@Slf4j
|
||||
public class GatewaySessionCtx {
|
||||
|
||||
private static final Gson GSON = new Gson();
|
||||
private static final Charset UTF8 = Charset.forName("UTF-8");
|
||||
private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);
|
||||
|
||||
private final Device gateway;
|
||||
private final SessionId gatewaySessionId;
|
||||
private final SessionMsgProcessor processor;
|
||||
private final DeviceService deviceService;
|
||||
private final DeviceAuthService authService;
|
||||
private final Map<String, GatewayDeviceSessionCtx> devices;
|
||||
private ChannelHandlerContext channel;
|
||||
|
||||
public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, Device gateway) {
|
||||
public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, DeviceSessionCtx gatewaySessionCtx) {
|
||||
this.processor = processor;
|
||||
this.deviceService = deviceService;
|
||||
this.authService = authService;
|
||||
this.gateway = gateway;
|
||||
this.gateway = gatewaySessionCtx.getDevice();
|
||||
this.gatewaySessionId = gatewaySessionCtx.getSessionId();
|
||||
this.devices = new HashMap<>();
|
||||
}
|
||||
|
||||
public void connect(String deviceName) {
|
||||
checkDeviceName(deviceName);
|
||||
public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException {
|
||||
String deviceName = checkDeviceName(getDeviceName(msg));
|
||||
Optional<Device> deviceOpt = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), deviceName);
|
||||
Device device = deviceOpt.orElseGet(() -> {
|
||||
Device newDevice = new Device();
|
||||
newDevice.setTenantId(gateway.getTenantId());
|
||||
newDevice.setName(deviceName);
|
||||
return deviceService.saveDevice(newDevice);
|
||||
});
|
||||
devices.put(deviceName, new GatewayDeviceSessionCtx(processor, authService, device));
|
||||
devices.put(deviceName, new GatewayDeviceSessionCtx(this, device));
|
||||
ack(msg);
|
||||
}
|
||||
|
||||
public void disconnect(String deviceName) {
|
||||
checkDeviceName(deviceName);
|
||||
devices.remove(deviceName);
|
||||
public void onDeviceDisconnect(MqttPublishMessage msg) throws AdaptorException {
|
||||
String deviceName = checkDeviceName(getDeviceName(msg));
|
||||
GatewayDeviceSessionCtx deviceSessionCtx = devices.remove(deviceName);
|
||||
deviceSessionCtx.setClosed(true);
|
||||
ack(msg);
|
||||
}
|
||||
|
||||
private void checkDeviceName(String deviceName) {
|
||||
if (StringUtils.isEmpty(deviceName)) {
|
||||
throw new RuntimeException();
|
||||
public void onGatewayDisconnect() {
|
||||
devices.forEach((k, v) -> {
|
||||
processor.process(SessionCloseMsg.onDisconnect(v.getSessionId()));
|
||||
});
|
||||
}
|
||||
|
||||
public void onDeviceTelemetry(MqttPublishMessage mqttMsg) throws AdaptorException {
|
||||
JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload());
|
||||
int requestId = mqttMsg.variableHeader().messageId();
|
||||
if (json.isJsonObject()) {
|
||||
JsonObject jsonObj = json.getAsJsonObject();
|
||||
for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
|
||||
String deviceName = checkDeviceConnected(deviceEntry.getKey());
|
||||
if (!deviceEntry.getValue().isJsonArray()) {
|
||||
throw new JsonSyntaxException("Can't parse value: " + json);
|
||||
}
|
||||
BasicTelemetryUploadRequest request = new BasicTelemetryUploadRequest(requestId);
|
||||
JsonArray deviceData = deviceEntry.getValue().getAsJsonArray();
|
||||
for (JsonElement element : deviceData) {
|
||||
JsonConverter.parseWithTs(request, element.getAsJsonObject());
|
||||
}
|
||||
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
|
||||
processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
|
||||
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
|
||||
}
|
||||
} else {
|
||||
throw new JsonSyntaxException("Can't parse value: " + json);
|
||||
}
|
||||
}
|
||||
|
||||
public void onDeviceAttributes(MqttPublishMessage mqttMsg) throws AdaptorException {
|
||||
JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload());
|
||||
int requestId = mqttMsg.variableHeader().messageId();
|
||||
if (json.isJsonObject()) {
|
||||
JsonObject jsonObj = json.getAsJsonObject();
|
||||
for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
|
||||
String deviceName = checkDeviceConnected(deviceEntry.getKey());
|
||||
if (!deviceEntry.getValue().isJsonObject()) {
|
||||
throw new JsonSyntaxException("Can't parse value: " + json);
|
||||
}
|
||||
long ts = System.currentTimeMillis();
|
||||
BasicUpdateAttributesRequest request = new BasicUpdateAttributesRequest(requestId);
|
||||
JsonObject deviceData = deviceEntry.getValue().getAsJsonObject();
|
||||
request.add(JsonConverter.parseValues(deviceData).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList()));
|
||||
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
|
||||
processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
|
||||
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
|
||||
}
|
||||
} else {
|
||||
throw new JsonSyntaxException("Can't parse value: " + json);
|
||||
}
|
||||
}
|
||||
|
||||
private String checkDeviceConnected(String deviceName) {
|
||||
if (!devices.containsKey(deviceName)) {
|
||||
throw new RuntimeException("Device is not connected!");
|
||||
} else {
|
||||
return deviceName;
|
||||
}
|
||||
}
|
||||
|
||||
private String checkDeviceName(String deviceName) {
|
||||
if (StringUtils.isEmpty(deviceName)) {
|
||||
throw new RuntimeException("Device name is empty!");
|
||||
} else {
|
||||
return deviceName;
|
||||
}
|
||||
}
|
||||
|
||||
private String getDeviceName(MqttPublishMessage mqttMsg) throws AdaptorException {
|
||||
JsonElement json = JsonMqttAdaptor.validateJsonPayload(gatewaySessionId, mqttMsg.payload());
|
||||
return json.getAsJsonObject().get("device").getAsString();
|
||||
}
|
||||
|
||||
protected SessionMsgProcessor getProcessor() {
|
||||
return processor;
|
||||
}
|
||||
|
||||
protected DeviceAuthService getAuthService() {
|
||||
return authService;
|
||||
}
|
||||
|
||||
public void setChannel(ChannelHandlerContext channel) {
|
||||
this.channel = channel;
|
||||
}
|
||||
|
||||
private void ack(MqttPublishMessage msg) {
|
||||
writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msg.variableHeader().messageId()));
|
||||
}
|
||||
|
||||
protected void writeAndFlush(MqttMessage mqttMessage) {
|
||||
channel.writeAndFlush(mqttMessage);
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user