diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index b4644ac3ac..1eaf1f38e8 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -15,9 +15,7 @@ */ package org.thingsboard.server.transport.mqtt.session; - import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; @@ -32,15 +30,15 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.mqtt.MqttMessage; -import io.netty.handler.codec.mqtt.MqttMessageBuilders; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttVersion; -import jakarta.annotation.Nullable; import lombok.Getter; import lombok.Setter; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; import org.springframework.util.ConcurrentReferenceHashMap; +import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.server.common.adaptor.AdaptorException; import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.common.adaptor.ProtoConverter; @@ -78,6 +76,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; import static org.springframework.util.ConcurrentReferenceHashMap.ReferenceType; import static org.thingsboard.server.common.data.DataConstants.DEFAULT_DEVICE_TYPE; @@ -224,18 +223,12 @@ public abstract class AbstractGatewaySessionHandler() { - @Override - public void onSuccess(@Nullable T result) { - ack(msg, ReturnCode.SUCCESS); - log.trace("[{}][{}][{}] onDeviceConnectOk: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName); - } - - @Override - public void onFailure(Throwable t) { - logDeviceCreationError(t, deviceName); - } - }, context.getExecutor()); + process(onDeviceConnect(deviceName, deviceType), + result -> { + ack(msg, ReturnCode.SUCCESS); + log.trace("[{}][{}][{}] onDeviceConnectOk: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName); + }, + t -> logDeviceCreationError(t, deviceName)); } public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional deviceProfileOpt) { @@ -374,25 +367,9 @@ public abstract class AbstractGatewaySessionHandler() { - @Override - public void onSuccess(@Nullable T deviceCtx) { - processPostTelemetryMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId); - } - - @Override - public void onFailure(Throwable t) { - log.debug("[{}][{}][{}] Failed to process device telemetry command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); - } - }, context.getExecutor()); - } + process(deviceName, deviceCtx -> processPostTelemetryMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId), + t -> log.debug("[{}][{}][{}] Failed to process device telemetry command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t)); } } @@ -417,22 +394,8 @@ public abstract class AbstractGatewaySessionHandler { String deviceName = checkDeviceName(telemetryMsg.getDeviceName()); - T deviceCtx = devices.get(deviceName); - if (deviceCtx != null) { - processPostTelemetryMsg(deviceCtx, telemetryMsg.getMsg(), deviceName, msgId); - } else { - Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() { - @Override - public void onSuccess(@Nullable T deviceCtx) { - processPostTelemetryMsg(deviceCtx, telemetryMsg.getMsg(), deviceName, msgId); - } - - @Override - public void onFailure(Throwable t) { - log.debug("[{}][{}][{}] Failed to process device telemetry command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); - } - }, context.getExecutor()); - } + process(deviceName, deviceCtx -> processPostTelemetryMsg(deviceCtx, telemetryMsg.getMsg(), deviceName, msgId), + t -> log.debug("[{}][{}][{}] Failed to process device telemetry command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t)); }); } catch (RuntimeException | InvalidProtocolBufferException e) { throw new AdaptorException(e); @@ -470,23 +433,8 @@ public abstract class AbstractGatewaySessionHandler() { - @Override - public void onSuccess(@Nullable T deviceCtx) { - processClaimDeviceMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId); - } - - @Override - public void onFailure(Throwable t) { - log.debug("[{}][{}][{}] Failed to process device claiming command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); - } - }, context.getExecutor()); - } + process(deviceName, deviceCtx -> processClaimDeviceMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId), + t -> log.debug("[{}][{}][{}] Failed to process device claiming command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t)); } } @@ -512,22 +460,8 @@ public abstract class AbstractGatewaySessionHandler { String deviceName = checkDeviceName(claimDeviceMsg.getDeviceName()); - T deviceCtx = devices.get(deviceName); - if (deviceCtx != null) { - processClaimDeviceMsg(deviceCtx, claimDeviceMsg.getClaimRequest(), deviceName, msgId); - } else { - Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() { - @Override - public void onSuccess(@Nullable T deviceCtx) { - processClaimDeviceMsg(deviceCtx, claimDeviceMsg.getClaimRequest(), deviceName, msgId); - } - - @Override - public void onFailure(Throwable t) { - log.debug("[{}][{}][{}] Failed to process device claiming command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); - } - }, context.getExecutor()); - } + process(deviceName, deviceCtx -> processClaimDeviceMsg(deviceCtx, claimDeviceMsg.getClaimRequest(), deviceName, msgId), + t -> log.debug("[{}][{}][{}] Failed to process device claiming command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t)); }); } catch (RuntimeException | InvalidProtocolBufferException e) { throw new AdaptorException(e); @@ -553,18 +487,20 @@ public abstract class AbstractGatewaySessionHandler() { - @Override - public void onSuccess(@Nullable T deviceCtx) { - processPostAttributesMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId); - } - @Override - public void onFailure(Throwable t) { - log.debug("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); - } - }, context.getExecutor()); + String deviceName = deviceEntry.getKey(); + process(deviceName, deviceCtx -> processPostAttributesMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId), + t -> log.debug("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t)); + } + } + + private void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, JsonElement msg, String deviceName, int msgId) { + try { + TransportProtos.PostAttributeMsg postAttributeMsg = JsonConverter.convertToAttributesProto(msg.getAsJsonObject()); + transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg)); + } catch (Throwable e) { + log.warn("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e); + ackOrClose(msgId); } } @@ -579,38 +515,14 @@ public abstract class AbstractGatewaySessionHandler { String deviceName = checkDeviceName(attributesMsg.getDeviceName()); - T deviceCtx = devices.get(deviceName); - if (deviceCtx != null) { - processPostAttributesMsg(deviceCtx, attributesMsg.getMsg(), deviceName, msgId); - } else { - Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() { - @Override - public void onSuccess(@Nullable T deviceCtx) { - processPostAttributesMsg(deviceCtx, attributesMsg.getMsg(), deviceName, msgId); - } - - @Override - public void onFailure(Throwable t) { - log.debug("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); - } - }, context.getExecutor()); - } + process(deviceName, deviceCtx -> processPostAttributesMsg(deviceCtx, attributesMsg.getMsg(), deviceName, msgId), + t -> log.debug("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t)); }); } catch (RuntimeException | InvalidProtocolBufferException e) { throw new AdaptorException(e); } } - protected void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, JsonElement msg, String deviceName, int msgId) { - try { - TransportProtos.PostAttributeMsg postAttributeMsg = JsonConverter.convertToAttributesProto(msg.getAsJsonObject()); - transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg)); - } catch (Throwable e) { - log.warn("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e); - ackOrClose(msgId); - } - } - protected void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostAttributeMsg kvListProto, String deviceName, int msgId) { try { TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto); @@ -623,26 +535,23 @@ public abstract class AbstractGatewaySessionHandler keys; - if (jsonObj.has("key")) { - keys = Collections.singleton(jsonObj.get("key").getAsString()); - } else { - JsonArray keysArray = jsonObj.get("keys").getAsJsonArray(); - keys = new HashSet<>(); - for (JsonElement keyObj : keysArray) { - keys.add(keyObj.getAsString()); - } - } - TransportProtos.GetAttributeRequestMsg requestMsg = toGetAttributeRequestMsg(requestId, clientScope, keys); - processGetAttributeRequestMessage(msg, deviceName, requestMsg); + validateJsonObject(json); + JsonObject jsonObj = json.getAsJsonObject(); + int requestId = jsonObj.get("id").getAsInt(); + String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString(); + boolean clientScope = jsonObj.get("client").getAsBoolean(); + Set keys; + if (jsonObj.has("key")) { + keys = Collections.singleton(jsonObj.get("key").getAsString()); } else { - throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); + JsonArray keysArray = jsonObj.get("keys").getAsJsonArray(); + keys = new HashSet<>(); + for (JsonElement keyObj : keysArray) { + keys.add(keyObj.getAsString()); + } } + TransportProtos.GetAttributeRequestMsg requestMsg = toGetAttributeRequestMsg(requestId, clientScope, keys); + processGetAttributeRequestMessage(msg, deviceName, requestMsg); } private void onDeviceAttributesRequestProto(MqttPublishMessage mqttMsg) throws AdaptorException { @@ -689,22 +598,8 @@ public abstract class AbstractGatewaySessionHandler() { - @Override - public void onSuccess(@Nullable T deviceCtx) { - processRpcResponseMsg(deviceCtx, requestId, data, deviceName, msgId); - } - - @Override - public void onFailure(Throwable t) { - log.debug("[{}][{}][{}] Failed to process device Rpc response command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); - } - }, context.getExecutor()); - } + process(deviceName, deviceCtx -> processRpcResponseMsg(deviceCtx, requestId, data, deviceName, msgId), + t -> log.debug("[{}][{}][{}] Failed to process device Rpc response command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t)); } private void processRpcResponseMsg(MqttDeviceAwareSessionContext deviceCtx, Integer requestId, String data, String deviceName, int msgId) { @@ -715,23 +610,11 @@ public abstract class AbstractGatewaySessionHandler() { - @Override - public void onSuccess(@Nullable T deviceCtx) { - processGetAttributeRequestMessage(deviceCtx, requestMsg, deviceName, msgId); - } - - @Override - public void onFailure(Throwable t) { + process(deviceName, deviceCtx -> processGetAttributeRequestMessage(deviceCtx, requestMsg, deviceName, msgId), + t -> { ack(msgId, ReturnCode.IMPLEMENTATION_SPECIFIC); log.debug("[{}][{}][{}] Failed to process device attributes request command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); - } - }, context.getExecutor()); - } + }); } private void processGetAttributeRequestMessage(T deviceCtx, TransportProtos.GetAttributeRequestMsg requestMsg, String deviceName, int msgId) { @@ -750,16 +633,6 @@ public abstract class AbstractGatewaySessionHandler checkDeviceConnected(String deviceName) { - T ctx = devices.get(deviceName); - if (ctx == null) { - log.debug("[{}][{}][{}] Missing device [{}] for the gateway session", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName); - return onDeviceConnect(deviceName, DEFAULT_DEVICE_TYPE); - } else { - return Futures.immediateFuture(ctx); - } - } - protected String checkDeviceName(String deviceName) { if (StringUtils.isEmpty(deviceName)) { throw new RuntimeException("Device name is empty!"); @@ -796,7 +669,7 @@ public abstract class AbstractGatewaySessionHandler onSuccess, Consumer onFailure) { + ListenableFuture deviceCtxFuture = onDeviceConnect(deviceName, DEFAULT_DEVICE_TYPE); + process(deviceCtxFuture, onSuccess, onFailure); + } + + @SneakyThrows + protected void process(ListenableFuture deviceCtxFuture, Consumer onSuccess, Consumer onFailure) { + if (deviceCtxFuture.isDone()) { + onSuccess.accept(deviceCtxFuture.get()); + } else { + DonAsynchron.withCallback(deviceCtxFuture, onSuccess, onFailure, context.getExecutor()); + } + } + } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java index cbc515c1d0..e06d954678 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.transport.mqtt.session; -import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; @@ -30,12 +29,12 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.eclipse.leshan.core.ResponseCode; import org.springframework.util.CollectionUtils; -import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; -import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; -import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.adaptor.AdaptorException; import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.common.adaptor.ProtoConverter; +import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; +import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; +import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import org.thingsboard.server.gen.transport.TransportApiProtos; import org.thingsboard.server.gen.transport.TransportProtos; @@ -44,8 +43,6 @@ import org.thingsboard.server.transport.mqtt.MqttTransportHandler; import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic; -import jakarta.annotation.Nullable; - import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -53,7 +50,6 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.ONLINE; @@ -72,6 +68,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler nodeBirthMetrics; private final MqttTransportHandler parent; @@ -88,10 +85,6 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler metric))); } - public Map getNodeBirthMetrics() { - return this.nodeBirthMetrics; - } - public TransportProtos.PostTelemetryMsg convertToPostTelemetry(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException { DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx; byte[] bytes = getBytes(inbound.payload()); @@ -138,55 +131,38 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler contextListenableFuture, - int msgId, List postTelemetryMsgList, String deviceName) throws AdaptorException { - Futures.addCallback(contextListenableFuture, new FutureCallback<>() { - @Override - public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) { - for (TransportProtos.PostTelemetryMsg telemetryMsg : postTelemetryMsgList) { - try { - processPostTelemetryMsg(deviceCtx, telemetryMsg, deviceName, msgId); - } catch (Throwable e) { - log.warn("[{}][{}] Failed to convert telemetry: {}", gateway.getDeviceId(), deviceName, telemetryMsg, e); - channel.close(); - break; + int msgId, List postTelemetryMsgList, String deviceName) { + process(contextListenableFuture, deviceCtx -> { + for (TransportProtos.PostTelemetryMsg telemetryMsg : postTelemetryMsgList) { + try { + processPostTelemetryMsg(deviceCtx, telemetryMsg, deviceName, msgId); + } catch (Throwable e) { + log.warn("[{}][{}] Failed to convert telemetry: {}", gateway.getDeviceId(), deviceName, telemetryMsg, e); + ackOrClose(msgId); + } } - } - } - - @Override - public void onFailure(Throwable t) { - log.debug("[{}] Failed to process device telemetry command: {}", sessionId, deviceName, t); - } - }, context.getExecutor()); + }, + t -> log.debug("[{}] Failed to process device telemetry command: {}", sessionId, deviceName, t)); } private void onDeviceAttributesProto(ListenableFuture contextListenableFuture, int msgId, List attributesMsgList, String deviceName) throws AdaptorException { try { - if (!CollectionUtils.isEmpty(attributesMsgList)) { - Futures.addCallback(contextListenableFuture, - new FutureCallback<>() { - @Override - public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) { - for (TransportApiProtos.AttributesMsg attributesMsg : attributesMsgList) { - TransportProtos.PostAttributeMsg kvListProto = attributesMsg.getMsg(); - try { - TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto); - processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId); - } catch (Throwable e) { - log.warn("[{}][{}] Failed to process device attributes command: {}", gateway.getDeviceId(), deviceName, kvListProto, e); - } - } - } - - @Override - public void onFailure(Throwable t) { - log.debug("[{}] Failed to process device attributes command: {}", sessionId, deviceName, t); - } - }, context.getExecutor()); - } else { + if (CollectionUtils.isEmpty(attributesMsgList)) { log.debug("[{}] Devices attributes keys list is empty for: [{}]", sessionId, gateway.getDeviceId()); } + process(contextListenableFuture, deviceCtx -> { + for (TransportApiProtos.AttributesMsg attributesMsg : attributesMsgList) { + TransportProtos.PostAttributeMsg kvListProto = attributesMsg.getMsg(); + try { + TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto); + processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId); + } catch (Throwable e) { + log.warn("[{}][{}] Failed to process device attributes command: {}", gateway.getDeviceId(), deviceName, kvListProto, e); + } + } + }, + t -> log.debug("[{}] Failed to process device attributes command: {}", sessionId, deviceName, t)); } catch (RuntimeException e) { throw new AdaptorException(e); } @@ -235,9 +211,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(key, protoMetric); - if (keyValueProtoOpt.isPresent()) { - msgs.add(postTelemetryMsgCreated(keyValueProtoOpt.get(), ts)); - } + keyValueProtoOpt.ifPresent(kvProto -> msgs.add(postTelemetryMsgCreated(kvProto, ts))); } }