Added logic when ping requests for gateway device will be also activity events for devices, connected through the gateway

This commit is contained in:
imbeacon 2024-04-04 10:45:41 +03:00
parent 7121784f7f
commit f621d59be3
5 changed files with 132 additions and 13 deletions

View File

@ -45,6 +45,7 @@ import io.netty.util.concurrent.GenericFutureListener;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.eclipse.leshan.core.ResponseCode; import org.eclipse.leshan.core.ResponseCode;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.adaptor.AdaptorException;
import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfile;
@ -64,7 +65,6 @@ import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.auth.SessionInfoCreator; import org.thingsboard.server.common.transport.auth.SessionInfoCreator;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
@ -323,6 +323,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
if (checkConnected(ctx, msg)) { if (checkConnected(ctx, msg)) {
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
transportService.recordActivity(deviceSessionCtx.getSessionInfo()); transportService.recordActivity(deviceSessionCtx.getSessionInfo());
if (gatewaySessionHandler != null) {
gatewaySessionHandler.onGatewayPing();
}
} }
break; break;
case DISCONNECT: case DISCONNECT:
@ -1082,10 +1085,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
if (infoNode != null) { if (infoNode != null) {
JsonNode gatewayNode = infoNode.get("gateway"); JsonNode gatewayNode = infoNode.get("gateway");
if (gatewayNode != null && gatewayNode.asBoolean()) { if (gatewayNode != null && gatewayNode.asBoolean()) {
gatewaySessionHandler = new GatewaySessionHandler(deviceSessionCtx, sessionId); boolean overwriteDevicesActivity = false;
if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME) && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) { if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME) && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) {
sessionMetaData.setOverwriteActivityTime(infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean()); overwriteDevicesActivity = infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean();
sessionMetaData.setOverwriteActivityTime(overwriteDevicesActivity);
} }
gatewaySessionHandler = new GatewaySessionHandler(deviceSessionCtx, sessionId, overwriteDevicesActivity);
} }
} }
} catch (IOException e) { } catch (IOException e) {
@ -1099,7 +1104,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
SparkplugTopic sparkplugTopicNode = validatedSparkplugTopicConnectedNode(connectMessage); SparkplugTopic sparkplugTopicNode = validatedSparkplugTopicConnectedNode(connectMessage);
if (sparkplugTopicNode != null) { if (sparkplugTopicNode != null) {
SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes()); SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes());
sparkplugSessionHandler = new SparkplugNodeSessionHandler(this, deviceSessionCtx, sessionId, sparkplugTopicNode); sparkplugSessionHandler = new SparkplugNodeSessionHandler(this, deviceSessionCtx, sessionId, true, sparkplugTopicNode);
sparkplugSessionHandler.onAttributesTelemetryProto(0, sparkplugBProtoNode, sparkplugTopicNode); sparkplugSessionHandler.onAttributesTelemetryProto(0, sparkplugBProtoNode, sparkplugTopicNode);
sessionMetaData.setOverwriteActivityTime(true); sessionMetaData.setOverwriteActivityTime(true);
} else { } else {
@ -1354,6 +1359,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override @Override
public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) { public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) {
deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt); deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt);
if (gatewaySessionHandler != null) {
gatewaySessionHandler.onDeviceUpdate(sessionInfo, device, deviceProfileOpt);
}
} }
@Override @Override

View File

