Used DonAsynchron instead of addCallback

This commit is contained in:
YevhenBondarenko 2024-05-16 20:16:04 +02:00
parent b96a7fcbd2
commit 0fe3be1537
2 changed files with 96 additions and 235 deletions

View File

@ -15,9 +15,7 @@
*/ */
package org.thingsboard.server.transport.mqtt.session; package org.thingsboard.server.transport.mqtt.session;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
@ -32,15 +30,15 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttVersion; import io.netty.handler.codec.mqtt.MqttVersion;
import jakarta.annotation.Nullable;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.util.ConcurrentReferenceHashMap; import org.springframework.util.ConcurrentReferenceHashMap;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.server.common.adaptor.AdaptorException; import org.thingsboard.server.common.adaptor.AdaptorException;
import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.common.adaptor.JsonConverter;
import org.thingsboard.server.common.adaptor.ProtoConverter; import org.thingsboard.server.common.adaptor.ProtoConverter;
@ -78,6 +76,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import static org.springframework.util.ConcurrentReferenceHashMap.ReferenceType; import static org.springframework.util.ConcurrentReferenceHashMap.ReferenceType;
import static org.thingsboard.server.common.data.DataConstants.DEFAULT_DEVICE_TYPE; import static org.thingsboard.server.common.data.DataConstants.DEFAULT_DEVICE_TYPE;
@ -224,18 +223,12 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
protected void processOnConnect(MqttPublishMessage msg, String deviceName, String deviceType) { protected void processOnConnect(MqttPublishMessage msg, String deviceName, String deviceType) {
log.trace("[{}][{}][{}] onDeviceConnect: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName); log.trace("[{}][{}][{}] onDeviceConnect: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName);
Futures.addCallback(onDeviceConnect(deviceName, deviceType), new FutureCallback<>() { process(onDeviceConnect(deviceName, deviceType),
@Override result -> {
public void onSuccess(@Nullable T result) {
ack(msg, ReturnCode.SUCCESS); ack(msg, ReturnCode.SUCCESS);
log.trace("[{}][{}][{}] onDeviceConnectOk: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName); log.trace("[{}][{}][{}] onDeviceConnectOk: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName);
} },
t -> logDeviceCreationError(t, deviceName));
@Override
public void onFailure(Throwable t) {
logDeviceCreationError(t, deviceName);
}
}, context.getExecutor());
} }
public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) { public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) {
@ -374,25 +367,9 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
log.warn("{}[{}]", CAN_T_PARSE_VALUE, json); log.warn("{}[{}]", CAN_T_PARSE_VALUE, json);
continue; continue;
} }
String deviceName = deviceEntry.getKey(); String deviceName = deviceEntry.getKey();
T deviceCtx = devices.get(deviceName); process(deviceName, deviceCtx -> processPostTelemetryMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId),
if (deviceCtx != null) { t -> log.debug("[{}][{}][{}] Failed to process device telemetry command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t));
processPostTelemetryMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId);
} else {
Futures.addCallback(checkDeviceConnected(deviceName),
new FutureCallback<>() {
@Override
public void onSuccess(@Nullable T deviceCtx) {
processPostTelemetryMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId);
}
@Override
public void onFailure(Throwable t) {
log.debug("[{}][{}][{}] Failed to process device telemetry command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t);
}
}, context.getExecutor());
}
} }
} }
@ -417,22 +394,8 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
deviceMsgList.forEach(telemetryMsg -> { deviceMsgList.forEach(telemetryMsg -> {
String deviceName = checkDeviceName(telemetryMsg.getDeviceName()); String deviceName = checkDeviceName(telemetryMsg.getDeviceName());
T deviceCtx = devices.get(deviceName); process(deviceName, deviceCtx -> processPostTelemetryMsg(deviceCtx, telemetryMsg.getMsg(), deviceName, msgId),
if (deviceCtx != null) { t -> log.debug("[{}][{}][{}] Failed to process device telemetry command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t));
processPostTelemetryMsg(deviceCtx, telemetryMsg.getMsg(), deviceName, msgId);
} else {
Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable T deviceCtx) {
processPostTelemetryMsg(deviceCtx, telemetryMsg.getMsg(), deviceName, msgId);
}
@Override
public void onFailure(Throwable t) {
log.debug("[{}][{}][{}] Failed to process device telemetry command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t);
}
}, context.getExecutor());
}
}); });
} catch (RuntimeException | InvalidProtocolBufferException e) { } catch (RuntimeException | InvalidProtocolBufferException e) {
throw new AdaptorException(e); throw new AdaptorException(e);
@ -470,23 +433,8 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
} }
String deviceName = deviceEntry.getKey(); String deviceName = deviceEntry.getKey();
T deviceCtx = devices.get(deviceEntry.getKey()); process(deviceName, deviceCtx -> processClaimDeviceMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId),
t -> log.debug("[{}][{}][{}] Failed to process device claiming command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t));
if (deviceCtx != null) {
processClaimDeviceMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId);
} else {
Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable T deviceCtx) {
processClaimDeviceMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId);
}
@Override
public void onFailure(Throwable t) {
log.debug("[{}][{}][{}] Failed to process device claiming command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t);
}
}, context.getExecutor());
}
} }
} }
@ -512,22 +460,8 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
claimMsgList.forEach(claimDeviceMsg -> { claimMsgList.forEach(claimDeviceMsg -> {
String deviceName = checkDeviceName(claimDeviceMsg.getDeviceName()); String deviceName = checkDeviceName(claimDeviceMsg.getDeviceName());
T deviceCtx = devices.get(deviceName); process(deviceName, deviceCtx -> processClaimDeviceMsg(deviceCtx, claimDeviceMsg.getClaimRequest(), deviceName, msgId),
if (deviceCtx != null) { t -> log.debug("[{}][{}][{}] Failed to process device claiming command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t));
processClaimDeviceMsg(deviceCtx, claimDeviceMsg.getClaimRequest(), deviceName, msgId);
} else {
Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable T deviceCtx) {
processClaimDeviceMsg(deviceCtx, claimDeviceMsg.getClaimRequest(), deviceName, msgId);
}
@Override
public void onFailure(Throwable t) {
log.debug("[{}][{}][{}] Failed to process device claiming command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t);
}
}, context.getExecutor());
}
}); });
} catch (RuntimeException | InvalidProtocolBufferException e) { } catch (RuntimeException | InvalidProtocolBufferException e) {
throw new AdaptorException(e); throw new AdaptorException(e);
@ -553,18 +487,20 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
log.warn("{}[{}]", CAN_T_PARSE_VALUE, json); log.warn("{}[{}]", CAN_T_PARSE_VALUE, json);
continue; continue;
} }
String deviceName = deviceEntry.getKey(); String deviceName = deviceEntry.getKey();
Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() { process(deviceName, deviceCtx -> processPostAttributesMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId),
@Override t -> log.debug("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t));
public void onSuccess(@Nullable T deviceCtx) { }
processPostAttributesMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId);
} }
@Override private void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, JsonElement msg, String deviceName, int msgId) {
public void onFailure(Throwable t) { try {
log.debug("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); TransportProtos.PostAttributeMsg postAttributeMsg = JsonConverter.convertToAttributesProto(msg.getAsJsonObject());
} transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg));
}, context.getExecutor()); } catch (Throwable e) {
log.warn("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e);
ackOrClose(msgId);
} }
} }
@ -579,38 +515,14 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
attributesMsgList.forEach(attributesMsg -> { attributesMsgList.forEach(attributesMsg -> {
String deviceName = checkDeviceName(attributesMsg.getDeviceName()); String deviceName = checkDeviceName(attributesMsg.getDeviceName());
T deviceCtx = devices.get(deviceName); process(deviceName, deviceCtx -> processPostAttributesMsg(deviceCtx, attributesMsg.getMsg(), deviceName, msgId),
if (deviceCtx != null) { t -> log.debug("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t));
processPostAttributesMsg(deviceCtx, attributesMsg.getMsg(), deviceName, msgId);
} else {
Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable T deviceCtx) {
processPostAttributesMsg(deviceCtx, attributesMsg.getMsg(), deviceName, msgId);
}
@Override
public void onFailure(Throwable t) {
log.debug("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t);
}
}, context.getExecutor());
}
}); });
} catch (RuntimeException | InvalidProtocolBufferException e) { } catch (RuntimeException | InvalidProtocolBufferException e) {
throw new AdaptorException(e); throw new AdaptorException(e);
} }
} }
protected void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, JsonElement msg, String deviceName, int msgId) {
try {
TransportProtos.PostAttributeMsg postAttributeMsg = JsonConverter.convertToAttributesProto(msg.getAsJsonObject());
transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg));
} catch (Throwable e) {
log.warn("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e);
ackOrClose(msgId);
}
}
protected void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostAttributeMsg kvListProto, String deviceName, int msgId) { protected void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostAttributeMsg kvListProto, String deviceName, int msgId) {
try { try {
TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto); TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto);
@ -623,7 +535,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
private void onDeviceAttributesRequestJson(MqttPublishMessage msg) throws AdaptorException { private void onDeviceAttributesRequestJson(MqttPublishMessage msg) throws AdaptorException {
JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, msg.payload()); JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, msg.payload());
if (json.isJsonObject()) { validateJsonObject(json);
JsonObject jsonObj = json.getAsJsonObject(); JsonObject jsonObj = json.getAsJsonObject();
int requestId = jsonObj.get("id").getAsInt(); int requestId = jsonObj.get("id").getAsInt();
String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString(); String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString();
@ -640,9 +552,6 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
} }
TransportProtos.GetAttributeRequestMsg requestMsg = toGetAttributeRequestMsg(requestId, clientScope, keys); TransportProtos.GetAttributeRequestMsg requestMsg = toGetAttributeRequestMsg(requestId, clientScope, keys);
processGetAttributeRequestMessage(msg, deviceName, requestMsg); processGetAttributeRequestMessage(msg, deviceName, requestMsg);
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}
} }
private void onDeviceAttributesRequestProto(MqttPublishMessage mqttMsg) throws AdaptorException { private void onDeviceAttributesRequestProto(MqttPublishMessage mqttMsg) throws AdaptorException {
@ -689,22 +598,8 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
} }
private void onDeviceRpcResponse(Integer requestId, String data, String deviceName, int msgId) { private void onDeviceRpcResponse(Integer requestId, String data, String deviceName, int msgId) {
T deviceCtx = devices.get(deviceName); process(deviceName, deviceCtx -> processRpcResponseMsg(deviceCtx, requestId, data, deviceName, msgId),
if (deviceCtx != null) { t -> log.debug("[{}][{}][{}] Failed to process device Rpc response command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t));
processRpcResponseMsg(deviceCtx, requestId, data, deviceName, msgId);
} else {
Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable T deviceCtx) {
processRpcResponseMsg(deviceCtx, requestId, data, deviceName, msgId);
}
@Override
public void onFailure(Throwable t) {
log.debug("[{}][{}][{}] Failed to process device Rpc response command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t);
}
}, context.getExecutor());
}
} }
private void processRpcResponseMsg(MqttDeviceAwareSessionContext deviceCtx, Integer requestId, String data, String deviceName, int msgId) { private void processRpcResponseMsg(MqttDeviceAwareSessionContext deviceCtx, Integer requestId, String data, String deviceName, int msgId) {
@ -715,23 +610,11 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
private void processGetAttributeRequestMessage(MqttPublishMessage mqttMsg, String deviceName, TransportProtos.GetAttributeRequestMsg requestMsg) { private void processGetAttributeRequestMessage(MqttPublishMessage mqttMsg, String deviceName, TransportProtos.GetAttributeRequestMsg requestMsg) {
int msgId = getMsgId(mqttMsg); int msgId = getMsgId(mqttMsg);
T deviceCtx = devices.get(deviceName); process(deviceName, deviceCtx -> processGetAttributeRequestMessage(deviceCtx, requestMsg, deviceName, msgId),
if (deviceCtx != null) { t -> {
processGetAttributeRequestMessage(deviceCtx, requestMsg, deviceName, msgId);
} else {
Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable T deviceCtx) {
processGetAttributeRequestMessage(deviceCtx, requestMsg, deviceName, msgId);
}
@Override
public void onFailure(Throwable t) {
ack(msgId, ReturnCode.IMPLEMENTATION_SPECIFIC); ack(msgId, ReturnCode.IMPLEMENTATION_SPECIFIC);
log.debug("[{}][{}][{}] Failed to process device attributes request command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); log.debug("[{}][{}][{}] Failed to process device attributes request command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t);
} });
}, context.getExecutor());
}
} }
private void processGetAttributeRequestMessage(T deviceCtx, TransportProtos.GetAttributeRequestMsg requestMsg, String deviceName, int msgId) { private void processGetAttributeRequestMessage(T deviceCtx, TransportProtos.GetAttributeRequestMsg requestMsg, String deviceName, int msgId) {
@ -750,16 +633,6 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
return result.build(); return result.build();
} }
protected ListenableFuture<T> checkDeviceConnected(String deviceName) {
T ctx = devices.get(deviceName);
if (ctx == null) {
log.debug("[{}][{}][{}] Missing device [{}] for the gateway session", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName);
return onDeviceConnect(deviceName, DEFAULT_DEVICE_TYPE);
} else {
return Futures.immediateFuture(ctx);
}
}
protected String checkDeviceName(String deviceName) { protected String checkDeviceName(String deviceName) {
if (StringUtils.isEmpty(deviceName)) { if (StringUtils.isEmpty(deviceName)) {
throw new RuntimeException("Device name is empty!"); throw new RuntimeException("Device name is empty!");
@ -796,7 +669,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
} }
} }
private void ackOrClose(int msgId) { protected void ackOrClose(int msgId) {
if (MqttVersion.MQTT_5.equals(deviceSessionCtx.getMqttVersion())) { if (MqttVersion.MQTT_5.equals(deviceSessionCtx.getMqttVersion())) {
ack(msgId, PAYLOAD_FORMAT_INVALID); ack(msgId, PAYLOAD_FORMAT_INVALID);
} else { } else {
@ -841,4 +714,18 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
}; };
} }
protected void process(String deviceName, Consumer<T> onSuccess, Consumer<Throwable> onFailure) {
ListenableFuture<T> deviceCtxFuture = onDeviceConnect(deviceName, DEFAULT_DEVICE_TYPE);
process(deviceCtxFuture, onSuccess, onFailure);
}
@SneakyThrows
protected <T> void process(ListenableFuture<T> deviceCtxFuture, Consumer<T> onSuccess, Consumer<Throwable> onFailure) {
if (deviceCtxFuture.isDone()) {
onSuccess.accept(deviceCtxFuture.get());
} else {
DonAsynchron.withCallback(deviceCtxFuture, onSuccess, onFailure, context.getExecutor());
}
}
} }

