sparkplug: add Sparkplug as extends abstract

This commit is contained in:
nickAS21 2022-12-27 16:43:53 +02:00
parent e238c2ed39
commit d73ab5f406
4 changed files with 21 additions and 376 deletions

View File

@ -114,7 +114,7 @@ public abstract class AbstractGatewaySessionHandler {
if (isJsonPayloadType()) {
onDeviceDisconnectJson(mqttMsg);
} else {
onDeviceDisconnectProto(mqttMsg);
onGatewayNodeDisconnectProto(mqttMsg);
}
}
@ -308,7 +308,7 @@ public abstract class AbstractGatewaySessionHandler {
processOnDisconnect(msg, deviceName);
}
private void onDeviceDisconnectProto(MqttPublishMessage mqttMsg) throws AdaptorException {
protected void onGatewayNodeDisconnectProto(MqttPublishMessage mqttMsg) throws AdaptorException {
try {
TransportApiProtos.DisconnectMsg connectProto = TransportApiProtos.DisconnectMsg.parseFrom(getBytes(mqttMsg.payload()));
String deviceName = checkDeviceName(connectProto.getDeviceName());

View File

@ -15,139 +15,34 @@
*/
package org.thingsboard.server.transport.mqtt.session;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.gson.JsonElement;
import com.google.gson.JsonNull;
import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import org.thingsboard.server.gen.transport.TransportApiProtos;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
import org.thingsboard.server.transport.mqtt.MqttTransportContext;
import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic;
import javax.annotation.Nullable;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_RPC_ASYNC_MSG;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopic;
/**
* Created by nickAS21 on 12.12.22
*/
@Slf4j
public class SparkplugNodeSessionHandler {
public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler{
private static final String DEFAULT_DEVICE_TYPE = "default";
private static final String CAN_T_PARSE_VALUE = "Can't parse value: ";
private static final String DEVICE_PROPERTY = "device";
private final MqttTransportContext context;
private final TransportService transportService;
private final TransportDeviceInfo nodeSparkplugInfo;
private final UUID sessionId;
private final ConcurrentMap<String, Lock> deviceCreationLockMap;
private final ConcurrentMap<String, SparkplugSessionCtx> devices = new ConcurrentHashMap<>();
private final ConcurrentMap<String, ListenableFuture<SparkplugSessionCtx>> deviceFutures = new ConcurrentHashMap<>();
private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
private final ChannelHandlerContext channel;
private final DeviceSessionCtx deviceSessionCtx;
private String nodeTopic;
public SparkplugNodeSessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId, String nodeTopic) {
this.context = deviceSessionCtx.getContext();
this.transportService = context.getTransportService();
this.deviceSessionCtx = deviceSessionCtx;
this.nodeSparkplugInfo = deviceSessionCtx.getDeviceInfo();
this.sessionId = sessionId;
this.deviceCreationLockMap = createWeakMap();
this.mqttQoSMap = deviceSessionCtx.getMqttQoSMap();
this.channel = deviceSessionCtx.getChannel();
super(deviceSessionCtx, sessionId);
this.nodeTopic = nodeTopic;
}
ConcurrentReferenceHashMap<String, Lock> createWeakMap() {
return new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK);
}
public String getNodeId() {
return context.getNodeId();
}
public UUID getSessionId() {
return sessionId;
}
public String getNodeTopic() {
return nodeTopic;
}
public int nextMsgId() {
return deviceSessionCtx.nextMsgId();
}
public void deregisterSession(String deviceName) {
SparkplugSessionCtx deviceSessionCtx = devices.remove(deviceName);
if (deviceSessionCtx != null) {
deregisterSession(deviceName, deviceSessionCtx);
} else {
log.debug("[{}] Device [{}] was already removed from the gateway session", sessionId, deviceName);
}
}
private void deregisterSession(String deviceName, SparkplugSessionCtx deviceSessionCtx) {
transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);
log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName);
}
public void onDeviceDeleted(String deviceName) {
deregisterSession(deviceName);
}
private int getMsgId(MqttPublishMessage mqttMsg) {
return mqttMsg.variableHeader().packetId();
}
public void onDeviceConnectProto(MqttPublishMessage mqttMsg) throws AdaptorException {
try {
String deviceName = parseTopic(mqttMsg.variableHeader().topicName()).getDeviceId();
String deviceType = StringUtils.isEmpty(nodeSparkplugInfo.getDeviceType()) ? DEFAULT_DEVICE_TYPE : nodeSparkplugInfo.getDeviceType();
processOnConnect(mqttMsg, deviceName, deviceType);
} catch (Exception e) {
throw new AdaptorException(e);
}
}
public void onPublishMsg(ChannelHandlerContext ctx, String topicName, int msgId, MqttPublishMessage mqttMsg) throws Exception {
SparkplugTopic sparkplugTopic = parseTopic(topicName);
log.error("SparkplugPublishMsg [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device: " + sparkplugTopic.getDeviceId(), sparkplugTopic.getType());
log.warn("SparkplugPublishMsg [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device: " + sparkplugTopic.getDeviceId(), sparkplugTopic.getType());
if (sparkplugTopic.isNode()) {
// A node topic
switch (sparkplugTopic.getType()) {
@ -164,7 +59,7 @@ public class SparkplugNodeSessionHandler {
// TODO
break;
case NDEATH:
onNodeDisconnectProto(mqttMsg);
onGatewayNodeDisconnectProto(mqttMsg);
break;
case NRECORD:
// TODO
@ -187,7 +82,7 @@ public class SparkplugNodeSessionHandler {
// TODO
break;
case DDEATH:
onDeviceDisconnectProto(mqttMsg);
onGatewayNodeDisconnectProto(mqttMsg);
break;
case DRECORD:
// TODO
@ -197,36 +92,16 @@ public class SparkplugNodeSessionHandler {
}
}
private void onNodeDisconnectProto(MqttPublishMessage mqttMsg) throws AdaptorException {
try {
TransportApiProtos.DisconnectMsg connectProto = TransportApiProtos.DisconnectMsg.parseFrom(getBytes(mqttMsg.payload()));
String deviceName = checkDeviceName(connectProto.getDeviceName());
processOnDisconnect(mqttMsg, deviceName);
} catch (RuntimeException | InvalidProtocolBufferException e) {
throw new AdaptorException(e);
}
}
private void onDeviceDisconnectProto(MqttPublishMessage mqttMsg) throws AdaptorException {
try {
TransportApiProtos.DisconnectMsg connectProto = TransportApiProtos.DisconnectMsg.parseFrom(getBytes(mqttMsg.payload()));
String deviceName = checkDeviceName(connectProto.getDeviceName());
// TODO disconnect device without disconnect Node
} catch (RuntimeException | InvalidProtocolBufferException e) {
throw new AdaptorException(e);
}
}
private void processOnDisconnect(MqttPublishMessage msg, String deviceName) {
deregisterSession(deviceName);
ack(msg);
}
public void handleSparkplugSubscribeMsg(List<Integer> grantedQoSList, SparkplugTopic sparkplugTopic, MqttQoS reqQoS) {
String topicName = sparkplugTopic.toString();
log.error("SparkplugSubscribeMsg [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device: " + sparkplugTopic.getDeviceId(), sparkplugTopic.getType());
log.warn("SparkplugSubscribeMsg [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device: " + sparkplugTopic.getDeviceId(), sparkplugTopic.getType());
if (sparkplugTopic.isNode()) {
if (sparkplugTopic.getGroupId() == null) {
// TODO SUBSCRIBE NameSpace
} else if (sparkplugTopic.getType() == null) {
// TODO SUBSCRIBE GroupId
}
else if (sparkplugTopic.isNode()) {
// A node topic
switch (sparkplugTopic.getType()) {
case STATE:
@ -275,127 +150,4 @@ public class SparkplugNodeSessionHandler {
}
}
private byte[] getBytes(ByteBuf payload) {
return ProtoMqttAdaptor.toBytes(payload);
}
private void ack(MqttPublishMessage msg) {
int msgId = getMsgId(msg);
if (msgId > 0) {
writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msgId));
}
}
ChannelFuture writeAndFlush(MqttMessage mqttMessage) {
return channel.writeAndFlush(mqttMessage);
}
private String checkDeviceName(String deviceName) {
if (StringUtils.isEmpty(deviceName)) {
throw new RuntimeException("Device name is empty!");
} else {
return deviceName;
}
}
private String getDeviceName(JsonElement json) {
return json.getAsJsonObject().get(DEVICE_PROPERTY).getAsString();
}
private String getDeviceType(JsonElement json) {
JsonElement type = json.getAsJsonObject().get("type");
return type == null || type instanceof JsonNull ? DEFAULT_DEVICE_TYPE : type.getAsString();
}
private void processOnConnect(MqttPublishMessage msg, String deviceName, String deviceType) {
log.trace("[{}] onDeviceConnect: {}", sessionId, deviceName);
Futures.addCallback(onDeviceConnect(deviceName, deviceType), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable SparkplugSessionCtx result) {
ack(msg);
log.trace("[{}] onDeviceConnectOk: {}", sessionId, deviceName);
}
@Override
public void onFailure(Throwable t) {
log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, t);
}
}, context.getExecutor());
}
private ListenableFuture<SparkplugSessionCtx> onDeviceConnect(String deviceName, String deviceType) {
SparkplugSessionCtx result = devices.get(deviceName);
if (result == null) {
Lock deviceCreationLock = deviceCreationLockMap.computeIfAbsent(deviceName, s -> new ReentrantLock());
deviceCreationLock.lock();
try {
result = devices.get(deviceName);
if (result == null) {
return getDeviceCreationFuture(deviceName, deviceType);
} else {
return Futures.immediateFuture(result);
}
} finally {
deviceCreationLock.unlock();
}
} else {
return Futures.immediateFuture(result);
}
}
private ListenableFuture<SparkplugSessionCtx> getDeviceCreationFuture(String deviceName, String deviceType) {
final SettableFuture<SparkplugSessionCtx> futureToSet = SettableFuture.create();
ListenableFuture<SparkplugSessionCtx> future = deviceFutures.putIfAbsent(deviceName, futureToSet);
if (future != null) {
return future;
}
try {
transportService.process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg.newBuilder()
.setDeviceName(deviceName)
.setDeviceType(deviceType)
.setGatewayIdMSB(nodeSparkplugInfo.getDeviceId().getId().getMostSignificantBits())
.setGatewayIdLSB(nodeSparkplugInfo.getDeviceId().getId().getLeastSignificantBits())
.setSparkplug(true)
.build(),
new TransportServiceCallback<>() {
@Override
public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) {
if (msg.getDeviceInfo() == null) {
System.out.println("DeviceInfo == null");
}
SparkplugSessionCtx nodeSparkplugSessionCtx = new SparkplugSessionCtx(SparkplugNodeSessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService);
if (devices.putIfAbsent(deviceName, nodeSparkplugSessionCtx) == null) {
log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType);
SessionInfoProto deviceSessionInfo = nodeSparkplugSessionCtx.getSessionInfo();
transportService.registerAsyncSession(deviceSessionInfo, nodeSparkplugSessionCtx);
transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder()
.setSessionInfo(deviceSessionInfo)
.setSessionEvent(SESSION_EVENT_MSG_OPEN)
.setSubscribeToAttributes(SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG)
.setSubscribeToRPC(SUBSCRIBE_TO_RPC_ASYNC_MSG)
.build(), null);
}
futureToSet.set(devices.get(deviceName));
deviceFutures.remove(deviceName);
}
@Override
public void onError(Throwable e) {
log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, e);
futureToSet.setException(e);
deviceFutures.remove(deviceName);
}
});
return futureToSet;
} catch (Throwable e) {
deviceFutures.remove(deviceName);
throw e;
}
}
}