@ -32,17 +32,21 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.util.ConcurrentReferenceHashMap; import org.springframework.util.ConcurrentReferenceHashMap;
import org.thingsboard.server.common.adaptor.AdaptorException;
import org.thingsboard.server.common.adaptor.JsonConverter;
import org.thingsboard.server.common.adaptor.ProtoConverter;
import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.adaptor.AdaptorException;
import org.thingsboard.server.common.adaptor.JsonConverter;
import org.thingsboard.server.common.adaptor.ProtoConverter;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import org.thingsboard.server.gen.transport.TransportApiProtos; import org.thingsboard.server.gen.transport.TransportApiProtos;
@ -64,6 +68,7 @@ import java.util.Date;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -89,6 +94,8 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
private static final String CAN_T_PARSE_VALUE = "Can't parse value: "; private static final String CAN_T_PARSE_VALUE = "Can't parse value: ";
private static final String DEVICE_PROPERTY = "device"; private static final String DEVICE_PROPERTY = "device";
private static final String GATEWAY_PROPERTY = "gateway";
public static final String OVERWRITE_ACTIVITY_TIME = "overwriteActivityTime";
protected final MqttTransportContext context; protected final MqttTransportContext context;
protected final TransportService transportService; protected final TransportService transportService;
@ -101,7 +108,11 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
protected final ChannelHandlerContext channel; protected final ChannelHandlerContext channel;
protected final DeviceSessionCtx deviceSessionCtx; protected final DeviceSessionCtx deviceSessionCtx;
public AbstractGatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId) { @Getter
@Setter
private boolean overwriteDevicesActivity = false;
public AbstractGatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId, boolean overwriteDevicesActivity) {
this.context = deviceSessionCtx.getContext(); this.context = deviceSessionCtx.getContext();
this.transportService = context.getTransportService(); this.transportService = context.getTransportService();
this.deviceSessionCtx = deviceSessionCtx; this.deviceSessionCtx = deviceSessionCtx;
@ -112,6 +123,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
this.deviceCreationLockMap = createWeakMap(); this.deviceCreationLockMap = createWeakMap();
this.mqttQoSMap = deviceSessionCtx.getMqttQoSMap(); this.mqttQoSMap = deviceSessionCtx.getMqttQoSMap();
this.channel = deviceSessionCtx.getChannel(); this.channel = deviceSessionCtx.getChannel();
this.overwriteDevicesActivity = overwriteDevicesActivity;
} }
ConcurrentReferenceHashMap<String, Lock> createWeakMap() { ConcurrentReferenceHashMap<String, Lock> createWeakMap() {
@ -164,6 +176,12 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
} }
} }
public void onGatewayPing() {
if (overwriteDevicesActivity) {
devices.forEach((deviceName, deviceSessionCtx) -> transportService.recordActivity(deviceSessionCtx.getSessionInfo()));
}
}
public void onDevicesDisconnect() { public void onDevicesDisconnect() {
devices.forEach(this::deregisterSession); devices.forEach(this::deregisterSession);
} }
@ -221,6 +239,16 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
}, context.getExecutor()); }, context.getExecutor());
} }
public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) {
log.trace("[{}][{}] onDeviceUpdate: [{}]", gateway.getTenantId(), gateway.getDeviceId(), device);
if (device.getAdditionalInfo().has(GATEWAY_PROPERTY)
&& device.getAdditionalInfo().get(GATEWAY_PROPERTY).asBoolean()
&& device.getAdditionalInfo().has(OVERWRITE_ACTIVITY_TIME)
&& device.getAdditionalInfo().get(OVERWRITE_ACTIVITY_TIME).isBoolean()) {
overwriteDevicesActivity = device.getAdditionalInfo().get(OVERWRITE_ACTIVITY_TIME).asBoolean();
}
}
ListenableFuture<T> onDeviceConnect(String deviceName, String deviceType) { ListenableFuture<T> onDeviceConnect(String deviceName, String deviceType) {
T result = devices.get(deviceName); T result = devices.get(deviceName);
if (result == null) { if (result == null) {

View File

@ -25,10 +25,10 @@ import java.util.UUID;
/** /**
* Created by nickAS21 on 26.12.22 * Created by nickAS21 on 26.12.22
*/ */
public class GatewaySessionHandler extends AbstractGatewaySessionHandler { public class GatewaySessionHandler extends AbstractGatewaySessionHandler<GatewayDeviceSessionContext> {
public GatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId) { public GatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId, boolean overwriteDevicesActivity) {
super(deviceSessionCtx, sessionId); super(deviceSessionCtx, sessionId, overwriteDevicesActivity);
} }
public void onDeviceConnect(MqttPublishMessage mqttMsg) throws AdaptorException { public void onDeviceConnect(MqttPublishMessage mqttMsg) throws AdaptorException {

View File

@ -73,8 +73,8 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
private final MqttTransportHandler parent; private final MqttTransportHandler parent;
public SparkplugNodeSessionHandler(MqttTransportHandler parent, DeviceSessionCtx deviceSessionCtx, UUID sessionId, public SparkplugNodeSessionHandler(MqttTransportHandler parent, DeviceSessionCtx deviceSessionCtx, UUID sessionId,
SparkplugTopic sparkplugTopicNode) { boolean overwriteDevicesActivity, SparkplugTopic sparkplugTopicNode) {
super(deviceSessionCtx, sessionId); super(deviceSessionCtx, sessionId, overwriteDevicesActivity);
this.parent = parent; this.parent = parent;
this.sparkplugTopicNode = sparkplugTopicNode; this.sparkplugTopicNode = sparkplugTopicNode;
this.nodeBirthMetrics = new ConcurrentHashMap<>(); this.nodeBirthMetrics = new ConcurrentHashMap<>();

View File

@ -15,15 +15,98 @@
*/ */
package org.thingsboard.server.transport.mqtt.session; package org.thingsboard.server.transport.mqtt.session;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.springframework.util.ConcurrentReferenceHashMap; import org.springframework.util.ConcurrentReferenceHashMap;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import org.thingsboard.server.transport.mqtt.MqttTransportContext;
import java.lang.reflect.Field;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.willCallRealMethod; import static org.mockito.BDDMockito.willCallRealMethod;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
public class GatewaySessionHandlerTest { public class GatewaySessionHandlerTest {
@Mock
private TransportService transportService;
@Mock
private DeviceSessionCtx deviceSessionCtx;
@Mock
private MqttTransportContext transportContext;
private GatewaySessionHandler handler;
@BeforeEach
public void setup() {
MockitoAnnotations.openMocks(this);
when(deviceSessionCtx.getSessionId()).thenReturn(UUID.randomUUID());
doNothing().when(transportService).recordActivity(any());
when(transportContext.getTransportService()).thenReturn(transportService);
when(deviceSessionCtx.getContext()).thenReturn(transportContext);
handler = new GatewaySessionHandler(deviceSessionCtx, UUID.randomUUID(), true);
when(handler.getNodeId()).thenReturn("nodeId");
}
@Test
public void shouldRecordActivityWhenOnGatewayPing() throws Exception {
// Given
ConcurrentHashMap<String, GatewayDeviceSessionContext> devices = new ConcurrentHashMap<>();
TransportDeviceInfo deviceInfo = new TransportDeviceInfo();
deviceInfo.setDeviceId(new DeviceId(UUID.randomUUID()));
deviceInfo.setTenantId(new TenantId(UUID.randomUUID()));
deviceInfo.setCustomerId(new CustomerId(UUID.randomUUID()));
deviceInfo.setDeviceName("device1");
deviceInfo.setDeviceType("default");
deviceInfo.setDeviceProfileId(new DeviceProfileId(UUID.randomUUID()));
deviceInfo.setAdditionalInfo("{\"gateway\": true, \"overwriteDeviceActivity\": true}");
when(deviceSessionCtx.getDeviceInfo()).thenReturn(deviceInfo);
GatewayDeviceSessionContext gatewayDeviceSessionContext = new GatewayDeviceSessionContext(handler, deviceInfo, null, null, transportService);
devices.put("device1", gatewayDeviceSessionContext);
when(handler.getNodeId()).thenReturn("nodeId");
Field devicesField = AbstractGatewaySessionHandler.class.getDeclaredField("devices");
devicesField.setAccessible(true);
devicesField.set(handler, devices);
// When
handler.onGatewayPing();
// Then
verify(transportService).recordActivity(gatewayDeviceSessionContext.getSessionInfo());
}
@Test
public void shouldNotRecordActivityWhenNoDevicesOnGatewayPing() throws Exception {
// Given
ConcurrentHashMap<String, GatewayDeviceSessionContext> devices = new ConcurrentHashMap<>();
Field devicesField = AbstractGatewaySessionHandler.class.getDeclaredField("devices");
devicesField.setAccessible(true);
devicesField.set(handler, devices);
// When
handler.onGatewayPing();
// Then
verify(transportService, never()).recordActivity(any());
}
@Test @Test
public void givenGatewaySessionHandler_WhenCreateWeakMap_thenConcurrentReferenceHashMapClass() { public void givenGatewaySessionHandler_WhenCreateWeakMap_thenConcurrentReferenceHashMapClass() {
GatewaySessionHandler gsh = mock(GatewaySessionHandler.class); GatewaySessionHandler gsh = mock(GatewaySessionHandler.class);