View File

@ -15,7 +15,6 @@
*/ */
package org.thingsboard.server.transport.mqtt.session; package org.thingsboard.server.transport.mqtt.session;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
@ -30,12 +29,12 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.eclipse.leshan.core.ResponseCode; import org.eclipse.leshan.core.ResponseCode;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.adaptor.AdaptorException; import org.thingsboard.server.common.adaptor.AdaptorException;
import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.common.adaptor.JsonConverter;
import org.thingsboard.server.common.adaptor.ProtoConverter; import org.thingsboard.server.common.adaptor.ProtoConverter;
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
import org.thingsboard.server.gen.transport.TransportApiProtos; import org.thingsboard.server.gen.transport.TransportApiProtos;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
@ -44,8 +43,6 @@ import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType; import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic;
import jakarta.annotation.Nullable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -53,7 +50,6 @@ import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.ONLINE; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.ONLINE;
@ -72,6 +68,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
@Getter @Getter
private final SparkplugTopic sparkplugTopicNode; private final SparkplugTopic sparkplugTopicNode;
@Getter
private final Map<String, SparkplugBProto.Payload.Metric> nodeBirthMetrics; private final Map<String, SparkplugBProto.Payload.Metric> nodeBirthMetrics;
private final MqttTransportHandler parent; private final MqttTransportHandler parent;
@ -88,10 +85,6 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
.collect(Collectors.toMap(SparkplugBProto.Payload.Metric::getName, metric -> metric))); .collect(Collectors.toMap(SparkplugBProto.Payload.Metric::getName, metric -> metric)));
} }
public Map<String, SparkplugBProto.Payload.Metric> getNodeBirthMetrics() {
return this.nodeBirthMetrics;
}
public TransportProtos.PostTelemetryMsg convertToPostTelemetry(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException { public TransportProtos.PostTelemetryMsg convertToPostTelemetry(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx; DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx;
byte[] bytes = getBytes(inbound.payload()); byte[] bytes = getBytes(inbound.payload());
@ -138,36 +131,27 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
} }
public void onDeviceTelemetryProto(ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture, public void onDeviceTelemetryProto(ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture,
int msgId, List<TransportProtos.PostTelemetryMsg> postTelemetryMsgList, String deviceName) throws AdaptorException { int msgId, List<TransportProtos.PostTelemetryMsg> postTelemetryMsgList, String deviceName) {
Futures.addCallback(contextListenableFuture, new FutureCallback<>() { process(contextListenableFuture, deviceCtx -> {
@Override
public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) {
for (TransportProtos.PostTelemetryMsg telemetryMsg : postTelemetryMsgList) { for (TransportProtos.PostTelemetryMsg telemetryMsg : postTelemetryMsgList) {
try { try {
processPostTelemetryMsg(deviceCtx, telemetryMsg, deviceName, msgId); processPostTelemetryMsg(deviceCtx, telemetryMsg, deviceName, msgId);
} catch (Throwable e) { } catch (Throwable e) {
log.warn("[{}][{}] Failed to convert telemetry: {}", gateway.getDeviceId(), deviceName, telemetryMsg, e); log.warn("[{}][{}] Failed to convert telemetry: {}", gateway.getDeviceId(), deviceName, telemetryMsg, e);
channel.close(); ackOrClose(msgId);
break;
} }
} }
} },
t -> log.debug("[{}] Failed to process device telemetry command: {}", sessionId, deviceName, t));
@Override
public void onFailure(Throwable t) {
log.debug("[{}] Failed to process device telemetry command: {}", sessionId, deviceName, t);
}
}, context.getExecutor());
} }
private void onDeviceAttributesProto(ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture, int msgId, private void onDeviceAttributesProto(ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture, int msgId,
List<TransportApiProtos.AttributesMsg> attributesMsgList, String deviceName) throws AdaptorException { List<TransportApiProtos.AttributesMsg> attributesMsgList, String deviceName) throws AdaptorException {
try { try {
if (!CollectionUtils.isEmpty(attributesMsgList)) { if (CollectionUtils.isEmpty(attributesMsgList)) {
Futures.addCallback(contextListenableFuture, log.debug("[{}] Devices attributes keys list is empty for: [{}]", sessionId, gateway.getDeviceId());
new FutureCallback<>() { }
@Override process(contextListenableFuture, deviceCtx -> {
public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) {
for (TransportApiProtos.AttributesMsg attributesMsg : attributesMsgList) { for (TransportApiProtos.AttributesMsg attributesMsg : attributesMsgList) {
TransportProtos.PostAttributeMsg kvListProto = attributesMsg.getMsg(); TransportProtos.PostAttributeMsg kvListProto = attributesMsg.getMsg();
try { try {
@ -177,16 +161,8 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
log.warn("[{}][{}] Failed to process device attributes command: {}", gateway.getDeviceId(), deviceName, kvListProto, e); log.warn("[{}][{}] Failed to process device attributes command: {}", gateway.getDeviceId(), deviceName, kvListProto, e);
} }
} }
} },
t -> log.debug("[{}] Failed to process device attributes command: {}", sessionId, deviceName, t));
@Override
public void onFailure(Throwable t) {
log.debug("[{}] Failed to process device attributes command: {}", sessionId, deviceName, t);
}
}, context.getExecutor());
} else {
log.debug("[{}] Devices attributes keys list is empty for: [{}]", sessionId, gateway.getDeviceId());
}
} catch (RuntimeException e) { } catch (RuntimeException e) {
throw new AdaptorException(e); throw new AdaptorException(e);
} }
@ -235,9 +211,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
String key = "bdSeq".equals(protoMetric.getName()) ? String key = "bdSeq".equals(protoMetric.getName()) ?
topicTypeName + " " + protoMetric.getName() : protoMetric.getName(); topicTypeName + " " + protoMetric.getName() : protoMetric.getName();
Optional<TransportProtos.KeyValueProto> keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(key, protoMetric); Optional<TransportProtos.KeyValueProto> keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(key, protoMetric);
if (keyValueProtoOpt.isPresent()) { keyValueProtoOpt.ifPresent(kvProto -> msgs.add(postTelemetryMsgCreated(kvProto, ts)));
msgs.add(postTelemetryMsgCreated(keyValueProtoOpt.get(), ts));
}
} }
} }