diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 8360111a54..2ecea63e42 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -45,6 +45,7 @@ import io.netty.util.concurrent.GenericFutureListener; import lombok.extern.slf4j.Slf4j; import org.eclipse.leshan.core.ResponseCode; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.adaptor.AdaptorException; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; @@ -64,7 +65,6 @@ import org.thingsboard.server.common.msg.tools.TbRateLimitsException; import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; -import org.thingsboard.server.common.adaptor.AdaptorException; import org.thingsboard.server.common.transport.auth.SessionInfoCreator; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; @@ -323,6 +323,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement if (checkConnected(ctx, msg)) { ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); transportService.recordActivity(deviceSessionCtx.getSessionInfo()); + if (gatewaySessionHandler != null) { + gatewaySessionHandler.onGatewayPing(); + } } break; case DISCONNECT: @@ -1082,10 +1085,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement if (infoNode != null) { JsonNode gatewayNode = infoNode.get("gateway"); if (gatewayNode != null && gatewayNode.asBoolean()) { - gatewaySessionHandler = new GatewaySessionHandler(deviceSessionCtx, sessionId); + boolean overwriteDevicesActivity = false; if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME) && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) { - sessionMetaData.setOverwriteActivityTime(infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean()); + overwriteDevicesActivity = infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean(); + sessionMetaData.setOverwriteActivityTime(overwriteDevicesActivity); } + gatewaySessionHandler = new GatewaySessionHandler(deviceSessionCtx, sessionId, overwriteDevicesActivity); } } } catch (IOException e) { @@ -1099,7 +1104,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement SparkplugTopic sparkplugTopicNode = validatedSparkplugTopicConnectedNode(connectMessage); if (sparkplugTopicNode != null) { SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes()); - sparkplugSessionHandler = new SparkplugNodeSessionHandler(this, deviceSessionCtx, sessionId, sparkplugTopicNode); + sparkplugSessionHandler = new SparkplugNodeSessionHandler(this, deviceSessionCtx, sessionId, true, sparkplugTopicNode); sparkplugSessionHandler.onAttributesTelemetryProto(0, sparkplugBProtoNode, sparkplugTopicNode); sessionMetaData.setOverwriteActivityTime(true); } else { @@ -1354,6 +1359,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @Override public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional deviceProfileOpt) { deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt); + if (gatewaySessionHandler != null) { + gatewaySessionHandler.onDeviceUpdate(sessionInfo, device, deviceProfileOpt); + } } @Override diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index 1cb391b1f4..dc6ba367ab 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -32,17 +32,21 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; +import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; import org.springframework.util.ConcurrentReferenceHashMap; +import org.thingsboard.server.common.adaptor.AdaptorException; +import org.thingsboard.server.common.adaptor.JsonConverter; +import org.thingsboard.server.common.adaptor.ProtoConverter; import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; -import org.thingsboard.server.common.adaptor.AdaptorException; -import org.thingsboard.server.common.adaptor.JsonConverter; -import org.thingsboard.server.common.adaptor.ProtoConverter; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.gen.transport.TransportApiProtos; @@ -64,6 +68,7 @@ import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -89,6 +94,8 @@ public abstract class AbstractGatewaySessionHandler createWeakMap() { @@ -164,6 +176,12 @@ public abstract class AbstractGatewaySessionHandler transportService.recordActivity(deviceSessionCtx.getSessionInfo())); + } + } + public void onDevicesDisconnect() { devices.forEach(this::deregisterSession); } @@ -221,6 +239,16 @@ public abstract class AbstractGatewaySessionHandler deviceProfileOpt) { + log.trace("[{}][{}] onDeviceUpdate: [{}]", gateway.getTenantId(), gateway.getDeviceId(), device); + if (device.getAdditionalInfo().has(GATEWAY_PROPERTY) + && device.getAdditionalInfo().get(GATEWAY_PROPERTY).asBoolean() + && device.getAdditionalInfo().has(OVERWRITE_ACTIVITY_TIME) + && device.getAdditionalInfo().get(OVERWRITE_ACTIVITY_TIME).isBoolean()) { + overwriteDevicesActivity = device.getAdditionalInfo().get(OVERWRITE_ACTIVITY_TIME).asBoolean(); + } + } + ListenableFuture onDeviceConnect(String deviceName, String deviceType) { T result = devices.get(deviceName); if (result == null) { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java index 9ccc88aa82..9aa228afff 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java @@ -25,10 +25,10 @@ import java.util.UUID; /** * Created by nickAS21 on 26.12.22 */ -public class GatewaySessionHandler extends AbstractGatewaySessionHandler { +public class GatewaySessionHandler extends AbstractGatewaySessionHandler { - public GatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId) { - super(deviceSessionCtx, sessionId); + public GatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId, boolean overwriteDevicesActivity) { + super(deviceSessionCtx, sessionId, overwriteDevicesActivity); } public void onDeviceConnect(MqttPublishMessage mqttMsg) throws AdaptorException { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java index d4f2364d79..67dda4681a 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java @@ -73,8 +73,8 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler(); diff --git a/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandlerTest.java b/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandlerTest.java index bd1ddd2d54..9b52d57dc2 100644 --- a/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandlerTest.java +++ b/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandlerTest.java @@ -15,15 +15,98 @@ */ package org.thingsboard.server.transport.mqtt.session; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.springframework.util.ConcurrentReferenceHashMap; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.transport.TransportService; +import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; +import org.thingsboard.server.transport.mqtt.MqttTransportContext; + +import java.lang.reflect.Field; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.BDDMockito.willCallRealMethod; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doNothing; public class GatewaySessionHandlerTest { + @Mock + private TransportService transportService; + + @Mock + private DeviceSessionCtx deviceSessionCtx; + + @Mock + private MqttTransportContext transportContext; + + private GatewaySessionHandler handler; + + @BeforeEach + public void setup() { + MockitoAnnotations.openMocks(this); + when(deviceSessionCtx.getSessionId()).thenReturn(UUID.randomUUID()); + doNothing().when(transportService).recordActivity(any()); + when(transportContext.getTransportService()).thenReturn(transportService); + when(deviceSessionCtx.getContext()).thenReturn(transportContext); + handler = new GatewaySessionHandler(deviceSessionCtx, UUID.randomUUID(), true); + when(handler.getNodeId()).thenReturn("nodeId"); + } + + @Test + public void shouldRecordActivityWhenOnGatewayPing() throws Exception { + // Given + ConcurrentHashMap devices = new ConcurrentHashMap<>(); + TransportDeviceInfo deviceInfo = new TransportDeviceInfo(); + deviceInfo.setDeviceId(new DeviceId(UUID.randomUUID())); + deviceInfo.setTenantId(new TenantId(UUID.randomUUID())); + deviceInfo.setCustomerId(new CustomerId(UUID.randomUUID())); + deviceInfo.setDeviceName("device1"); + deviceInfo.setDeviceType("default"); + deviceInfo.setDeviceProfileId(new DeviceProfileId(UUID.randomUUID())); + deviceInfo.setAdditionalInfo("{\"gateway\": true, \"overwriteDeviceActivity\": true}"); + when(deviceSessionCtx.getDeviceInfo()).thenReturn(deviceInfo); + GatewayDeviceSessionContext gatewayDeviceSessionContext = new GatewayDeviceSessionContext(handler, deviceInfo, null, null, transportService); + devices.put("device1", gatewayDeviceSessionContext); + when(handler.getNodeId()).thenReturn("nodeId"); + Field devicesField = AbstractGatewaySessionHandler.class.getDeclaredField("devices"); + devicesField.setAccessible(true); + devicesField.set(handler, devices); + + // When + handler.onGatewayPing(); + + // Then + verify(transportService).recordActivity(gatewayDeviceSessionContext.getSessionInfo()); + } + + @Test + public void shouldNotRecordActivityWhenNoDevicesOnGatewayPing() throws Exception { + // Given + ConcurrentHashMap devices = new ConcurrentHashMap<>(); + Field devicesField = AbstractGatewaySessionHandler.class.getDeclaredField("devices"); + devicesField.setAccessible(true); + devicesField.set(handler, devices); + + // When + handler.onGatewayPing(); + + // Then + verify(transportService, never()).recordActivity(any()); + } + @Test public void givenGatewaySessionHandler_WhenCreateWeakMap_thenConcurrentReferenceHashMapClass() { GatewaySessionHandler gsh = mock(GatewaySessionHandler.class);