diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index 53b5d2ec63..b529edeb0c 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -44,6 +44,8 @@ public class DataConstants { public static final String EDGE_ID = "edgeId"; public static final String DEVICE_ID = "deviceId"; public static final String GATEWAY_PARAMETER = "gateway"; + + public static final String OVERWRITE_ACTIVITY_TIME_PARAMETER = "overwriteActivityTime"; public static final String COAP_TRANSPORT_NAME = "COAP"; public static final String LWM2M_TRANSPORT_NAME = "LWM2M"; public static final String MQTT_TRANSPORT_NAME = "MQTT"; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 8360111a54..b48cb39ca0 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -45,6 +45,7 @@ import io.netty.util.concurrent.GenericFutureListener; import lombok.extern.slf4j.Slf4j; import org.eclipse.leshan.core.ResponseCode; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.adaptor.AdaptorException; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; @@ -64,7 +65,6 @@ import org.thingsboard.server.common.msg.tools.TbRateLimitsException; import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; -import org.thingsboard.server.common.adaptor.AdaptorException; import org.thingsboard.server.common.transport.auth.SessionInfoCreator; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; @@ -323,6 +323,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement if (checkConnected(ctx, msg)) { ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); transportService.recordActivity(deviceSessionCtx.getSessionInfo()); + if (gatewaySessionHandler != null) { + gatewaySessionHandler.onGatewayPing(); + } } break; case DISCONNECT: @@ -1080,12 +1083,15 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement try { JsonNode infoNode = context.getMapper().readTree(device.getAdditionalInfo()); if (infoNode != null) { - JsonNode gatewayNode = infoNode.get("gateway"); + JsonNode gatewayNode = infoNode.get(DataConstants.GATEWAY_PARAMETER); if (gatewayNode != null && gatewayNode.asBoolean()) { - gatewaySessionHandler = new GatewaySessionHandler(deviceSessionCtx, sessionId); - if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME) && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) { - sessionMetaData.setOverwriteActivityTime(infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean()); + boolean overwriteDevicesActivity = false; + if (infoNode.has(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER) + && infoNode.get(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER).isBoolean()) { + overwriteDevicesActivity = infoNode.get(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER).asBoolean(); + sessionMetaData.setOverwriteActivityTime(overwriteDevicesActivity); } + gatewaySessionHandler = new GatewaySessionHandler(deviceSessionCtx, sessionId, overwriteDevicesActivity); } } } catch (IOException e) { @@ -1099,7 +1105,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement SparkplugTopic sparkplugTopicNode = validatedSparkplugTopicConnectedNode(connectMessage); if (sparkplugTopicNode != null) { SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes()); - sparkplugSessionHandler = new SparkplugNodeSessionHandler(this, deviceSessionCtx, sessionId, sparkplugTopicNode); + sparkplugSessionHandler = new SparkplugNodeSessionHandler(this, deviceSessionCtx, sessionId, true, sparkplugTopicNode); sparkplugSessionHandler.onAttributesTelemetryProto(0, sparkplugBProtoNode, sparkplugTopicNode); sessionMetaData.setOverwriteActivityTime(true); } else { @@ -1354,6 +1360,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @Override public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional deviceProfileOpt) { deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt); + if (gatewaySessionHandler != null) { + gatewaySessionHandler.onDeviceUpdate(sessionInfo, device, deviceProfileOpt); + } } @Override diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index 11f983ee3d..d033774e93 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -16,6 +16,7 @@ package org.thingsboard.server.transport.mqtt.session; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -32,17 +33,21 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; +import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; import org.springframework.util.ConcurrentReferenceHashMap; +import org.thingsboard.server.common.adaptor.AdaptorException; +import org.thingsboard.server.common.adaptor.JsonConverter; +import org.thingsboard.server.common.adaptor.ProtoConverter; import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; -import org.thingsboard.server.common.adaptor.AdaptorException; -import org.thingsboard.server.common.adaptor.JsonConverter; -import org.thingsboard.server.common.adaptor.ProtoConverter; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.gen.transport.TransportApiProtos; @@ -64,6 +69,7 @@ import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -101,7 +107,11 @@ public abstract class AbstractGatewaySessionHandler createWeakMap() { @@ -164,6 +175,12 @@ public abstract class AbstractGatewaySessionHandler transportService.recordActivity(deviceSessionCtx.getSessionInfo())); + } + } + public void onDevicesDisconnect() { devices.forEach(this::deregisterSession); } @@ -221,6 +238,14 @@ public abstract class AbstractGatewaySessionHandler deviceProfileOpt) { + log.trace("[{}][{}] onDeviceUpdate: [{}]", gateway.getTenantId(), gateway.getDeviceId(), device); + JsonNode deviceAdditionalInfo = device.getAdditionalInfo(); + if (deviceAdditionalInfo.has(DataConstants.GATEWAY_PARAMETER) && deviceAdditionalInfo.has(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER)) { + overwriteDevicesActivity = deviceAdditionalInfo.get(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER).asBoolean(); + } + } + ListenableFuture onDeviceConnect(String deviceName, String deviceType) { T result = devices.get(deviceName); if (result == null) { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java index 9ccc88aa82..9aa228afff 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java @@ -25,10 +25,10 @@ import java.util.UUID; /** * Created by nickAS21 on 26.12.22 */ -public class GatewaySessionHandler extends AbstractGatewaySessionHandler { +public class GatewaySessionHandler extends AbstractGatewaySessionHandler { - public GatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId) { - super(deviceSessionCtx, sessionId); + public GatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId, boolean overwriteDevicesActivity) { + super(deviceSessionCtx, sessionId, overwriteDevicesActivity); } public void onDeviceConnect(MqttPublishMessage mqttMsg) throws AdaptorException { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java index 05658fe97a..1f389fced1 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java @@ -73,8 +73,8 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler(); diff --git a/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandlerTest.java b/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandlerTest.java index bd1ddd2d54..158acc2ce2 100644 --- a/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandlerTest.java +++ b/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandlerTest.java @@ -15,15 +15,101 @@ */ package org.thingsboard.server.transport.mqtt.session; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.util.ConcurrentReferenceHashMap; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.transport.TransportService; +import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; +import org.thingsboard.server.transport.mqtt.MqttTransportContext; + +import java.lang.reflect.Field; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.BDDMockito.willCallRealMethod; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doNothing; +@ExtendWith(MockitoExtension.class) public class GatewaySessionHandlerTest { + @Mock + private TransportService transportService; + + @Mock + private DeviceSessionCtx deviceSessionCtx; + + @Mock + private MqttTransportContext transportContext; + + private GatewaySessionHandler handler; + + @BeforeEach + public void setup() { + lenient().when(deviceSessionCtx.getSessionId()).thenReturn(UUID.randomUUID()); + lenient().doNothing().when(transportService).recordActivity(any()); + lenient().when(transportContext.getTransportService()).thenReturn(transportService); + lenient().when(deviceSessionCtx.getContext()).thenReturn(transportContext); + handler = new GatewaySessionHandler(deviceSessionCtx, UUID.randomUUID(), true); + lenient().when(handler.getNodeId()).thenReturn("nodeId"); + } + + @Test + public void shouldRecordActivityWhenOnGatewayPing() throws Exception { + // Given + ConcurrentHashMap devices = new ConcurrentHashMap<>(); + TransportDeviceInfo deviceInfo = new TransportDeviceInfo(); + deviceInfo.setDeviceId(new DeviceId(UUID.randomUUID())); + deviceInfo.setTenantId(new TenantId(UUID.randomUUID())); + deviceInfo.setCustomerId(new CustomerId(UUID.randomUUID())); + deviceInfo.setDeviceName("device1"); + deviceInfo.setDeviceType("default"); + deviceInfo.setDeviceProfileId(new DeviceProfileId(UUID.randomUUID())); + deviceInfo.setAdditionalInfo("{\"gateway\": true, \"overwriteDeviceActivity\": true}"); + lenient().when(deviceSessionCtx.getDeviceInfo()).thenReturn(deviceInfo); + GatewayDeviceSessionContext gatewayDeviceSessionContext = new GatewayDeviceSessionContext(handler, deviceInfo, null, null, transportService); + devices.put("device1", gatewayDeviceSessionContext); + lenient().when(handler.getNodeId()).thenReturn("nodeId"); + Field devicesField = AbstractGatewaySessionHandler.class.getDeclaredField("devices"); + devicesField.setAccessible(true); + devicesField.set(handler, devices); + + // When + handler.onGatewayPing(); + + // Then + verify(transportService).recordActivity(gatewayDeviceSessionContext.getSessionInfo()); + } + + @Test + public void shouldNotRecordActivityWhenNoDevicesOnGatewayPing() throws Exception { + // Given + ConcurrentHashMap devices = new ConcurrentHashMap<>(); + Field devicesField = AbstractGatewaySessionHandler.class.getDeclaredField("devices"); + devicesField.setAccessible(true); + devicesField.set(handler, devices); + + // When + handler.onGatewayPing(); + + // Then + verify(transportService, never()).recordActivity(any()); + } + @Test public void givenGatewaySessionHandler_WhenCreateWeakMap_thenConcurrentReferenceHashMapClass() { GatewaySessionHandler gsh = mock(GatewaySessionHandler.class); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 8854ad3a7d..6280f09cfd 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.common.transport.service; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; @@ -131,7 +132,6 @@ import java.util.stream.Collectors; @TbTransportComponent public class DefaultTransportService extends TransportActivityManager implements TransportService { - public static final String OVERWRITE_ACTIVITY_TIME = "overwriteActivityTime"; public static final TransportProtos.SessionEventMsg SESSION_EVENT_MSG_OPEN = TransportProtos.SessionEventMsg.newBuilder() .setSessionType(TransportProtos.SessionType.ASYNC) .setEvent(TransportProtos.SessionEvent.OPEN).build(); @@ -1042,11 +1042,12 @@ public class DefaultTransportService extends TransportActivityManager implements .setDeviceProfileIdLSB(deviceProfileIdLSB) .setDeviceName(device.getName()) .setDeviceType(device.getType()).build(); - if (device.getAdditionalInfo().has("gateway") - && device.getAdditionalInfo().get("gateway").asBoolean() - && device.getAdditionalInfo().has(OVERWRITE_ACTIVITY_TIME) - && device.getAdditionalInfo().get(OVERWRITE_ACTIVITY_TIME).isBoolean()) { - md.setOverwriteActivityTime(device.getAdditionalInfo().get(OVERWRITE_ACTIVITY_TIME).asBoolean()); + JsonNode deviceAdditionalInfo = device.getAdditionalInfo(); + if (deviceAdditionalInfo.has(DataConstants.GATEWAY_PARAMETER) + && deviceAdditionalInfo.get(DataConstants.GATEWAY_PARAMETER).asBoolean() + && deviceAdditionalInfo.has(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER) + && deviceAdditionalInfo.get(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER).isBoolean()) { + md.setOverwriteActivityTime(deviceAdditionalInfo.get(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER).asBoolean()); } md.setSessionInfo(newSessionInfo); transportCallbackExecutor.submit(() -> md.getListener().onDeviceUpdate(newSessionInfo, device, Optional.ofNullable(newDeviceProfile)));