View File

@ -15,132 +15,25 @@
*/
package org.thingsboard.server.transport.mqtt.session;
import io.netty.handler.codec.mqtt.MqttMessage;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
/**
* Created by nickAS21 on 08.12.22
*/
@Slf4j
public class SparkplugSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener {
public class SparkplugSessionCtx extends AbstractGatewayDeviceSessionContext {
private final SparkplugNodeSessionHandler parent;
private final TransportService transportService;
public SparkplugSessionCtx(SparkplugNodeSessionHandler parent, TransportDeviceInfo deviceInfo,
DeviceProfile deviceProfile, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap,
public SparkplugSessionCtx(AbstractGatewaySessionHandler parent,
TransportDeviceInfo deviceInfo,
DeviceProfile deviceProfile,
ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap,
TransportService transportService) {
super(UUID.randomUUID(), mqttQoSMap);
this.parent = parent;
setSessionInfo(SessionInfoProto.newBuilder()
.setNodeId(parent.getNodeId())
.setSessionIdMSB(sessionId.getMostSignificantBits())
.setSessionIdLSB(sessionId.getLeastSignificantBits())
.setDeviceIdMSB(deviceInfo.getDeviceId().getId().getMostSignificantBits())
.setDeviceIdLSB(deviceInfo.getDeviceId().getId().getLeastSignificantBits())
.setTenantIdMSB(deviceInfo.getTenantId().getId().getMostSignificantBits())
.setTenantIdLSB(deviceInfo.getTenantId().getId().getLeastSignificantBits())
.setCustomerIdMSB(deviceInfo.getCustomerId().getId().getMostSignificantBits())
.setCustomerIdLSB(deviceInfo.getCustomerId().getId().getLeastSignificantBits())
.setDeviceName(deviceInfo.getDeviceName())
.setDeviceType(deviceInfo.getDeviceType())
.setGwSessionIdMSB(parent.getSessionId().getMostSignificantBits())
.setGwSessionIdLSB(parent.getSessionId().getLeastSignificantBits())
.setDeviceProfileIdMSB(deviceInfo.getDeviceProfileId().getId().getMostSignificantBits())
.setDeviceProfileIdLSB(deviceInfo.getDeviceProfileId().getId().getLeastSignificantBits())
.build());
setDeviceInfo(deviceInfo);
setConnected(true);
setDeviceProfile(deviceProfile);
this.transportService = transportService;
}
@Override
public UUID getSessionId() {
return sessionId;
}
@Override
public int nextMsgId() {
return parent.nextMsgId();
}
@Override
public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) {
// try {
// parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), response).ifPresent(parent::writeAndFlush);
// } catch (Exception e) {
// log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
// }
}
@Override
public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
// log.trace("[{}] Received attributes update notification to device", sessionId);
// try {
// parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), notification).ifPresent(parent::writeAndFlush);
// } catch (Exception e) {
// log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
// }
}
@Override
public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg request) {
// log.trace("[{}] Received RPC command to device", sessionId);
// try {
// parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent(
// payload -> {
// ChannelFuture channelFuture = parent.writeAndFlush(payload);
// if (request.getPersisted()) {
// channelFuture.addListener(result -> {
// if (result.cause() == null) {
// if (!isAckExpected(payload)) {
// transportService.process(getSessionInfo(), request, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
// } else if (request.getPersisted()) {
// transportService.process(getSessionInfo(), request, RpcStatus.SENT, TransportServiceCallback.EMPTY);
//
// }
// }
// });
// }
// }
// );
// } catch (Exception e) {
// transportService.process(getSessionInfo(),
// TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
// .setRequestId(request.getRequestId()).setError("Failed to convert device RPC command to MQTT msg").build(), TransportServiceCallback.EMPTY);
// log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
// }
}
@Override
public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {
log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage());
parent.deregisterSession(getDeviceInfo().getDeviceName());
}
@Override
public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg toServerResponse) {
// This feature is not supported in the TB IoT Gateway yet.
}
@Override
public void onDeviceDeleted(DeviceId deviceId) {
parent.onDeviceDeleted(this.getSessionInfo().getDeviceName());
}
private boolean isAckExpected(MqttMessage message) {
return message.fixedHeader().qosLevel().value() > 0;
super(parent, deviceInfo, deviceProfile, mqttQoSMap, transportService);
}
}

View File

@ -46,7 +46,7 @@ public class GatewaySessionHandlerTest {
@Test
public void givenConcurrentReferenceHashMap_WhenGC_thenMapIsEmpty() {
AbstractGatewaySessionHandler gsh = mock(AbstractGatewaySessionHandler.class);
GatewaySessionHandler gsh = mock(GatewaySessionHandler.class);
willCallRealMethod().given(gsh).createWeakMap();
ConcurrentMap<String, Lock> map = gsh.createWeakMap();