commit
f9d8be8f6f
@ -44,6 +44,8 @@ public class DataConstants {
|
|||||||
public static final String EDGE_ID = "edgeId";
|
public static final String EDGE_ID = "edgeId";
|
||||||
public static final String DEVICE_ID = "deviceId";
|
public static final String DEVICE_ID = "deviceId";
|
||||||
public static final String GATEWAY_PARAMETER = "gateway";
|
public static final String GATEWAY_PARAMETER = "gateway";
|
||||||
|
|
||||||
|
public static final String OVERWRITE_ACTIVITY_TIME_PARAMETER = "overwriteActivityTime";
|
||||||
public static final String COAP_TRANSPORT_NAME = "COAP";
|
public static final String COAP_TRANSPORT_NAME = "COAP";
|
||||||
public static final String LWM2M_TRANSPORT_NAME = "LWM2M";
|
public static final String LWM2M_TRANSPORT_NAME = "LWM2M";
|
||||||
public static final String MQTT_TRANSPORT_NAME = "MQTT";
|
public static final String MQTT_TRANSPORT_NAME = "MQTT";
|
||||||
|
|||||||
@ -45,6 +45,7 @@ import io.netty.util.concurrent.GenericFutureListener;
|
|||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.eclipse.leshan.core.ResponseCode;
|
import org.eclipse.leshan.core.ResponseCode;
|
||||||
import org.thingsboard.common.util.JacksonUtil;
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
|
import org.thingsboard.server.common.adaptor.AdaptorException;
|
||||||
import org.thingsboard.server.common.data.DataConstants;
|
import org.thingsboard.server.common.data.DataConstants;
|
||||||
import org.thingsboard.server.common.data.Device;
|
import org.thingsboard.server.common.data.Device;
|
||||||
import org.thingsboard.server.common.data.DeviceProfile;
|
import org.thingsboard.server.common.data.DeviceProfile;
|
||||||
@ -64,7 +65,6 @@ import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
|
|||||||
import org.thingsboard.server.common.transport.SessionMsgListener;
|
import org.thingsboard.server.common.transport.SessionMsgListener;
|
||||||
import org.thingsboard.server.common.transport.TransportService;
|
import org.thingsboard.server.common.transport.TransportService;
|
||||||
import org.thingsboard.server.common.transport.TransportServiceCallback;
|
import org.thingsboard.server.common.transport.TransportServiceCallback;
|
||||||
import org.thingsboard.server.common.adaptor.AdaptorException;
|
|
||||||
import org.thingsboard.server.common.transport.auth.SessionInfoCreator;
|
import org.thingsboard.server.common.transport.auth.SessionInfoCreator;
|
||||||
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
|
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
|
||||||
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
|
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
|
||||||
@ -323,6 +323,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
|||||||
if (checkConnected(ctx, msg)) {
|
if (checkConnected(ctx, msg)) {
|
||||||
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
|
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
|
||||||
transportService.recordActivity(deviceSessionCtx.getSessionInfo());
|
transportService.recordActivity(deviceSessionCtx.getSessionInfo());
|
||||||
|
if (gatewaySessionHandler != null) {
|
||||||
|
gatewaySessionHandler.onGatewayPing();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case DISCONNECT:
|
case DISCONNECT:
|
||||||
@ -1080,12 +1083,15 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
|||||||
try {
|
try {
|
||||||
JsonNode infoNode = context.getMapper().readTree(device.getAdditionalInfo());
|
JsonNode infoNode = context.getMapper().readTree(device.getAdditionalInfo());
|
||||||
if (infoNode != null) {
|
if (infoNode != null) {
|
||||||
JsonNode gatewayNode = infoNode.get("gateway");
|
JsonNode gatewayNode = infoNode.get(DataConstants.GATEWAY_PARAMETER);
|
||||||
if (gatewayNode != null && gatewayNode.asBoolean()) {
|
if (gatewayNode != null && gatewayNode.asBoolean()) {
|
||||||
gatewaySessionHandler = new GatewaySessionHandler(deviceSessionCtx, sessionId);
|
boolean overwriteDevicesActivity = false;
|
||||||
if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME) && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) {
|
if (infoNode.has(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER)
|
||||||
sessionMetaData.setOverwriteActivityTime(infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean());
|
&& infoNode.get(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER).isBoolean()) {
|
||||||
|
overwriteDevicesActivity = infoNode.get(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER).asBoolean();
|
||||||
|
sessionMetaData.setOverwriteActivityTime(overwriteDevicesActivity);
|
||||||
}
|
}
|
||||||
|
gatewaySessionHandler = new GatewaySessionHandler(deviceSessionCtx, sessionId, overwriteDevicesActivity);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
@ -1099,7 +1105,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
|||||||
SparkplugTopic sparkplugTopicNode = validatedSparkplugTopicConnectedNode(connectMessage);
|
SparkplugTopic sparkplugTopicNode = validatedSparkplugTopicConnectedNode(connectMessage);
|
||||||
if (sparkplugTopicNode != null) {
|
if (sparkplugTopicNode != null) {
|
||||||
SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes());
|
SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes());
|
||||||
sparkplugSessionHandler = new SparkplugNodeSessionHandler(this, deviceSessionCtx, sessionId, sparkplugTopicNode);
|
sparkplugSessionHandler = new SparkplugNodeSessionHandler(this, deviceSessionCtx, sessionId, true, sparkplugTopicNode);
|
||||||
sparkplugSessionHandler.onAttributesTelemetryProto(0, sparkplugBProtoNode, sparkplugTopicNode);
|
sparkplugSessionHandler.onAttributesTelemetryProto(0, sparkplugBProtoNode, sparkplugTopicNode);
|
||||||
sessionMetaData.setOverwriteActivityTime(true);
|
sessionMetaData.setOverwriteActivityTime(true);
|
||||||
} else {
|
} else {
|
||||||
@ -1354,6 +1360,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
|||||||
@Override
|
@Override
|
||||||
public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) {
|
public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) {
|
||||||
deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt);
|
deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt);
|
||||||
|
if (gatewaySessionHandler != null) {
|
||||||
|
gatewaySessionHandler.onDeviceUpdate(sessionInfo, device, deviceProfileOpt);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -16,6 +16,7 @@
|
|||||||
package org.thingsboard.server.transport.mqtt.session;
|
package org.thingsboard.server.transport.mqtt.session;
|
||||||
|
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
import com.google.common.util.concurrent.FutureCallback;
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
@ -32,17 +33,21 @@ import io.netty.channel.ChannelFuture;
|
|||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.handler.codec.mqtt.MqttMessage;
|
import io.netty.handler.codec.mqtt.MqttMessage;
|
||||||
import io.netty.handler.codec.mqtt.MqttPublishMessage;
|
import io.netty.handler.codec.mqtt.MqttPublishMessage;
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.Setter;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
import org.springframework.util.ConcurrentReferenceHashMap;
|
import org.springframework.util.ConcurrentReferenceHashMap;
|
||||||
|
import org.thingsboard.server.common.adaptor.AdaptorException;
|
||||||
|
import org.thingsboard.server.common.adaptor.JsonConverter;
|
||||||
|
import org.thingsboard.server.common.adaptor.ProtoConverter;
|
||||||
import org.thingsboard.server.common.data.DataConstants;
|
import org.thingsboard.server.common.data.DataConstants;
|
||||||
|
import org.thingsboard.server.common.data.Device;
|
||||||
|
import org.thingsboard.server.common.data.DeviceProfile;
|
||||||
import org.thingsboard.server.common.data.StringUtils;
|
import org.thingsboard.server.common.data.StringUtils;
|
||||||
import org.thingsboard.server.common.data.id.DeviceId;
|
import org.thingsboard.server.common.data.id.DeviceId;
|
||||||
import org.thingsboard.server.common.transport.TransportService;
|
import org.thingsboard.server.common.transport.TransportService;
|
||||||
import org.thingsboard.server.common.transport.TransportServiceCallback;
|
import org.thingsboard.server.common.transport.TransportServiceCallback;
|
||||||
import org.thingsboard.server.common.adaptor.AdaptorException;
|
|
||||||
import org.thingsboard.server.common.adaptor.JsonConverter;
|
|
||||||
import org.thingsboard.server.common.adaptor.ProtoConverter;
|
|
||||||
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
|
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
|
||||||
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
|
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
|
||||||
import org.thingsboard.server.gen.transport.TransportApiProtos;
|
import org.thingsboard.server.gen.transport.TransportApiProtos;
|
||||||
@ -64,6 +69,7 @@ import java.util.Date;
|
|||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
@ -101,7 +107,11 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
|||||||
protected final ChannelHandlerContext channel;
|
protected final ChannelHandlerContext channel;
|
||||||
protected final DeviceSessionCtx deviceSessionCtx;
|
protected final DeviceSessionCtx deviceSessionCtx;
|
||||||
|
|
||||||
public AbstractGatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId) {
|
@Getter
|
||||||
|
@Setter
|
||||||
|
private boolean overwriteDevicesActivity = false;
|
||||||
|
|
||||||
|
public AbstractGatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId, boolean overwriteDevicesActivity) {
|
||||||
this.context = deviceSessionCtx.getContext();
|
this.context = deviceSessionCtx.getContext();
|
||||||
this.transportService = context.getTransportService();
|
this.transportService = context.getTransportService();
|
||||||
this.deviceSessionCtx = deviceSessionCtx;
|
this.deviceSessionCtx = deviceSessionCtx;
|
||||||
@ -112,6 +122,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
|||||||
this.deviceCreationLockMap = createWeakMap();
|
this.deviceCreationLockMap = createWeakMap();
|
||||||
this.mqttQoSMap = deviceSessionCtx.getMqttQoSMap();
|
this.mqttQoSMap = deviceSessionCtx.getMqttQoSMap();
|
||||||
this.channel = deviceSessionCtx.getChannel();
|
this.channel = deviceSessionCtx.getChannel();
|
||||||
|
this.overwriteDevicesActivity = overwriteDevicesActivity;
|
||||||
}
|
}
|
||||||
|
|
||||||
ConcurrentReferenceHashMap<String, Lock> createWeakMap() {
|
ConcurrentReferenceHashMap<String, Lock> createWeakMap() {
|
||||||
@ -164,6 +175,12 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void onGatewayPing() {
|
||||||
|
if (overwriteDevicesActivity) {
|
||||||
|
devices.forEach((deviceName, deviceSessionCtx) -> transportService.recordActivity(deviceSessionCtx.getSessionInfo()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void onDevicesDisconnect() {
|
public void onDevicesDisconnect() {
|
||||||
devices.forEach(this::deregisterSession);
|
devices.forEach(this::deregisterSession);
|
||||||
}
|
}
|
||||||
@ -221,6 +238,14 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
|||||||
}, context.getExecutor());
|
}, context.getExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) {
|
||||||
|
log.trace("[{}][{}] onDeviceUpdate: [{}]", gateway.getTenantId(), gateway.getDeviceId(), device);
|
||||||
|
JsonNode deviceAdditionalInfo = device.getAdditionalInfo();
|
||||||
|
if (deviceAdditionalInfo.has(DataConstants.GATEWAY_PARAMETER) && deviceAdditionalInfo.has(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER)) {
|
||||||
|
overwriteDevicesActivity = deviceAdditionalInfo.get(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER).asBoolean();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
ListenableFuture<T> onDeviceConnect(String deviceName, String deviceType) {
|
ListenableFuture<T> onDeviceConnect(String deviceName, String deviceType) {
|
||||||
T result = devices.get(deviceName);
|
T result = devices.get(deviceName);
|
||||||
if (result == null) {
|
if (result == null) {
|
||||||
|
|||||||
@ -25,10 +25,10 @@ import java.util.UUID;
|
|||||||
/**
|
/**
|
||||||
* Created by nickAS21 on 26.12.22
|
* Created by nickAS21 on 26.12.22
|
||||||
*/
|
*/
|
||||||
public class GatewaySessionHandler extends AbstractGatewaySessionHandler {
|
public class GatewaySessionHandler extends AbstractGatewaySessionHandler<GatewayDeviceSessionContext> {
|
||||||
|
|
||||||
public GatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId) {
|
public GatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId, boolean overwriteDevicesActivity) {
|
||||||
super(deviceSessionCtx, sessionId);
|
super(deviceSessionCtx, sessionId, overwriteDevicesActivity);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onDeviceConnect(MqttPublishMessage mqttMsg) throws AdaptorException {
|
public void onDeviceConnect(MqttPublishMessage mqttMsg) throws AdaptorException {
|
||||||
|
|||||||
@ -73,8 +73,8 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
|
|||||||
private final MqttTransportHandler parent;
|
private final MqttTransportHandler parent;
|
||||||
|
|
||||||
public SparkplugNodeSessionHandler(MqttTransportHandler parent, DeviceSessionCtx deviceSessionCtx, UUID sessionId,
|
public SparkplugNodeSessionHandler(MqttTransportHandler parent, DeviceSessionCtx deviceSessionCtx, UUID sessionId,
|
||||||
SparkplugTopic sparkplugTopicNode) {
|
boolean overwriteDevicesActivity, SparkplugTopic sparkplugTopicNode) {
|
||||||
super(deviceSessionCtx, sessionId);
|
super(deviceSessionCtx, sessionId, overwriteDevicesActivity);
|
||||||
this.parent = parent;
|
this.parent = parent;
|
||||||
this.sparkplugTopicNode = sparkplugTopicNode;
|
this.sparkplugTopicNode = sparkplugTopicNode;
|
||||||
this.nodeBirthMetrics = new ConcurrentHashMap<>();
|
this.nodeBirthMetrics = new ConcurrentHashMap<>();
|
||||||
|
|||||||
@ -15,15 +15,101 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.transport.mqtt.session;
|
package org.thingsboard.server.transport.mqtt.session;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.MockitoAnnotations;
|
||||||
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
import org.springframework.util.ConcurrentReferenceHashMap;
|
import org.springframework.util.ConcurrentReferenceHashMap;
|
||||||
|
import org.thingsboard.server.common.data.id.CustomerId;
|
||||||
|
import org.thingsboard.server.common.data.id.DeviceId;
|
||||||
|
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||||
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
|
import org.thingsboard.server.common.transport.TransportService;
|
||||||
|
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
|
||||||
|
import org.thingsboard.server.transport.mqtt.MqttTransportContext;
|
||||||
|
|
||||||
|
import java.lang.reflect.Field;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
import static org.mockito.BDDMockito.willCallRealMethod;
|
import static org.mockito.BDDMockito.willCallRealMethod;
|
||||||
|
import static org.mockito.Mockito.lenient;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
import static org.mockito.Mockito.never;
|
||||||
|
import static org.mockito.Mockito.any;
|
||||||
|
import static org.mockito.Mockito.doNothing;
|
||||||
|
|
||||||
|
@ExtendWith(MockitoExtension.class)
|
||||||
public class GatewaySessionHandlerTest {
|
public class GatewaySessionHandlerTest {
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private TransportService transportService;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private DeviceSessionCtx deviceSessionCtx;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private MqttTransportContext transportContext;
|
||||||
|
|
||||||
|
private GatewaySessionHandler handler;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void setup() {
|
||||||
|
lenient().when(deviceSessionCtx.getSessionId()).thenReturn(UUID.randomUUID());
|
||||||
|
lenient().doNothing().when(transportService).recordActivity(any());
|
||||||
|
lenient().when(transportContext.getTransportService()).thenReturn(transportService);
|
||||||
|
lenient().when(deviceSessionCtx.getContext()).thenReturn(transportContext);
|
||||||
|
handler = new GatewaySessionHandler(deviceSessionCtx, UUID.randomUUID(), true);
|
||||||
|
lenient().when(handler.getNodeId()).thenReturn("nodeId");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldRecordActivityWhenOnGatewayPing() throws Exception {
|
||||||
|
// Given
|
||||||
|
ConcurrentHashMap<String, GatewayDeviceSessionContext> devices = new ConcurrentHashMap<>();
|
||||||
|
TransportDeviceInfo deviceInfo = new TransportDeviceInfo();
|
||||||
|
deviceInfo.setDeviceId(new DeviceId(UUID.randomUUID()));
|
||||||
|
deviceInfo.setTenantId(new TenantId(UUID.randomUUID()));
|
||||||
|
deviceInfo.setCustomerId(new CustomerId(UUID.randomUUID()));
|
||||||
|
deviceInfo.setDeviceName("device1");
|
||||||
|
deviceInfo.setDeviceType("default");
|
||||||
|
deviceInfo.setDeviceProfileId(new DeviceProfileId(UUID.randomUUID()));
|
||||||
|
deviceInfo.setAdditionalInfo("{\"gateway\": true, \"overwriteDeviceActivity\": true}");
|
||||||
|
lenient().when(deviceSessionCtx.getDeviceInfo()).thenReturn(deviceInfo);
|
||||||
|
GatewayDeviceSessionContext gatewayDeviceSessionContext = new GatewayDeviceSessionContext(handler, deviceInfo, null, null, transportService);
|
||||||
|
devices.put("device1", gatewayDeviceSessionContext);
|
||||||
|
lenient().when(handler.getNodeId()).thenReturn("nodeId");
|
||||||
|
Field devicesField = AbstractGatewaySessionHandler.class.getDeclaredField("devices");
|
||||||
|
devicesField.setAccessible(true);
|
||||||
|
devicesField.set(handler, devices);
|
||||||
|
|
||||||
|
// When
|
||||||
|
handler.onGatewayPing();
|
||||||
|
|
||||||
|
// Then
|
||||||
|
verify(transportService).recordActivity(gatewayDeviceSessionContext.getSessionInfo());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldNotRecordActivityWhenNoDevicesOnGatewayPing() throws Exception {
|
||||||
|
// Given
|
||||||
|
ConcurrentHashMap<String, GatewayDeviceSessionContext> devices = new ConcurrentHashMap<>();
|
||||||
|
Field devicesField = AbstractGatewaySessionHandler.class.getDeclaredField("devices");
|
||||||
|
devicesField.setAccessible(true);
|
||||||
|
devicesField.set(handler, devices);
|
||||||
|
|
||||||
|
// When
|
||||||
|
handler.onGatewayPing();
|
||||||
|
|
||||||
|
// Then
|
||||||
|
verify(transportService, never()).recordActivity(any());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void givenGatewaySessionHandler_WhenCreateWeakMap_thenConcurrentReferenceHashMapClass() {
|
public void givenGatewaySessionHandler_WhenCreateWeakMap_thenConcurrentReferenceHashMapClass() {
|
||||||
GatewaySessionHandler gsh = mock(GatewaySessionHandler.class);
|
GatewaySessionHandler gsh = mock(GatewaySessionHandler.class);
|
||||||
|
|||||||
@ -15,6 +15,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.common.transport.service;
|
package org.thingsboard.server.common.transport.service;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
@ -131,7 +132,6 @@ import java.util.stream.Collectors;
|
|||||||
@TbTransportComponent
|
@TbTransportComponent
|
||||||
public class DefaultTransportService extends TransportActivityManager implements TransportService {
|
public class DefaultTransportService extends TransportActivityManager implements TransportService {
|
||||||
|
|
||||||
public static final String OVERWRITE_ACTIVITY_TIME = "overwriteActivityTime";
|
|
||||||
public static final TransportProtos.SessionEventMsg SESSION_EVENT_MSG_OPEN = TransportProtos.SessionEventMsg.newBuilder()
|
public static final TransportProtos.SessionEventMsg SESSION_EVENT_MSG_OPEN = TransportProtos.SessionEventMsg.newBuilder()
|
||||||
.setSessionType(TransportProtos.SessionType.ASYNC)
|
.setSessionType(TransportProtos.SessionType.ASYNC)
|
||||||
.setEvent(TransportProtos.SessionEvent.OPEN).build();
|
.setEvent(TransportProtos.SessionEvent.OPEN).build();
|
||||||
@ -1042,11 +1042,12 @@ public class DefaultTransportService extends TransportActivityManager implements
|
|||||||
.setDeviceProfileIdLSB(deviceProfileIdLSB)
|
.setDeviceProfileIdLSB(deviceProfileIdLSB)
|
||||||
.setDeviceName(device.getName())
|
.setDeviceName(device.getName())
|
||||||
.setDeviceType(device.getType()).build();
|
.setDeviceType(device.getType()).build();
|
||||||
if (device.getAdditionalInfo().has("gateway")
|
JsonNode deviceAdditionalInfo = device.getAdditionalInfo();
|
||||||
&& device.getAdditionalInfo().get("gateway").asBoolean()
|
if (deviceAdditionalInfo.has(DataConstants.GATEWAY_PARAMETER)
|
||||||
&& device.getAdditionalInfo().has(OVERWRITE_ACTIVITY_TIME)
|
&& deviceAdditionalInfo.get(DataConstants.GATEWAY_PARAMETER).asBoolean()
|
||||||
&& device.getAdditionalInfo().get(OVERWRITE_ACTIVITY_TIME).isBoolean()) {
|
&& deviceAdditionalInfo.has(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER)
|
||||||
md.setOverwriteActivityTime(device.getAdditionalInfo().get(OVERWRITE_ACTIVITY_TIME).asBoolean());
|
&& deviceAdditionalInfo.get(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER).isBoolean()) {
|
||||||
|
md.setOverwriteActivityTime(deviceAdditionalInfo.get(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER).asBoolean());
|
||||||
}
|
}
|
||||||
md.setSessionInfo(newSessionInfo);
|
md.setSessionInfo(newSessionInfo);
|
||||||
transportCallbackExecutor.submit(() -> md.getListener().onDeviceUpdate(newSessionInfo, device, Optional.ofNullable(newDeviceProfile)));
|
transportCallbackExecutor.submit(() -> md.getListener().onDeviceUpdate(newSessionInfo, device, Optional.ofNullable(newDeviceProfile)));
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user