mqtt gateway refactoring due to OOM

This commit is contained in:
YevhenBondarenko 2024-05-16 12:45:17 +02:00
parent c9b1d1ec89
commit 5c6928558b
2 changed files with 265 additions and 245 deletions

View File

@ -33,6 +33,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage;
import jakarta.annotation.Nullable;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -62,7 +63,6 @@ import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor;
import org.thingsboard.server.transport.mqtt.util.ReturnCode; import org.thingsboard.server.transport.mqtt.util.ReturnCode;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState;
import jakarta.annotation.Nullable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
@ -99,6 +99,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
protected final MqttTransportContext context; protected final MqttTransportContext context;
protected final TransportService transportService; protected final TransportService transportService;
protected final TransportDeviceInfo gateway; protected final TransportDeviceInfo gateway;
@Getter
protected final UUID sessionId; protected final UUID sessionId;
private final ConcurrentMap<String, Lock> deviceCreationLockMap; private final ConcurrentMap<String, Lock> deviceCreationLockMap;
private final ConcurrentMap<String, T> devices; private final ConcurrentMap<String, T> devices;
@ -193,10 +194,6 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
return context.getNodeId(); return context.getNodeId();
} }
public UUID getSessionId() {
return sessionId;
}
public MqttTransportAdaptor getPayloadAdaptor() { public MqttTransportAdaptor getPayloadAdaptor() {
return deviceSessionCtx.getPayloadAdaptor(); return deviceSessionCtx.getPayloadAdaptor();
} }
@ -368,24 +365,26 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
protected void onDeviceTelemetryJson(int msgId, ByteBuf payload) throws AdaptorException { protected void onDeviceTelemetryJson(int msgId, ByteBuf payload) throws AdaptorException {
JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, payload); JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, payload);
if (json.isJsonObject()) { if (!json.isJsonObject()) {
JsonObject jsonObj = json.getAsJsonObject(); throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) { }
for (Map.Entry<String, JsonElement> deviceEntry : json.getAsJsonObject().entrySet()) {
if (!deviceEntry.getValue().isJsonArray()) {
log.warn("{}[{}]", CAN_T_PARSE_VALUE, json);
continue;
}
String deviceName = deviceEntry.getKey(); String deviceName = deviceEntry.getKey();
T deviceCtx = devices.get(deviceName);
if (deviceCtx != null) {
processPostTelemetryMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId);
} else {
Futures.addCallback(checkDeviceConnected(deviceName), Futures.addCallback(checkDeviceConnected(deviceName),
new FutureCallback<>() { new FutureCallback<>() {
@Override @Override
public void onSuccess(@Nullable T deviceCtx) { public void onSuccess(@Nullable T deviceCtx) {
if (!deviceEntry.getValue().isJsonArray()) { processPostTelemetryMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId);
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}
try {
TransportProtos.PostTelemetryMsg postTelemetryMsg = JsonConverter.convertToTelemetryProto(deviceEntry.getValue().getAsJsonArray());
processPostTelemetryMsg(deviceCtx, postTelemetryMsg, deviceName, msgId);
} catch (Throwable e) {
log.warn("[{}][{}][{}] Failed to convert telemetry: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, deviceEntry.getValue(), e);
channel.close();
}
} }
@Override @Override
@ -394,8 +393,16 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
} }
}, context.getExecutor()); }, context.getExecutor());
} }
} else { }
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); }
private void processPostTelemetryMsg(T deviceCtx, JsonElement msg, String deviceName, int msgId) {
try {
TransportProtos.PostTelemetryMsg postTelemetryMsg = JsonConverter.convertToTelemetryProto(msg.getAsJsonArray());
transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg));
} catch (Throwable e) {
log.warn("[{}][{}][{}] Failed to convert telemetry: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e);
channel.close();
} }
} }
@ -403,21 +410,21 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
try { try {
TransportApiProtos.GatewayTelemetryMsg telemetryMsgProto = TransportApiProtos.GatewayTelemetryMsg.parseFrom(getBytes(payload)); TransportApiProtos.GatewayTelemetryMsg telemetryMsgProto = TransportApiProtos.GatewayTelemetryMsg.parseFrom(getBytes(payload));
List<TransportApiProtos.TelemetryMsg> deviceMsgList = telemetryMsgProto.getMsgList(); List<TransportApiProtos.TelemetryMsg> deviceMsgList = telemetryMsgProto.getMsgList();
if (!CollectionUtils.isEmpty(deviceMsgList)) { if (CollectionUtils.isEmpty(deviceMsgList)) {
log.debug("[{}][{}][{}] Devices telemetry messages is empty", gateway.getTenantId(), gateway.getDeviceId(), sessionId);
throw new IllegalArgumentException("[" + sessionId + "] Devices telemetry messages is empty for [" + gateway.getDeviceId() + "]");
}
deviceMsgList.forEach(telemetryMsg -> { deviceMsgList.forEach(telemetryMsg -> {
String deviceName = checkDeviceName(telemetryMsg.getDeviceName()); String deviceName = checkDeviceName(telemetryMsg.getDeviceName());
Futures.addCallback(checkDeviceConnected(deviceName), T deviceCtx = devices.get(deviceName);
new FutureCallback<>() { if (deviceCtx != null) {
processPostTelemetryMsg(deviceCtx, telemetryMsg.getMsg(), deviceName, msgId);
} else {
Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() {
@Override @Override
public void onSuccess(@Nullable T deviceCtx) { public void onSuccess(@Nullable T deviceCtx) {
TransportProtos.PostTelemetryMsg msg = telemetryMsg.getMsg(); processPostTelemetryMsg(deviceCtx, telemetryMsg.getMsg(), deviceName, msgId);
try {
TransportProtos.PostTelemetryMsg postTelemetryMsg = ProtoConverter.validatePostTelemetryMsg(msg.toByteArray());
processPostTelemetryMsg(deviceCtx, postTelemetryMsg, deviceName, msgId);
} catch (Throwable e) {
log.warn("[{}][{}][{}] Failed to convert telemetry: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e);
channel.close();
}
} }
@Override @Override
@ -425,18 +432,21 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
log.debug("[{}][{}][{}] Failed to process device telemetry command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); log.debug("[{}][{}][{}] Failed to process device telemetry command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t);
} }
}, context.getExecutor()); }, context.getExecutor());
});
} else {
log.debug("[{}][{}][{}] Devices telemetry messages is empty", gateway.getTenantId(), gateway.getDeviceId(), sessionId);
throw new IllegalArgumentException("[" + sessionId + "] Devices telemetry messages is empty for [" + gateway.getDeviceId() + "]");
} }
});
} catch (RuntimeException | InvalidProtocolBufferException e) { } catch (RuntimeException | InvalidProtocolBufferException e) {
throw new AdaptorException(e); throw new AdaptorException(e);
} }
} }
public void processPostTelemetryMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostTelemetryMsg postTelemetryMsg, String deviceName, int msgId) { protected void processPostTelemetryMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostTelemetryMsg msg, String deviceName, int msgId) {
try {
TransportProtos.PostTelemetryMsg postTelemetryMsg = ProtoConverter.validatePostTelemetryMsg(msg.toByteArray());
transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg)); transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg));
} catch (Throwable e) {
log.warn("[{}][{}][{}] Failed to convert telemetry: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e);
channel.close();
}
} }
public TransportProtos.PostTelemetryMsg postTelemetryMsgCreated(TransportProtos.KeyValueProto keyValueProto, long ts) { public TransportProtos.PostTelemetryMsg postTelemetryMsgCreated(TransportProtos.KeyValueProto keyValueProto, long ts) {
@ -452,24 +462,26 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
private void onDeviceClaimJson(int msgId, ByteBuf payload) throws AdaptorException { private void onDeviceClaimJson(int msgId, ByteBuf payload) throws AdaptorException {
JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, payload); JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, payload);
if (json.isJsonObject()) { if (!json.isJsonObject()) {
JsonObject jsonObj = json.getAsJsonObject();
for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
String deviceName = deviceEntry.getKey();
Futures.addCallback(checkDeviceConnected(deviceName),
new FutureCallback<>() {
@Override
public void onSuccess(@Nullable T deviceCtx) {
if (!deviceEntry.getValue().isJsonObject()) {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
} }
try { JsonObject jsonObj = json.getAsJsonObject();
DeviceId deviceId = deviceCtx.getDeviceId(); for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
TransportProtos.ClaimDeviceMsg claimDeviceMsg = JsonConverter.convertToClaimDeviceProto(deviceId, deviceEntry.getValue()); if (!deviceEntry.getValue().isJsonObject()) {
processClaimDeviceMsg(deviceCtx, claimDeviceMsg, deviceName, msgId); log.warn("{}[{}]", CAN_T_PARSE_VALUE, json);
} catch (Throwable e) { continue;
log.warn("[{}][{}][{}] Failed to convert claim message: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, deviceEntry.getValue(), e);
} }
String deviceName = deviceEntry.getKey();
T deviceCtx = devices.get(deviceEntry.getKey());
if (deviceCtx != null) {
processClaimDeviceMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId);
} else {
Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable T deviceCtx) {
processClaimDeviceMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId);
} }
@Override @Override
@ -478,8 +490,16 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
} }
}, context.getExecutor()); }, context.getExecutor());
} }
} else { }
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); }
private void processClaimDeviceMsg(MqttDeviceAwareSessionContext deviceCtx, JsonElement claimRequest, String deviceName, int msgId) {
try {
DeviceId deviceId = deviceCtx.getDeviceId();
TransportProtos.ClaimDeviceMsg claimDeviceMsg = JsonConverter.convertToClaimDeviceProto(deviceId, claimRequest);
transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(channel, deviceName, msgId, claimDeviceMsg));
} catch (Throwable e) {
log.warn("[{}][{}][{}] Failed to convert claim message: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, claimRequest, e);
} }
} }
@ -487,24 +507,21 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
try { try {
TransportApiProtos.GatewayClaimMsg claimMsgProto = TransportApiProtos.GatewayClaimMsg.parseFrom(getBytes(payload)); TransportApiProtos.GatewayClaimMsg claimMsgProto = TransportApiProtos.GatewayClaimMsg.parseFrom(getBytes(payload));
List<TransportApiProtos.ClaimDeviceMsg> claimMsgList = claimMsgProto.getMsgList(); List<TransportApiProtos.ClaimDeviceMsg> claimMsgList = claimMsgProto.getMsgList();
if (!CollectionUtils.isEmpty(claimMsgList)) { if (CollectionUtils.isEmpty(claimMsgList)) {
log.debug("[{}][{}][{}] Devices claim messages is empty", gateway.getTenantId(), gateway.getDeviceId(), sessionId);
throw new IllegalArgumentException("[" + sessionId + "] Devices claim messages is empty for [" + gateway.getDeviceId() + "]");
}
claimMsgList.forEach(claimDeviceMsg -> { claimMsgList.forEach(claimDeviceMsg -> {
String deviceName = checkDeviceName(claimDeviceMsg.getDeviceName()); String deviceName = checkDeviceName(claimDeviceMsg.getDeviceName());
Futures.addCallback(checkDeviceConnected(deviceName), T deviceCtx = devices.get(deviceName);
new FutureCallback<>() { if (deviceCtx != null) {
processClaimDeviceMsg(deviceCtx, claimDeviceMsg.getClaimRequest(), deviceName, msgId);
} else {
Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() {
@Override @Override
public void onSuccess(@Nullable T deviceCtx) { public void onSuccess(@Nullable T deviceCtx) {
TransportApiProtos.ClaimDevice claimRequest = claimDeviceMsg.getClaimRequest(); processClaimDeviceMsg(deviceCtx, claimDeviceMsg.getClaimRequest(), deviceName, msgId);
if (claimRequest == null) {
throw new IllegalArgumentException("Claim request for device: " + deviceName + " is null!");
}
try {
DeviceId deviceId = deviceCtx.getDeviceId();
TransportProtos.ClaimDeviceMsg claimDeviceMsg = ProtoConverter.convertToClaimDeviceProto(deviceId, claimRequest.toByteArray());
processClaimDeviceMsg(deviceCtx, claimDeviceMsg, deviceName, msgId);
} catch (Throwable e) {
log.warn("[{}][{}][{}] Failed to convert claim message: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, claimRequest, e);
}
} }
@Override @Override
@ -512,35 +529,39 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
log.debug("[{}][{}][{}] Failed to process device claiming command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); log.debug("[{}][{}][{}] Failed to process device claiming command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t);
} }
}, context.getExecutor()); }, context.getExecutor());
});
} else {
log.debug("[{}][{}][{}] Devices claim messages is empty", gateway.getTenantId(), gateway.getDeviceId(), sessionId);
throw new IllegalArgumentException("[" + sessionId + "] Devices claim messages is empty for [" + gateway.getDeviceId() + "]");
} }
});
} catch (RuntimeException | InvalidProtocolBufferException e) { } catch (RuntimeException | InvalidProtocolBufferException e) {
throw new AdaptorException(e); throw new AdaptorException(e);
} }
} }
private void processClaimDeviceMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.ClaimDeviceMsg claimDeviceMsg, String deviceName, int msgId) { private void processClaimDeviceMsg(MqttDeviceAwareSessionContext deviceCtx, TransportApiProtos.ClaimDevice claimRequest, String deviceName, int msgId) {
try {
DeviceId deviceId = deviceCtx.getDeviceId();
TransportProtos.ClaimDeviceMsg claimDeviceMsg = ProtoConverter.convertToClaimDeviceProto(deviceId, claimRequest.toByteArray());
transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(channel, deviceName, msgId, claimDeviceMsg)); transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(channel, deviceName, msgId, claimDeviceMsg));
} catch (Throwable e) {
log.warn("[{}][{}][{}] Failed to convert claim message: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, claimRequest, e);
}
} }
private void onDeviceAttributesJson(int msgId, ByteBuf payload) throws AdaptorException { private void onDeviceAttributesJson(int msgId, ByteBuf payload) throws AdaptorException {
JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, payload); JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, payload);
if (json.isJsonObject()) { if (!json.isJsonObject()) {
JsonObject jsonObj = json.getAsJsonObject();
for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
String deviceName = deviceEntry.getKey();
Futures.addCallback(checkDeviceConnected(deviceName),
new FutureCallback<>() {
@Override
public void onSuccess(@Nullable T deviceCtx) {
if (!deviceEntry.getValue().isJsonObject()) {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
} }
TransportProtos.PostAttributeMsg postAttributeMsg = JsonConverter.convertToAttributesProto(deviceEntry.getValue().getAsJsonObject()); JsonObject jsonObj = json.getAsJsonObject();
processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId); for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
if (!deviceEntry.getValue().isJsonObject()) {
log.warn("{}[{}]", CAN_T_PARSE_VALUE, json);
continue;
}
String deviceName = deviceEntry.getKey();
Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable T deviceCtx) {
processPostAttributesMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId);
} }
@Override @Override
@ -549,32 +570,27 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
} }
}, context.getExecutor()); }, context.getExecutor());
} }
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}
} }
private void onDeviceAttributesProto(int msgId, ByteBuf payload) throws AdaptorException { private void onDeviceAttributesProto(int msgId, ByteBuf payload) throws AdaptorException {
try { try {
TransportApiProtos.GatewayAttributesMsg attributesMsgProto = TransportApiProtos.GatewayAttributesMsg.parseFrom(getBytes(payload)); TransportApiProtos.GatewayAttributesMsg attributesMsgProto = TransportApiProtos.GatewayAttributesMsg.parseFrom(getBytes(payload));
List<TransportApiProtos.AttributesMsg> attributesMsgList = attributesMsgProto.getMsgList(); List<TransportApiProtos.AttributesMsg> attributesMsgList = attributesMsgProto.getMsgList();
if (!CollectionUtils.isEmpty(attributesMsgList)) { if (CollectionUtils.isEmpty(attributesMsgList)) {
log.debug("[{}][{}][{}] Devices attributes keys list is empty", gateway.getTenantId(), gateway.getDeviceId(), sessionId);
throw new IllegalArgumentException("[" + sessionId + "] Devices attributes keys list is empty for [" + gateway.getDeviceId() + "]");
}
attributesMsgList.forEach(attributesMsg -> { attributesMsgList.forEach(attributesMsg -> {
String deviceName = checkDeviceName(attributesMsg.getDeviceName()); String deviceName = checkDeviceName(attributesMsg.getDeviceName());
Futures.addCallback(checkDeviceConnected(deviceName), T deviceCtx = devices.get(deviceName);
new FutureCallback<>() { if (deviceCtx != null) {
processPostAttributesMsg(deviceCtx, attributesMsg.getMsg(), deviceName, msgId);
} else {
Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() {
@Override @Override
public void onSuccess(@Nullable T deviceCtx) { public void onSuccess(@Nullable T deviceCtx) {
TransportProtos.PostAttributeMsg kvListProto = attributesMsg.getMsg(); processPostAttributesMsg(deviceCtx, attributesMsg.getMsg(), deviceName, msgId);
if (kvListProto == null) {
throw new IllegalArgumentException("Attributes List for device: " + deviceName + " is empty!");
}
try {
TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto);
processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId);
} catch (Throwable e) {
log.warn("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, kvListProto, e);
}
} }
@Override @Override
@ -582,18 +598,29 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
log.debug("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); log.debug("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t);
} }
}, context.getExecutor()); }, context.getExecutor());
});
} else {
log.debug("[{}][{}][{}] Devices attributes keys list is empty", gateway.getTenantId(), gateway.getDeviceId(), sessionId);
throw new IllegalArgumentException("[" + sessionId + "] Devices attributes keys list is empty for [" + gateway.getDeviceId() + "]");
} }
});
} catch (RuntimeException | InvalidProtocolBufferException e) { } catch (RuntimeException | InvalidProtocolBufferException e) {
throw new AdaptorException(e); throw new AdaptorException(e);
} }
} }
protected void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostAttributeMsg postAttributeMsg, String deviceName, int msgId) { protected void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, JsonElement msg, String deviceName, int msgId) {
try {
TransportProtos.PostAttributeMsg postAttributeMsg = JsonConverter.convertToAttributesProto(msg.getAsJsonObject());
transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg)); transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg));
} catch (Throwable e) {
log.warn("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e);
}
}
protected void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostAttributeMsg kvListProto, String deviceName, int msgId) {
try {
TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto);
transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg));
} catch (Throwable e) {
log.warn("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, kvListProto, e);
}
} }
private void onDeviceAttributesRequestJson(MqttPublishMessage msg) throws AdaptorException { private void onDeviceAttributesRequestJson(MqttPublishMessage msg) throws AdaptorException {
@ -637,43 +664,37 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
private void onDeviceRpcResponseJson(int msgId, ByteBuf payload) throws AdaptorException { private void onDeviceRpcResponseJson(int msgId, ByteBuf payload) throws AdaptorException {
JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, payload); JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, payload);
if (json.isJsonObject()) { if (!json.isJsonObject()) {
JsonObject jsonObj = json.getAsJsonObject();
String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString();
Futures.addCallback(checkDeviceConnected(deviceName),
new FutureCallback<>() {
@Override
public void onSuccess(@Nullable T deviceCtx) {
Integer requestId = jsonObj.get("id").getAsInt();
String data = jsonObj.get("data").toString();
TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
.setRequestId(requestId).setPayload(data).build();
processRpcResponseMsg(deviceCtx, rpcResponseMsg, deviceName, msgId);
}
@Override
public void onFailure(Throwable t) {
log.debug("[{}][{}][{}] Failed to process device Rpc response command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t);
}
}, context.getExecutor());
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
} }
JsonObject jsonObj = json.getAsJsonObject();
String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString();
Integer requestId = jsonObj.get("id").getAsInt();
String data = jsonObj.get("data").toString();
onDeviceRpcResponse(requestId, data, deviceName, msgId);
} }
private void onDeviceRpcResponseProto(int msgId, ByteBuf payload) throws AdaptorException { private void onDeviceRpcResponseProto(int msgId, ByteBuf payload) throws AdaptorException {
try { try {
TransportApiProtos.GatewayRpcResponseMsg gatewayRpcResponseMsg = TransportApiProtos.GatewayRpcResponseMsg.parseFrom(getBytes(payload)); TransportApiProtos.GatewayRpcResponseMsg gatewayRpcResponseMsg = TransportApiProtos.GatewayRpcResponseMsg.parseFrom(getBytes(payload));
String deviceName = checkDeviceName(gatewayRpcResponseMsg.getDeviceName()); String deviceName = checkDeviceName(gatewayRpcResponseMsg.getDeviceName());
Futures.addCallback(checkDeviceConnected(deviceName),
new FutureCallback<>() {
@Override
public void onSuccess(@Nullable T deviceCtx) {
Integer requestId = gatewayRpcResponseMsg.getId(); Integer requestId = gatewayRpcResponseMsg.getId();
String data = gatewayRpcResponseMsg.getData(); String data = gatewayRpcResponseMsg.getData();
TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder() onDeviceRpcResponse(requestId, data, deviceName, msgId);
.setRequestId(requestId).setPayload(data).build(); } catch (RuntimeException | InvalidProtocolBufferException e) {
processRpcResponseMsg(deviceCtx, rpcResponseMsg, deviceName, msgId); throw new AdaptorException(e);
}
}
private void onDeviceRpcResponse(Integer requestId, String data, String deviceName, int msgId) {
T deviceCtx = devices.get(deviceName);
if (deviceCtx != null) {
processRpcResponseMsg(deviceCtx, requestId, data, deviceName, msgId);
} else {
Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable T deviceCtx) {
processRpcResponseMsg(deviceCtx, requestId, data, deviceName, msgId);
} }
@Override @Override
@ -681,22 +702,25 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
log.debug("[{}][{}][{}] Failed to process device Rpc response command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); log.debug("[{}][{}][{}] Failed to process device Rpc response command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t);
} }
}, context.getExecutor()); }, context.getExecutor());
} catch (RuntimeException | InvalidProtocolBufferException e) {
throw new AdaptorException(e);
} }
} }
private void processRpcResponseMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg, String deviceName, int msgId) { private void processRpcResponseMsg(MqttDeviceAwareSessionContext deviceCtx, Integer requestId, String data, String deviceName, int msgId) {
TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
.setRequestId(requestId).setPayload(data).build();
transportService.process(deviceCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(channel, deviceName, msgId, rpcResponseMsg)); transportService.process(deviceCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(channel, deviceName, msgId, rpcResponseMsg));
} }
private void processGetAttributeRequestMessage(MqttPublishMessage mqttMsg, String deviceName, TransportProtos.GetAttributeRequestMsg requestMsg) { private void processGetAttributeRequestMessage(MqttPublishMessage mqttMsg, String deviceName, TransportProtos.GetAttributeRequestMsg requestMsg) {
int msgId = getMsgId(mqttMsg); int msgId = getMsgId(mqttMsg);
Futures.addCallback(checkDeviceConnected(deviceName), T deviceCtx = devices.get(deviceName);
new FutureCallback<>() { if (deviceCtx != null) {
processGetAttributeRequestMessage(deviceCtx, requestMsg, deviceName, msgId);
} else {
Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() {
@Override @Override
public void onSuccess(@Nullable T deviceCtx) { public void onSuccess(@Nullable T deviceCtx) {
transportService.process(deviceCtx.getSessionInfo(), requestMsg, getPubAckCallback(channel, deviceName, msgId, requestMsg)); processGetAttributeRequestMessage(deviceCtx, requestMsg, deviceName, msgId);
} }
@Override @Override
@ -706,6 +730,11 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
} }
}, context.getExecutor()); }, context.getExecutor());
} }
}
private void processGetAttributeRequestMessage(T deviceCtx, TransportProtos.GetAttributeRequestMsg requestMsg, String deviceName, int msgId) {
transportService.process(deviceCtx.getSessionInfo(), requestMsg, getPubAckCallback(channel, deviceName, msgId, requestMsg));
}
private TransportProtos.GetAttributeRequestMsg toGetAttributeRequestMsg(int requestId, boolean clientScope, Set<String> keys) { private TransportProtos.GetAttributeRequestMsg toGetAttributeRequestMsg(int requestId, boolean clientScope, Set<String> keys) {
TransportProtos.GetAttributeRequestMsg.Builder result = TransportProtos.GetAttributeRequestMsg.newBuilder(); TransportProtos.GetAttributeRequestMsg.Builder result = TransportProtos.GetAttributeRequestMsg.newBuilder();

View File

@ -26,6 +26,7 @@ import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttTopicSubscription; import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.eclipse.leshan.core.ResponseCode; import org.eclipse.leshan.core.ResponseCode;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
@ -44,6 +45,7 @@ import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic;
import jakarta.annotation.Nullable; import jakarta.annotation.Nullable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -68,6 +70,7 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopi
@Slf4j @Slf4j
public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<SparkplugDeviceSessionContext> { public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<SparkplugDeviceSessionContext> {
@Getter
private final SparkplugTopic sparkplugTopicNode; private final SparkplugTopic sparkplugTopicNode;
private final Map<String, SparkplugBProto.Payload.Metric> nodeBirthMetrics; private final Map<String, SparkplugBProto.Payload.Metric> nodeBirthMetrics;
private final MqttTransportHandler parent; private final MqttTransportHandler parent;
@ -136,18 +139,17 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
public void onDeviceTelemetryProto(ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture, public void onDeviceTelemetryProto(ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture,
int msgId, List<TransportProtos.PostTelemetryMsg> postTelemetryMsgList, String deviceName) throws AdaptorException { int msgId, List<TransportProtos.PostTelemetryMsg> postTelemetryMsgList, String deviceName) throws AdaptorException {
try { Futures.addCallback(contextListenableFuture, new FutureCallback<>() {
int finalMsgId = msgId;
postTelemetryMsgList.forEach(telemetryMsg -> {
Futures.addCallback(contextListenableFuture,
new FutureCallback<>() {
@Override @Override
public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) { public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) {
for (TransportProtos.PostTelemetryMsg telemetryMsg : postTelemetryMsgList) {
try { try {
processPostTelemetryMsg(deviceCtx, telemetryMsg, deviceName, finalMsgId); processPostTelemetryMsg(deviceCtx, telemetryMsg, deviceName, msgId);
} catch (Throwable e) { } catch (Throwable e) {
log.warn("[{}][{}] Failed to convert telemetry: {}", gateway.getDeviceId(), deviceName, telemetryMsg, e); log.warn("[{}][{}] Failed to convert telemetry: {}", gateway.getDeviceId(), deviceName, telemetryMsg, e);
channel.close(); channel.close();
break;
}
} }
} }
@ -156,21 +158,17 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
log.debug("[{}] Failed to process device telemetry command: {}", sessionId, deviceName, t); log.debug("[{}] Failed to process device telemetry command: {}", sessionId, deviceName, t);
} }
}, context.getExecutor()); }, context.getExecutor());
});
} catch (RuntimeException e) {
throw new AdaptorException(e);
}
} }
private void onDeviceAttributesProto(ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture, int msgId, private void onDeviceAttributesProto(ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture, int msgId,
List<TransportApiProtos.AttributesMsg> attributesMsgList, String deviceName) throws AdaptorException { List<TransportApiProtos.AttributesMsg> attributesMsgList, String deviceName) throws AdaptorException {
try { try {
if (!CollectionUtils.isEmpty(attributesMsgList)) { if (!CollectionUtils.isEmpty(attributesMsgList)) {
attributesMsgList.forEach(attributesMsg -> {
Futures.addCallback(contextListenableFuture, Futures.addCallback(contextListenableFuture,
new FutureCallback<>() { new FutureCallback<>() {
@Override @Override
public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) { public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) {
for (TransportApiProtos.AttributesMsg attributesMsg : attributesMsgList) {
TransportProtos.PostAttributeMsg kvListProto = attributesMsg.getMsg(); TransportProtos.PostAttributeMsg kvListProto = attributesMsg.getMsg();
try { try {
TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto); TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto);
@ -179,13 +177,13 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
log.warn("[{}][{}] Failed to process device attributes command: {}", gateway.getDeviceId(), deviceName, kvListProto, e); log.warn("[{}][{}] Failed to process device attributes command: {}", gateway.getDeviceId(), deviceName, kvListProto, e);
} }
} }
}
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
log.debug("[{}] Failed to process device attributes command: {}", sessionId, deviceName, t); log.debug("[{}] Failed to process device attributes command: {}", sessionId, deviceName, t);
} }
}, context.getExecutor()); }, context.getExecutor());
});
} else { } else {
log.debug("[{}] Devices attributes keys list is empty for: [{}]", sessionId, gateway.getDeviceId()); log.debug("[{}] Devices attributes keys list is empty for: [{}]", sessionId, gateway.getDeviceId());
} }
@ -195,9 +193,8 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
} }
public void handleSparkplugSubscribeMsg(List<Integer> grantedQoSList, MqttTopicSubscription subscription, public void handleSparkplugSubscribeMsg(List<Integer> grantedQoSList, MqttTopicSubscription subscription,
MqttQoS reqQoS) throws ThingsboardException, AdaptorException, MqttQoS reqQoS) throws ThingsboardException {
ExecutionException, InterruptedException { SparkplugTopic sparkplugTopic = parseTopicSubscribe(subscription.topicFilter());
SparkplugTopic sparkplugTopic = parseTopicSubscribe(subscription.topicName());
if (sparkplugTopic.getGroupId() == null) { if (sparkplugTopic.getGroupId() == null) {
// TODO SUBSCRIBE NameSpace // TODO SUBSCRIBE NameSpace
} else if (sparkplugTopic.getType() == null) { } else if (sparkplugTopic.getType() == null) {
@ -304,10 +301,6 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
return Optional.empty(); return Optional.empty();
} }
public SparkplugTopic getSparkplugTopicNode() {
return this.sparkplugTopicNode;
}
public Optional<MqttPublishMessage> createSparkplugMqttPublishMsg(TransportProtos.TsKvProto tsKvProto, public Optional<MqttPublishMessage> createSparkplugMqttPublishMsg(TransportProtos.TsKvProto tsKvProto,
String sparkplugTopic, String sparkplugTopic,
SparkplugBProto.Payload.Metric metricBirth) { SparkplugBProto.Payload.Metric metricBirth) {
@ -328,7 +321,6 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
} catch (Exception e) { } catch (Exception e) {
log.trace("DeviceId: [{}] tenantId: [{}] sessionId:[{}] Failed to convert device attributes response to MQTT sparkplug msg", log.trace("DeviceId: [{}] tenantId: [{}] sessionId:[{}] Failed to convert device attributes response to MQTT sparkplug msg",
deviceSessionCtx.getDeviceInfo().getDeviceId(), deviceSessionCtx.getDeviceInfo().getTenantId(), sessionId, e); deviceSessionCtx.getDeviceInfo().getDeviceId(), deviceSessionCtx.getDeviceInfo().getTenantId(), sessionId, e);
return Optional.empty();
} }
return Optional.empty(); return Optional.empty();
} }
@ -350,5 +342,4 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
parent.sendSuccessRpcResponse(sessionInfo, requestId, result, successMsg); parent.sendSuccessRpcResponse(sessionInfo, requestId, result, successMsg);
} }
} }