sparkplug: add onAttributesUpdate

This commit is contained in:
nickAS21 2023-01-27 18:53:35 +02:00
parent 1d49e56aec
commit a01c7a23d7
5 changed files with 43 additions and 10 deletions

View File

@ -1062,21 +1062,24 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private void checkSparkplugSession(MqttConnectMessage connectMessage) { private void checkSparkplugSession(MqttConnectMessage connectMessage) {
try { try {
if (sparkplugSessionHandler == null) { if (sparkplugSessionHandler == null && validatedSparkplugTopic(connectMessage)) {
sparkplugSessionHandler = new SparkplugNodeSessionHandler(deviceSessionCtx, sessionId);
if (StringUtils.isNotBlank(connectMessage.payload().willTopic())
&& connectMessage.payload().willMessageInBytes() != null && connectMessage.payload().willMessageInBytes().length > 0) {
SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes()); SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes());
SparkplugTopic sparkplugTopic = parseTopicPublish(connectMessage.payload().willTopic()); SparkplugTopic sparkplugTopicNode = parseTopicPublish(connectMessage.payload().willTopic());
sparkplugSessionHandler = new SparkplugNodeSessionHandler(deviceSessionCtx, sessionId, sparkplugTopicNode);
sparkplugSessionHandler.onTelemetryProto(0, sparkplugBProtoNode, sparkplugSessionHandler.onTelemetryProto(0, sparkplugBProtoNode,
deviceSessionCtx.getDeviceInfo().getDeviceName(), sparkplugTopic); deviceSessionCtx.getDeviceInfo().getDeviceName(), sparkplugTopicNode);
}
} }
} catch (Exception e) { } catch (Exception e) {
log.trace("[{}][{}] Failed to fetch sparkplugDevice additional info or sparkplugTopicName", sessionId, deviceSessionCtx.getDeviceInfo().getDeviceName(), e); log.trace("[{}][{}] Failed to fetch sparkplugDevice additional info or sparkplugTopicName", sessionId, deviceSessionCtx.getDeviceInfo().getDeviceName(), e);
} }
} }
private boolean validatedSparkplugTopic (MqttConnectMessage connectMessage) {
return StringUtils.isNotBlank(connectMessage.payload().willTopic())
&& connectMessage.payload().willMessageInBytes() != null
&& connectMessage.payload().willMessageInBytes().length > 0;
}
@Override @Override
public void operationComplete(Future<? super Void> future) throws Exception { public void operationComplete(Future<? super Void> future) throws Exception {
log.trace("[{}] Channel closed!", sessionId); log.trace("[{}] Channel closed!", sessionId);

View File

@ -37,7 +37,7 @@ import java.util.concurrent.ConcurrentMap;
@Slf4j @Slf4j
public abstract class AbstractGatewayDeviceSessionContext extends MqttDeviceAwareSessionContext implements SessionMsgListener { public abstract class AbstractGatewayDeviceSessionContext extends MqttDeviceAwareSessionContext implements SessionMsgListener {
private final AbstractGatewaySessionHandler parent; protected final AbstractGatewaySessionHandler parent;
private final TransportService transportService; private final TransportService transportService;
public AbstractGatewayDeviceSessionContext(AbstractGatewaySessionHandler parent, TransportDeviceInfo deviceInfo, public AbstractGatewayDeviceSessionContext(AbstractGatewaySessionHandler parent, TransportDeviceInfo deviceInfo,

View File

@ -252,7 +252,7 @@ public abstract class AbstractGatewaySessionHandler {
new TransportServiceCallback<>() { new TransportServiceCallback<>() {
@Override @Override
public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) { public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) {
GatewayDeviceSessionContext deviceSessionCtx = new GatewayDeviceSessionContext(AbstractGatewaySessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService); AbstractGatewayDeviceSessionContext deviceSessionCtx = newDeviceSessionCtx(msg) ;
if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) { if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType); log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType);
SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
@ -282,6 +282,11 @@ public abstract class AbstractGatewaySessionHandler {
} }
} }
private AbstractGatewayDeviceSessionContext newDeviceSessionCtx(GetOrCreateDeviceFromGatewayResponse msg) {
return this.deviceSessionCtx.isSparkplug() ? new SparkplugSessionCtx(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService) :
new GatewayDeviceSessionContext(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService);
}
protected int getMsgId(MqttPublishMessage mqttMsg) { protected int getMsgId(MqttPublishMessage mqttMsg) {
return mqttMsg.variableHeader().packetId(); return mqttMsg.variableHeader().packetId();
} }

View File

@ -52,8 +52,11 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopi
@Slf4j @Slf4j
public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
public SparkplugNodeSessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId) { final SparkplugTopic sparkplugTopicNode;
public SparkplugNodeSessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId, SparkplugTopic sparkplugTopicNode) {
super(deviceSessionCtx, sessionId); super(deviceSessionCtx, sessionId);
this.sparkplugTopicNode = sparkplugTopicNode;
} }

View File

@ -15,11 +15,15 @@
*/ */
package org.thingsboard.server.transport.mqtt.session; package org.thingsboard.server.transport.mqtt.session;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
/** /**
@ -36,4 +40,22 @@ public class SparkplugSessionCtx extends AbstractGatewayDeviceSessionContext {
super(parent, deviceInfo, deviceProfile, mqttQoSMap, transportService); super(parent, deviceInfo, deviceProfile, mqttQoSMap, transportService);
} }
@Override
public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
log.trace("[{}] Received attributes update notification to sparkplug device", sessionId);
createMqttPublishMsg(this, notification).ifPresent(parent::writeAndFlush);
}
private Optional<MqttPublishMessage> createMqttPublishMsg (MqttDeviceAwareSessionContext ctx, TransportProtos.AttributeUpdateNotificationMsg notification) {
try {
// TODO metrics from notification & MetricsDBIRTH
byte[] payloadInBytes = new byte[3];
String topic = ((SparkplugNodeSessionHandler) parent).sparkplugTopicNode.toString() + "/" + deviceInfo.getDeviceName();
return Optional.of(parent.getPayloadAdaptor().createMqttPublishMsg(ctx, topic, payloadInBytes));
} catch (Exception e) {
log.trace("[{}] Failed to convert device attributes response to MQTT sparkplug msg", sessionId, e);
return Optional.empty();
}
}
} }