sparkplug: comment2

This commit is contained in:
nickAS21 2023-02-16 18:48:04 +02:00
parent a80b3cb536
commit 67c0704a46
3 changed files with 71 additions and 51 deletions

View File

@ -38,6 +38,8 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.awaitility.Awaitility.await;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.OFFLINE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.ONLINE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.STATE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.messageName;
/**
* Created by nickAS21 on 12.01.23
@ -81,9 +83,9 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstra
long ts = calendar.getTimeInMillis();
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts);
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(SparkplugMessageType.STATE.name(), ONLINE.name()));
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(messageName(STATE), ONLINE.name()));
AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>();
await(alias + SparkplugMessageType.STATE.name() + ", device: " + savedGateway.getName())
await(alias + messageName(STATE) + ", device: " + savedGateway.getName())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId()));
@ -91,7 +93,7 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstra
});
for (Device device : devices) {
await(alias + SparkplugMessageType.STATE.name() + ", device: " + device.getName())
await(alias + messageName(STATE) + ", device: " + device.getName())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
finalFuture.set(tsService.findAllLatest(tenantId, device.getId()));
@ -104,7 +106,7 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstra
long ts = calendar.getTimeInMillis();
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts);
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(SparkplugMessageType.STATE.name(), OFFLINE.name()));
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(messageName(STATE), OFFLINE.name()));
AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>();
SparkplugBProto.Payload.Builder payloadDeathDevice = SparkplugBProto.Payload.newBuilder()
@ -115,7 +117,7 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstra
Device device = devicesList.get(indexDeviceDisconnect);
client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.DDEATH.name() + "/" + edgeNode + "/" + device.getName(),
payloadDeathDevice.build().toByteArray(), 0, false);
await(alias + SparkplugMessageType.STATE.name() + ", device: " + device.getName())
await(alias + messageName(STATE) + ", device: " + device.getName())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
finalFuture.set(tsService.findAllLatest(tenantId, device.getId()));
@ -128,13 +130,13 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstra
long ts = calendar.getTimeInMillis();
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts);
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(SparkplugMessageType.STATE.name(), OFFLINE.name()));
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(messageName(STATE), OFFLINE.name()));
AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>();
if (client.isConnected()) {
client.disconnect();
await(alias + SparkplugMessageType.STATE.name() + ", device: " + savedGateway.getName())
await(alias + messageName(STATE) + ", device: " + savedGateway.getName())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId()));
@ -143,7 +145,7 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstra
List<Device> devicesList = new ArrayList<>(devices);
for (Device device : devicesList) {
await(alias + SparkplugMessageType.STATE.name() + ", device: " + device.getName())
await(alias + messageName(STATE) + ", device: " + device.getName())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
finalFuture.set(tsService.findAllLatest(tenantId, device.getId()));

View File

@ -36,6 +36,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
@ -75,8 +76,9 @@ import static org.thingsboard.server.common.transport.service.DefaultTransportSe
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_RPC_ASYNC_MSG;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.STATE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.OFFLINE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.STATE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.messageName;
/**
* Created by ashvayka on 19.01.17.
@ -246,45 +248,45 @@ public abstract class AbstractGatewaySessionHandler {
if (future != null) {
return future;
}
try {
transportService.process(GetOrCreateDeviceFromGatewayRequestMsg.newBuilder()
.setDeviceName(deviceName)
.setDeviceType(deviceType)
.setGatewayIdMSB(gateway.getDeviceId().getId().getMostSignificantBits())
.setGatewayIdLSB(gateway.getDeviceId().getId().getLeastSignificantBits())
.setSparkplug(this.deviceSessionCtx.isSparkplug())
.build(),
new TransportServiceCallback<>() {
@Override
public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) {
AbstractGatewayDeviceSessionContext deviceSessionCtx = newDeviceSessionCtx(msg);
if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType);
SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx);
transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder()
.setSessionInfo(deviceSessionInfo)
.setSessionEvent(SESSION_EVENT_MSG_OPEN)
.setSubscribeToAttributes(SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG)
.setSubscribeToRPC(SUBSCRIBE_TO_RPC_ASYNC_MSG)
.build(), null);
}
futureToSet.set(devices.get(deviceName));
deviceFutures.remove(deviceName);
try {
transportService.process(GetOrCreateDeviceFromGatewayRequestMsg.newBuilder()
.setDeviceName(deviceName)
.setDeviceType(deviceType)
.setGatewayIdMSB(gateway.getDeviceId().getId().getMostSignificantBits())
.setGatewayIdLSB(gateway.getDeviceId().getId().getLeastSignificantBits())
.setSparkplug(this.deviceSessionCtx.isSparkplug())
.build(),
new TransportServiceCallback<>() {
@Override
public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) {
AbstractGatewayDeviceSessionContext deviceSessionCtx = newDeviceSessionCtx(msg);
if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType);
SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx);
transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder()
.setSessionInfo(deviceSessionInfo)
.setSessionEvent(SESSION_EVENT_MSG_OPEN)
.setSubscribeToAttributes(SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG)
.setSubscribeToRPC(SUBSCRIBE_TO_RPC_ASYNC_MSG)
.build(), null);
}
futureToSet.set(devices.get(deviceName));
deviceFutures.remove(deviceName);
}
@Override
public void onError(Throwable e) {
log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, e);
futureToSet.setException(e);
deviceFutures.remove(deviceName);
}
});
return futureToSet;
} catch (Throwable e) {
deviceFutures.remove(deviceName);
throw e;
}
@Override
public void onError(Throwable e) {
log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, e);
futureToSet.setException(e);
deviceFutures.remove(deviceName);
}
});
return futureToSet;
} catch (Throwable e) {
deviceFutures.remove(deviceName);
throw e;
}
}
protected abstract AbstractGatewayDeviceSessionContext newDeviceSessionCtx(GetOrCreateDeviceFromGatewayResponse msg);
@ -727,7 +729,7 @@ public abstract class AbstractGatewaySessionHandler {
}
private void deregisterSession(String deviceName, MqttDeviceAwareSessionContext deviceSessionCtx) {
if (this.deviceSessionCtx.isSparkplug()){
if (this.deviceSessionCtx.isSparkplug()) {
// add Msg Telemetry: key STATE type: String value: OFFLINE ts: sparkplugBProto.getTimestamp()
sendSparkplugStateOnTelemetry(deviceSessionCtx.getSessionInfo(),
deviceSessionCtx.getDeviceInfo().getDeviceName(), OFFLINE, new Date().getTime());
@ -739,10 +741,15 @@ public abstract class AbstractGatewaySessionHandler {
public void sendSparkplugStateOnTelemetry(TransportProtos.SessionInfoProto sessionInfo, String deviceName, SparkplugConnectionState typeSate, long ts) {
TransportProtos.KeyValueProto.Builder keyValueProtoBuilder = TransportProtos.KeyValueProto.newBuilder();
keyValueProtoBuilder.setKey(STATE.name());
keyValueProtoBuilder.setType(TransportProtos.KeyValueType.STRING_V);
keyValueProtoBuilder.setStringV(typeSate.name());
TransportProtos.PostTelemetryMsg postTelemetryMsg = postTelemetryMsgCreated(keyValueProtoBuilder.build(), ts);
try {
keyValueProtoBuilder.setKey(messageName(STATE));
keyValueProtoBuilder.setType(TransportProtos.KeyValueType.STRING_V);
keyValueProtoBuilder.setStringV(typeSate.name());
} catch (ThingsboardException e) {
e.printStackTrace();
}
TransportProtos.PostTelemetryMsg postTelemetryMsg = postTelemetryMsgCreated(keyValueProtoBuilder.build(), ts);
transportService.process(sessionInfo, postTelemetryMsg, getPubAckCallback(channel, deviceName, -1, postTelemetryMsg));
}

View File

@ -87,6 +87,17 @@ public enum SparkplugMessageType {
}
throw new ThingsboardException("Invalid message type: " + type, ThingsboardErrorCode.INVALID_ARGUMENTS);
}
public static String messageName(SparkplugMessageType type) throws ThingsboardException {
if (STATE.equals(type)) {
return "sparkplugConnectionState";
}
for (SparkplugMessageType messageType : SparkplugMessageType.values()) {
if (messageType.name().equals(type)) {
return messageType.name();
}
}
throw new ThingsboardException("Invalid message type: " + type, ThingsboardErrorCode.INVALID_ARGUMENTS);
}
public boolean isDeath() {
return this.equals(DDEATH) || this.equals(NDEATH);