From 7babba1db40c2e89bcaa2ea6a1b346b81cfb08ad Mon Sep 17 00:00:00 2001 From: nickAS21 Date: Mon, 26 Dec 2022 16:57:06 +0200 Subject: [PATCH] sparkplug: Rename Gateway to abstract --- .../transport/mqtt/MqttTransportHandler.java | 6 +- ... AbstractGatewayDeviceSessionContext.java} | 10 +-- ...ava => AbstractGatewaySessionHandler.java} | 74 +++++++++---------- .../session/GatewaySessionHandlerTest.java | 2 +- 4 files changed, 46 insertions(+), 46 deletions(-) rename common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/{GatewayDeviceSessionCtx.java => AbstractGatewayDeviceSessionContext.java} (92%) rename common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/{GatewaySessionHandler.java => AbstractGatewaySessionHandler.java} (90%) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 4d98662be2..fc41989c82 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -71,7 +71,7 @@ import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; import org.thingsboard.server.queue.scheduler.SchedulerComponent; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; -import org.thingsboard.server.transport.mqtt.session.GatewaySessionHandler; +import org.thingsboard.server.transport.mqtt.session.AbstractGatewaySessionHandler; import org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher; import org.thingsboard.server.transport.mqtt.session.SparkplugNodeSessionHandler; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic; @@ -131,7 +131,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement final DeviceSessionCtx deviceSessionCtx; volatile InetSocketAddress address; - volatile GatewaySessionHandler gatewaySessionHandler; + volatile AbstractGatewaySessionHandler gatewaySessionHandler; volatile SparkplugNodeSessionHandler sparkplugSessionHandler; private final ConcurrentHashMap otaPackSessions; @@ -967,7 +967,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement if (infoNode != null) { JsonNode gatewayNode = infoNode.get("gateway"); if (gatewayNode != null && gatewayNode.asBoolean()) { - gatewaySessionHandler = new GatewaySessionHandler(deviceSessionCtx, sessionId); + gatewaySessionHandler = new AbstractGatewaySessionHandler(deviceSessionCtx, sessionId); if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME) && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) { sessionMetaData.setOverwriteActivityTime(infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean()); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewayDeviceSessionContext.java similarity index 92% rename from common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java rename to common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewayDeviceSessionContext.java index debdab87b5..09b169f567 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewayDeviceSessionContext.java @@ -35,14 +35,14 @@ import java.util.concurrent.ConcurrentMap; * Created by ashvayka on 19.01.17. */ @Slf4j -public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener { +public class AbstractGatewayDeviceSessionContext extends MqttDeviceAwareSessionContext implements SessionMsgListener { - private final GatewaySessionHandler parent; + private final AbstractGatewaySessionHandler parent; private final TransportService transportService; - public GatewayDeviceSessionCtx(GatewaySessionHandler parent, TransportDeviceInfo deviceInfo, - DeviceProfile deviceProfile, ConcurrentMap mqttQoSMap, - TransportService transportService) { + public AbstractGatewayDeviceSessionContext(AbstractGatewaySessionHandler parent, TransportDeviceInfo deviceInfo, + DeviceProfile deviceProfile, ConcurrentMap mqttQoSMap, + TransportService transportService) { super(UUID.randomUUID(), mqttQoSMap); this.parent = parent; setSessionInfo(SessionInfoProto.newBuilder() diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java similarity index 90% rename from common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java rename to common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index fad67470ba..7819191638 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -76,7 +76,7 @@ import static org.thingsboard.server.common.transport.service.DefaultTransportSe * Created by ashvayka on 19.01.17. */ @Slf4j -public class GatewaySessionHandler { +public class AbstractGatewaySessionHandler { private static final String DEFAULT_DEVICE_TYPE = "default"; private static final String CAN_T_PARSE_VALUE = "Can't parse value: "; @@ -87,13 +87,13 @@ public class GatewaySessionHandler { private final TransportDeviceInfo gateway; private final UUID sessionId; private final ConcurrentMap deviceCreationLockMap; - private final ConcurrentMap devices; - private final ConcurrentMap> deviceFutures; + private final ConcurrentMap devices; + private final ConcurrentMap> deviceFutures; private final ConcurrentMap mqttQoSMap; private final ChannelHandlerContext channel; private final DeviceSessionCtx deviceSessionCtx; - public GatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId) { + public AbstractGatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId) { this.context = deviceSessionCtx.getContext(); this.transportService = context.getTransportService(); this.deviceSessionCtx = deviceSessionCtx; @@ -195,7 +195,7 @@ public class GatewaySessionHandler { } void deregisterSession(String deviceName) { - GatewayDeviceSessionCtx deviceSessionCtx = devices.remove(deviceName); + AbstractGatewayDeviceSessionContext deviceSessionCtx = devices.remove(deviceName); if (deviceSessionCtx != null) { deregisterSession(deviceName, deviceSessionCtx); } else { @@ -217,9 +217,9 @@ public class GatewaySessionHandler { private void processOnConnect(MqttPublishMessage msg, String deviceName, String deviceType) { log.trace("[{}] onDeviceConnect: {}", sessionId, deviceName); - Futures.addCallback(onDeviceConnect(deviceName, deviceType), new FutureCallback() { + Futures.addCallback(onDeviceConnect(deviceName, deviceType), new FutureCallback() { @Override - public void onSuccess(@Nullable GatewayDeviceSessionCtx result) { + public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext result) { ack(msg); log.trace("[{}] onDeviceConnectOk: {}", sessionId, deviceName); } @@ -232,8 +232,8 @@ public class GatewaySessionHandler { }, context.getExecutor()); } - private ListenableFuture onDeviceConnect(String deviceName, String deviceType) { - GatewayDeviceSessionCtx result = devices.get(deviceName); + private ListenableFuture onDeviceConnect(String deviceName, String deviceType) { + AbstractGatewayDeviceSessionContext result = devices.get(deviceName); if (result == null) { Lock deviceCreationLock = deviceCreationLockMap.computeIfAbsent(deviceName, s -> new ReentrantLock()); deviceCreationLock.lock(); @@ -252,9 +252,9 @@ public class GatewaySessionHandler { } } - private ListenableFuture getDeviceCreationFuture(String deviceName, String deviceType) { - final SettableFuture futureToSet = SettableFuture.create(); - ListenableFuture future = deviceFutures.putIfAbsent(deviceName, futureToSet); + private ListenableFuture getDeviceCreationFuture(String deviceName, String deviceType) { + final SettableFuture futureToSet = SettableFuture.create(); + ListenableFuture future = deviceFutures.putIfAbsent(deviceName, futureToSet); if (future != null) { return future; } @@ -267,7 +267,7 @@ public class GatewaySessionHandler { new TransportServiceCallback() { @Override public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) { - GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService); + AbstractGatewayDeviceSessionContext deviceSessionCtx = new AbstractGatewayDeviceSessionContext(AbstractGatewaySessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService); if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) { log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType); SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); @@ -348,7 +348,7 @@ public class GatewaySessionHandler { Futures.addCallback(checkDeviceConnected(deviceName), new FutureCallback<>() { @Override - public void onSuccess(@Nullable GatewayDeviceSessionCtx deviceCtx) { + public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) { if (!deviceEntry.getValue().isJsonArray()) { throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); } @@ -380,9 +380,9 @@ public class GatewaySessionHandler { deviceMsgList.forEach(telemetryMsg -> { String deviceName = checkDeviceName(telemetryMsg.getDeviceName()); Futures.addCallback(checkDeviceConnected(deviceName), - new FutureCallback() { + new FutureCallback() { @Override - public void onSuccess(@Nullable GatewayDeviceSessionCtx deviceCtx) { + public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) { TransportProtos.PostTelemetryMsg msg = telemetryMsg.getMsg(); try { TransportProtos.PostTelemetryMsg postTelemetryMsg = ProtoConverter.validatePostTelemetryMsg(msg.toByteArray()); @@ -408,7 +408,7 @@ public class GatewaySessionHandler { } } - private void processPostTelemetryMsg(GatewayDeviceSessionCtx deviceCtx, TransportProtos.PostTelemetryMsg postTelemetryMsg, String deviceName, int msgId) { + private void processPostTelemetryMsg(AbstractGatewayDeviceSessionContext deviceCtx, TransportProtos.PostTelemetryMsg postTelemetryMsg, String deviceName, int msgId) { transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg)); } @@ -419,9 +419,9 @@ public class GatewaySessionHandler { for (Map.Entry deviceEntry : jsonObj.entrySet()) { String deviceName = deviceEntry.getKey(); Futures.addCallback(checkDeviceConnected(deviceName), - new FutureCallback() { + new FutureCallback() { @Override - public void onSuccess(@Nullable GatewayDeviceSessionCtx deviceCtx) { + public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) { if (!deviceEntry.getValue().isJsonObject()) { throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); } @@ -453,9 +453,9 @@ public class GatewaySessionHandler { claimMsgList.forEach(claimDeviceMsg -> { String deviceName = checkDeviceName(claimDeviceMsg.getDeviceName()); Futures.addCallback(checkDeviceConnected(deviceName), - new FutureCallback() { + new FutureCallback() { @Override - public void onSuccess(@Nullable GatewayDeviceSessionCtx deviceCtx) { + public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) { TransportApiProtos.ClaimDevice claimRequest = claimDeviceMsg.getClaimRequest(); if (claimRequest == null) { throw new IllegalArgumentException("Claim request for device: " + deviceName + " is null!"); @@ -484,7 +484,7 @@ public class GatewaySessionHandler { } } - private void processClaimDeviceMsg(GatewayDeviceSessionCtx deviceCtx, TransportProtos.ClaimDeviceMsg claimDeviceMsg, String deviceName, int msgId) { + private void processClaimDeviceMsg(AbstractGatewayDeviceSessionContext deviceCtx, TransportProtos.ClaimDeviceMsg claimDeviceMsg, String deviceName, int msgId) { transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(channel, deviceName, msgId, claimDeviceMsg)); } @@ -495,9 +495,9 @@ public class GatewaySessionHandler { for (Map.Entry deviceEntry : jsonObj.entrySet()) { String deviceName = deviceEntry.getKey(); Futures.addCallback(checkDeviceConnected(deviceName), - new FutureCallback() { + new FutureCallback() { @Override - public void onSuccess(@Nullable GatewayDeviceSessionCtx deviceCtx) { + public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) { if (!deviceEntry.getValue().isJsonObject()) { throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); } @@ -524,9 +524,9 @@ public class GatewaySessionHandler { attributesMsgList.forEach(attributesMsg -> { String deviceName = checkDeviceName(attributesMsg.getDeviceName()); Futures.addCallback(checkDeviceConnected(deviceName), - new FutureCallback() { + new FutureCallback() { @Override - public void onSuccess(@Nullable GatewayDeviceSessionCtx deviceCtx) { + public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) { TransportProtos.PostAttributeMsg kvListProto = attributesMsg.getMsg(); if (kvListProto == null) { throw new IllegalArgumentException("Attributes List for device: " + deviceName + " is empty!"); @@ -554,7 +554,7 @@ public class GatewaySessionHandler { } } - private void processPostAttributesMsg(GatewayDeviceSessionCtx deviceCtx, TransportProtos.PostAttributeMsg postAttributeMsg, String deviceName, int msgId) { + private void processPostAttributesMsg(AbstractGatewayDeviceSessionContext deviceCtx, TransportProtos.PostAttributeMsg postAttributeMsg, String deviceName, int msgId) { transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg)); } @@ -603,9 +603,9 @@ public class GatewaySessionHandler { JsonObject jsonObj = json.getAsJsonObject(); String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString(); Futures.addCallback(checkDeviceConnected(deviceName), - new FutureCallback() { + new FutureCallback() { @Override - public void onSuccess(@Nullable GatewayDeviceSessionCtx deviceCtx) { + public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) { Integer requestId = jsonObj.get("id").getAsInt(); String data = jsonObj.get("data").toString(); TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder() @@ -628,9 +628,9 @@ public class GatewaySessionHandler { TransportApiProtos.GatewayRpcResponseMsg gatewayRpcResponseMsg = TransportApiProtos.GatewayRpcResponseMsg.parseFrom(getBytes(payload)); String deviceName = checkDeviceName(gatewayRpcResponseMsg.getDeviceName()); Futures.addCallback(checkDeviceConnected(deviceName), - new FutureCallback() { + new FutureCallback() { @Override - public void onSuccess(@Nullable GatewayDeviceSessionCtx deviceCtx) { + public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) { Integer requestId = gatewayRpcResponseMsg.getId(); String data = gatewayRpcResponseMsg.getData(); TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder() @@ -648,16 +648,16 @@ public class GatewaySessionHandler { } } - private void processRpcResponseMsg(GatewayDeviceSessionCtx deviceCtx, TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg, String deviceName, int msgId) { + private void processRpcResponseMsg(AbstractGatewayDeviceSessionContext deviceCtx, TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg, String deviceName, int msgId) { transportService.process(deviceCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(channel, deviceName, msgId, rpcResponseMsg)); } private void processGetAttributeRequestMessage(MqttPublishMessage mqttMsg, String deviceName, TransportProtos.GetAttributeRequestMsg requestMsg) { int msgId = getMsgId(mqttMsg); Futures.addCallback(checkDeviceConnected(deviceName), - new FutureCallback() { + new FutureCallback() { @Override - public void onSuccess(@Nullable GatewayDeviceSessionCtx deviceCtx) { + public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) { transportService.process(deviceCtx.getSessionInfo(), requestMsg, getPubAckCallback(channel, deviceName, msgId, requestMsg)); } @@ -681,8 +681,8 @@ public class GatewaySessionHandler { return result.build(); } - private ListenableFuture checkDeviceConnected(String deviceName) { - GatewayDeviceSessionCtx ctx = devices.get(deviceName); + private ListenableFuture checkDeviceConnected(String deviceName) { + AbstractGatewayDeviceSessionContext ctx = devices.get(deviceName); if (ctx == null) { log.debug("[{}] Missing device [{}] for the gateway session", sessionId, deviceName); return onDeviceConnect(deviceName, DEFAULT_DEVICE_TYPE); @@ -723,7 +723,7 @@ public class GatewaySessionHandler { } } - private void deregisterSession(String deviceName, GatewayDeviceSessionCtx deviceSessionCtx) { + private void deregisterSession(String deviceName, AbstractGatewayDeviceSessionContext deviceSessionCtx) { transportService.deregisterSession(deviceSessionCtx.getSessionInfo()); transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null); log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName); diff --git a/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandlerTest.java b/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandlerTest.java index 4b42ccf564..de3bbba8e0 100644 --- a/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandlerTest.java +++ b/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandlerTest.java @@ -46,7 +46,7 @@ public class GatewaySessionHandlerTest { @Test public void givenConcurrentReferenceHashMap_WhenGC_thenMapIsEmpty() { - GatewaySessionHandler gsh = mock(GatewaySessionHandler.class); + AbstractGatewaySessionHandler gsh = mock(AbstractGatewaySessionHandler.class); willCallRealMethod().given(gsh).createWeakMap(); ConcurrentMap map = gsh.createWeakMap();