Add device state node implementation

This commit is contained in:
Dmytro Skarzhynets 2023-09-08 14:58:33 +03:00
parent d670267e39
commit 0867030cba
9 changed files with 796 additions and 38 deletions

View File

@ -256,14 +256,23 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
log.trace("[{}] Forwarding message to device actor {}", id, toCoreMsg.getToDeviceActorMsg()); log.trace("[{}] Forwarding message to device actor {}", id, toCoreMsg.getToDeviceActorMsg());
forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback); forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback);
} else if (toCoreMsg.hasDeviceStateServiceMsg()) { } else if (toCoreMsg.hasDeviceStateServiceMsg()) {
log.trace("[{}] Forwarding message to state service {}", id, toCoreMsg.getDeviceStateServiceMsg()); log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceStateServiceMsg());
forwardToStateService(toCoreMsg.getDeviceStateServiceMsg(), callback); forwardToStateService(toCoreMsg.getDeviceStateServiceMsg(), callback);
} else if (toCoreMsg.hasEdgeNotificationMsg()) { } else if (toCoreMsg.hasEdgeNotificationMsg()) {
log.trace("[{}] Forwarding message to edge service {}", id, toCoreMsg.getEdgeNotificationMsg()); log.trace("[{}] Forwarding message to edge service {}", id, toCoreMsg.getEdgeNotificationMsg());
forwardToEdgeNotificationService(toCoreMsg.getEdgeNotificationMsg(), callback); forwardToEdgeNotificationService(toCoreMsg.getEdgeNotificationMsg(), callback);
} else if (toCoreMsg.hasDeviceConnectMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceConnectMsg());
forwardToStateService(toCoreMsg.getDeviceConnectMsg(), callback);
} else if (toCoreMsg.hasDeviceActivityMsg()) { } else if (toCoreMsg.hasDeviceActivityMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceActivityMsg()); log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceActivityMsg());
forwardToStateService(toCoreMsg.getDeviceActivityMsg(), callback); forwardToStateService(toCoreMsg.getDeviceActivityMsg(), callback);
} else if (toCoreMsg.hasDeviceDisconnectMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceDisconnectMsg());
forwardToStateService(toCoreMsg.getDeviceDisconnectMsg(), callback);
} else if (toCoreMsg.hasDeviceInactivityMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceInactivityMsg());
forwardToStateService(toCoreMsg.getDeviceInactivityMsg(), callback);
} else if (!toCoreMsg.getToDeviceActorNotificationMsg().isEmpty()) { } else if (!toCoreMsg.getToDeviceActorNotificationMsg().isEmpty()) {
Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray()); Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray());
if (actorMsg.isPresent()) { if (actorMsg.isPresent()) {
@ -592,17 +601,54 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
stateService.onQueueMsg(deviceStateServiceMsg, callback); stateService.onQueueMsg(deviceStateServiceMsg, callback);
} }
private void forwardToStateService(TransportProtos.DeviceConnectProto deviceConnectMsg, TbCallback callback) {
var tenantId = toTenantId(deviceConnectMsg.getTenantIdMSB(), deviceConnectMsg.getTenantIdLSB());
var deviceId = new DeviceId(new UUID(deviceConnectMsg.getDeviceIdMSB(), deviceConnectMsg.getDeviceIdLSB()));
try {
stateService.onDeviceConnect(tenantId, deviceId);
callback.onSuccess();
} catch (Exception e) {
log.warn("Failed process device connect message for device [{}]", deviceId, e);
callback.onFailure(e);
}
}
private void forwardToStateService(TransportProtos.DeviceActivityProto deviceActivityMsg, TbCallback callback) { private void forwardToStateService(TransportProtos.DeviceActivityProto deviceActivityMsg, TbCallback callback) {
if (statsEnabled) { if (statsEnabled) {
stats.log(deviceActivityMsg); stats.log(deviceActivityMsg);
} }
TenantId tenantId = toTenantId(deviceActivityMsg.getTenantIdMSB(), deviceActivityMsg.getTenantIdLSB()); var tenantId = toTenantId(deviceActivityMsg.getTenantIdMSB(), deviceActivityMsg.getTenantIdLSB());
DeviceId deviceId = new DeviceId(new UUID(deviceActivityMsg.getDeviceIdMSB(), deviceActivityMsg.getDeviceIdLSB())); var deviceId = new DeviceId(new UUID(deviceActivityMsg.getDeviceIdMSB(), deviceActivityMsg.getDeviceIdLSB()));
try { try {
stateService.onDeviceActivity(tenantId, deviceId, deviceActivityMsg.getLastActivityTime()); stateService.onDeviceActivity(tenantId, deviceId, deviceActivityMsg.getLastActivityTime());
callback.onSuccess(); callback.onSuccess();
} catch (Exception e) { } catch (Exception e) {
callback.onFailure(new RuntimeException("Failed update device activity for device [" + deviceId.getId() + "]!", e)); log.warn("Failed process device activity message for device [{}]", deviceId, e);
callback.onFailure(e);
}
}
private void forwardToStateService(TransportProtos.DeviceDisconnectProto deviceDisconnectMsg, TbCallback callback) {
var tenantId = toTenantId(deviceDisconnectMsg.getTenantIdMSB(), deviceDisconnectMsg.getTenantIdLSB());
var deviceId = new DeviceId(new UUID(deviceDisconnectMsg.getDeviceIdMSB(), deviceDisconnectMsg.getDeviceIdLSB()));
try {
stateService.onDeviceDisconnect(tenantId, deviceId);
callback.onSuccess();
} catch (Exception e) {
log.warn("Failed process device activity message for device [{}]", deviceId, e);
callback.onFailure(e);
}
}
private void forwardToStateService(TransportProtos.DeviceInactivityProto deviceInactivityMsg, TbCallback callback) {
var tenantId = toTenantId(deviceInactivityMsg.getTenantIdMSB(), deviceInactivityMsg.getTenantIdLSB());
var deviceId = new DeviceId(new UUID(deviceInactivityMsg.getDeviceIdMSB(), deviceInactivityMsg.getDeviceIdLSB()));
try {
stateService.onDeviceInactivity(tenantId, deviceId);
callback.onSuccess();
} catch (Exception e) {
log.warn("Failed process device inactivity message for device [{}]", deviceId, e);
callback.onFailure(e);
} }
} }

View File

@ -218,7 +218,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
@Override @Override
public void onDeviceConnect(TenantId tenantId, DeviceId deviceId) { public void onDeviceConnect(TenantId tenantId, DeviceId deviceId) {
if (cleanDeviceStateIfBelongsExternalPartition(tenantId, deviceId)) { if (cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId)) {
return; return;
} }
log.trace("on Device Connect [{}]", deviceId.getId()); log.trace("on Device Connect [{}]", deviceId.getId());
@ -232,7 +232,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
@Override @Override
public void onDeviceActivity(TenantId tenantId, DeviceId deviceId, long lastReportedActivity) { public void onDeviceActivity(TenantId tenantId, DeviceId deviceId, long lastReportedActivity) {
if (cleanDeviceStateIfBelongsExternalPartition(tenantId, deviceId)) { if (cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId)) {
return; return;
} }
log.trace("on Device Activity [{}], lastReportedActivity [{}]", deviceId.getId(), lastReportedActivity); log.trace("on Device Activity [{}], lastReportedActivity [{}]", deviceId.getId(), lastReportedActivity);
@ -253,16 +253,17 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
onDeviceActivityStatusChange(deviceId, true, stateData); onDeviceActivityStatusChange(deviceId, true, stateData);
} }
} else { } else {
log.debug("updateActivityState - fetched state IN NULL for device {}, lastReportedActivity {}", deviceId, lastReportedActivity); log.debug("updateActivityState - fetched state IS NULL for device {}, lastReportedActivity {}", deviceId, lastReportedActivity);
cleanupEntity(deviceId); cleanupEntity(deviceId);
} }
} }
@Override @Override
public void onDeviceDisconnect(TenantId tenantId, DeviceId deviceId) { public void onDeviceDisconnect(TenantId tenantId, DeviceId deviceId) {
if (cleanDeviceStateIfBelongsExternalPartition(tenantId, deviceId)) { if (cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId)) {
return; return;
} }
log.trace("on Device Disconnect [{}]", deviceId.getId());
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId); DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
long ts = System.currentTimeMillis(); long ts = System.currentTimeMillis();
stateData.getState().setLastDisconnectTime(ts); stateData.getState().setLastDisconnectTime(ts);
@ -272,7 +273,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
@Override @Override
public void onDeviceInactivityTimeoutUpdate(TenantId tenantId, DeviceId deviceId, long inactivityTimeout) { public void onDeviceInactivityTimeoutUpdate(TenantId tenantId, DeviceId deviceId, long inactivityTimeout) {
if (cleanDeviceStateIfBelongsExternalPartition(tenantId, deviceId)) { if (cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId)) {
return; return;
} }
if (inactivityTimeout <= 0L) { if (inactivityTimeout <= 0L) {
@ -284,6 +285,16 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
checkAndUpdateState(deviceId, stateData); checkAndUpdateState(deviceId, stateData);
} }
@Override
public void onDeviceInactivity(TenantId tenantId, DeviceId deviceId) {
if (cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId)) {
return;
}
log.trace("on Device Inactivity [{}]", deviceId.getId());
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
reportInactivity(System.currentTimeMillis(), deviceId, stateData);
}
@Override @Override
public void onQueueMsg(TransportProtos.DeviceStateServiceMsgProto proto, TbCallback callback) { public void onQueueMsg(TransportProtos.DeviceStateServiceMsgProto proto, TbCallback callback) {
try { try {
@ -455,7 +466,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
} }
void reportActivityStats() { void reportActivityStats() {
try{ try {
Map<TenantId, Pair<AtomicInteger, AtomicInteger>> stats = new HashMap<>(); Map<TenantId, Pair<AtomicInteger, AtomicInteger>> stats = new HashMap<>();
for (DeviceStateData stateData : deviceStates.values()) { for (DeviceStateData stateData : deviceStates.values()) {
Pair<AtomicInteger, AtomicInteger> tenantDevicesActivity = stats.computeIfAbsent(stateData.getTenantId(), Pair<AtomicInteger, AtomicInteger> tenantDevicesActivity = stats.computeIfAbsent(stateData.getTenantId(),
@ -489,10 +500,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
&& (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() < state.getLastActivityTime()) && (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() < state.getLastActivityTime())
&& stateData.getDeviceCreationTime() + state.getInactivityTimeout() < ts) { && stateData.getDeviceCreationTime() + state.getInactivityTimeout() < ts) {
if (partitionService.resolve(ServiceType.TB_CORE, stateData.getTenantId(), deviceId).isMyPartition()) { if (partitionService.resolve(ServiceType.TB_CORE, stateData.getTenantId(), deviceId).isMyPartition()) {
state.setActive(false); reportInactivity(ts, deviceId, stateData);
state.setLastInactivityAlarmTime(ts);
onDeviceActivityStatusChange(deviceId, false, stateData);
save(deviceId, INACTIVITY_ALARM_TIME, ts);
} else { } else {
cleanupEntity(deviceId); cleanupEntity(deviceId);
} }
@ -503,29 +511,31 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
} }
} }
private void reportInactivity(long ts, DeviceId deviceId, DeviceStateData stateData) {
DeviceState state = stateData.getState();
state.setActive(false);
state.setLastInactivityAlarmTime(ts);
save(deviceId, INACTIVITY_ALARM_TIME, ts);
onDeviceActivityStatusChange(deviceId, false, stateData);
}
boolean isActive(long ts, DeviceState state) { boolean isActive(long ts, DeviceState state) {
return ts < state.getLastActivityTime() + state.getInactivityTimeout(); return ts < state.getLastActivityTime() + state.getInactivityTimeout();
} }
@Nonnull @Nonnull
DeviceStateData getOrFetchDeviceStateData(DeviceId deviceId) { DeviceStateData getOrFetchDeviceStateData(DeviceId deviceId) {
DeviceStateData deviceStateData = deviceStates.get(deviceId); return deviceStates.computeIfAbsent(deviceId, this::fetchDeviceStateDataUsingSeparateRequests);
if (deviceStateData != null) {
return deviceStateData;
}
return fetchDeviceStateDataUsingEntityDataQuery(deviceId);
} }
DeviceStateData fetchDeviceStateDataUsingEntityDataQuery(final DeviceId deviceId) { DeviceStateData fetchDeviceStateDataUsingSeparateRequests(final DeviceId deviceId) {
final Device device = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, deviceId); final Device device = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, deviceId);
if (device == null) { if (device == null) {
log.warn("[{}] Failed to fetch device by Id!", deviceId); log.warn("[{}] Failed to fetch device by Id!", deviceId);
throw new RuntimeException("Failed to fetch device by Id " + deviceId); throw new RuntimeException("Failed to fetch device by Id " + deviceId);
} }
try { try {
DeviceStateData deviceStateData = fetchDeviceState(device).get(); return fetchDeviceState(device).get();
deviceStates.putIfAbsent(deviceId, deviceStateData);
return deviceStateData;
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
log.warn("[{}] Failed to fetch device state!", deviceId, e); log.warn("[{}] Failed to fetch device state!", deviceId, e);
throw new RuntimeException(e); throw new RuntimeException(e);
@ -545,7 +555,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
.build()); .build());
} }
private boolean cleanDeviceStateIfBelongsExternalPartition(TenantId tenantId, final DeviceId deviceId) { private boolean cleanDeviceStateIfBelongsToExternalPartition(TenantId tenantId, final DeviceId deviceId) {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId); TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
boolean cleanup = !partitionedEntities.containsKey(tpi); boolean cleanup = !partitionedEntities.containsKey(tpi);
if (cleanup) { if (cleanup) {
@ -613,7 +623,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
long lastActivityTime = getEntryValue(data, LAST_ACTIVITY_TIME, 0L); long lastActivityTime = getEntryValue(data, LAST_ACTIVITY_TIME, 0L);
long inactivityAlarmTime = getEntryValue(data, INACTIVITY_ALARM_TIME, 0L); long inactivityAlarmTime = getEntryValue(data, INACTIVITY_ALARM_TIME, 0L);
long inactivityTimeout = getEntryValue(data, INACTIVITY_TIMEOUT, defaultInactivityTimeoutMs); long inactivityTimeout = getEntryValue(data, INACTIVITY_TIMEOUT, defaultInactivityTimeoutMs);
//Actual active state by wall-clock will updated outside this method. This method is only for fetch persistent state // Actual active state by wall-clock will be updated outside this method. This method is only for fetching persistent state
final boolean active = getEntryValue(data, ACTIVITY_STATE, false); final boolean active = getEntryValue(data, ACTIVITY_STATE, false);
DeviceState deviceState = DeviceState.builder() DeviceState deviceState = DeviceState.builder()
.active(active) .active(active)
@ -693,7 +703,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
deviceIdInfo.getDeviceId(), inactivityTimeout); deviceIdInfo.getDeviceId(), inactivityTimeout);
inactivityTimeout = getEntryValue(ed, EntityKeyType.SERVER_ATTRIBUTE, INACTIVITY_TIMEOUT, defaultInactivityTimeoutMs); inactivityTimeout = getEntryValue(ed, EntityKeyType.SERVER_ATTRIBUTE, INACTIVITY_TIMEOUT, defaultInactivityTimeoutMs);
} }
//Actual active state by wall-clock will updated outside this method. This method is only for fetch persistent state // Actual active state by wall-clock will be updated outside this method. This method is only for fetching persistent state
final boolean active = getEntryValue(ed, getKeyType(), ACTIVITY_STATE, false); final boolean active = getEntryValue(ed, getKeyType(), ACTIVITY_STATE, false);
DeviceState deviceState = DeviceState.builder() DeviceState deviceState = DeviceState.builder()
.active(active) .active(active)

View File

@ -33,6 +33,8 @@ public interface DeviceStateService extends ApplicationListener<PartitionChangeE
void onDeviceDisconnect(TenantId tenantId, DeviceId deviceId); void onDeviceDisconnect(TenantId tenantId, DeviceId deviceId);
void onDeviceInactivity(TenantId tenantId, DeviceId deviceId);
void onDeviceInactivityTimeoutUpdate(TenantId tenantId, DeviceId deviceId, long inactivityTimeout); void onDeviceInactivityTimeoutUpdate(TenantId tenantId, DeviceId deviceId, long inactivityTimeout);
void onQueueMsg(TransportProtos.DeviceStateServiceMsgProto proto, TbCallback bytes); void onQueueMsg(TransportProtos.DeviceStateServiceMsgProto proto, TbCallback bytes);

View File

@ -687,11 +687,11 @@ audit-log:
password: "${AUDIT_LOG_SINK_PASSWORD:}" password: "${AUDIT_LOG_SINK_PASSWORD:}"
state: state:
# Should be greater then transport.sessions.report_timeout # Should be greater than transport.sessions.report_timeout
defaultInactivityTimeoutInSec: "${DEFAULT_INACTIVITY_TIMEOUT:600}" defaultInactivityTimeoutInSec: "${DEFAULT_INACTIVITY_TIMEOUT:600}"
defaultStateCheckIntervalInSec: "${DEFAULT_STATE_CHECK_INTERVAL:60}" defaultStateCheckIntervalInSec: "${DEFAULT_STATE_CHECK_INTERVAL:60}"
# Controls whether we store device 'active' flag in attributes (default) or telemetry. # Controls whether we store the device 'active' flag in attributes (default) or telemetry.
# If you device to change this parameter, you should re-create the device info view as one of the following: # If you decide to change this parameter, you should re-create the device info view as one of the following:
# If 'persistToTelemetry' is changed from 'false' to 'true': 'CREATE OR REPLACE VIEW device_info_view AS SELECT * FROM device_info_active_ts_view;' # If 'persistToTelemetry' is changed from 'false' to 'true': 'CREATE OR REPLACE VIEW device_info_view AS SELECT * FROM device_info_active_ts_view;'
# If 'persistToTelemetry' is changed from 'true' to 'false': 'CREATE OR REPLACE VIEW device_info_view AS SELECT * FROM device_info_active_attribute_view;' # If 'persistToTelemetry' is changed from 'true' to 'false': 'CREATE OR REPLACE VIEW device_info_view AS SELECT * FROM device_info_active_attribute_view;'
persistToTelemetry: "${PERSIST_STATE_TO_TELEMETRY:false}" persistToTelemetry: "${PERSIST_STATE_TO_TELEMETRY:false}"

View File

@ -19,6 +19,7 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
@ -27,10 +28,13 @@ import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.DeviceIdInfo; import org.thingsboard.server.common.data.DeviceIdInfo;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.notification.rule.trigger.DeviceActivityTrigger;
import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.query.EntityData; import org.thingsboard.server.common.data.query.EntityData;
import org.thingsboard.server.common.data.query.EntityKeyType; import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.common.data.query.TsValue; import org.thingsboard.server.common.data.query.TsValue;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.ServiceType;
@ -49,14 +53,26 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.CoreMatchers.is; import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat; import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.BDDMockito.willReturn; import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.thingsboard.server.service.state.DefaultDeviceStateService.ACTIVITY_STATE;
import static org.thingsboard.server.service.state.DefaultDeviceStateService.INACTIVITY_ALARM_TIME;
import static org.thingsboard.server.service.state.DefaultDeviceStateService.INACTIVITY_TIMEOUT; import static org.thingsboard.server.service.state.DefaultDeviceStateService.INACTIVITY_TIMEOUT;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
@ -76,6 +92,8 @@ public class DefaultDeviceStateServiceTest {
DeviceStateData deviceStateDataMock; DeviceStateData deviceStateDataMock;
@Mock @Mock
EntityQueryRepository entityQueryRepository; EntityQueryRepository entityQueryRepository;
@Mock
NotificationRuleProcessor notificationRuleProcessor;
TenantId tenantId = new TenantId(UUID.fromString("00797a3b-7aeb-4b5b-b57a-c2a810d0f112")); TenantId tenantId = new TenantId(UUID.fromString("00797a3b-7aeb-4b5b-b57a-c2a810d0f112"));
DeviceId deviceId = DeviceId.fromString("00797a3b-7aeb-4b5b-b57a-c2a810d0f112"); DeviceId deviceId = DeviceId.fromString("00797a3b-7aeb-4b5b-b57a-c2a810d0f112");
@ -87,7 +105,7 @@ public class DefaultDeviceStateServiceTest {
@Before @Before
public void setUp() { public void setUp() {
service = spy(new DefaultDeviceStateService(deviceService, attributesService, tsService, clusterService, partitionService, entityQueryRepository, null, mock(DefaultTbApiUsageReportClient.class), mock(NotificationRuleProcessor.class))); service = spy(new DefaultDeviceStateService(deviceService, attributesService, tsService, clusterService, partitionService, entityQueryRepository, null, mock(DefaultTbApiUsageReportClient.class), notificationRuleProcessor));
telemetrySubscriptionService = Mockito.mock(TelemetrySubscriptionService.class); telemetrySubscriptionService = Mockito.mock(TelemetrySubscriptionService.class);
ReflectionTestUtils.setField(service, "tsSubService", telemetrySubscriptionService); ReflectionTestUtils.setField(service, "tsSubService", telemetrySubscriptionService);
ReflectionTestUtils.setField(service, "defaultStateCheckIntervalInSec", 60); ReflectionTestUtils.setField(service, "defaultStateCheckIntervalInSec", 60);
@ -102,21 +120,121 @@ public class DefaultDeviceStateServiceTest {
.thenReturn(new PageData<>(List.of(deviceIdInfo), 0, 1, false)); .thenReturn(new PageData<>(List.of(deviceIdInfo), 0, 1, false));
} }
@Test
public void givenDeviceBelongsToExternalPartition_whenOnDeviceInactivity_thenCleansStateAndDoesNotReportInactivity() {
// GIVEN
service.deviceStates.put(deviceId, DeviceStateData.builder().build());
given(partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId))
.willReturn(TopicPartitionInfo.builder().build());
// WHEN
service.onDeviceInactivity(tenantId, deviceId);
// THEN
assertThat(service.deviceStates).isEmpty();
then(service).should(never()).fetchDeviceStateDataUsingSeparateRequests(deviceId);
then(clusterService).shouldHaveNoInteractions();
then(notificationRuleProcessor).shouldHaveNoInteractions();
then(telemetrySubscriptionService).shouldHaveNoInteractions();
}
@Test
public void givenDeviceBelongsToMyPartition_whenOnDeviceInactivity_thenReportsInactivity() throws InterruptedException {
// GIVEN
initStateService(10000);
var deviceStateData = DeviceStateData.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.state(DeviceState.builder().build())
.metaData(new TbMsgMetaData())
.build();
service.deviceStates.put(deviceId, deviceStateData);
// WHEN
long timeBeforeCall = System.currentTimeMillis();
service.onDeviceInactivity(tenantId, deviceId);
long timeAfterCall = System.currentTimeMillis();
// THEN
var inactivityTimeCaptor = ArgumentCaptor.forClass(Long.class);
then(telemetrySubscriptionService).should().saveAttrAndNotify(
any(), eq(deviceId), any(), eq(INACTIVITY_ALARM_TIME), inactivityTimeCaptor.capture(), any()
);
var actualInactivityTime = inactivityTimeCaptor.getValue();
assertThat(actualInactivityTime).isGreaterThanOrEqualTo(timeBeforeCall);
assertThat(actualInactivityTime).isLessThanOrEqualTo(timeAfterCall);
then(telemetrySubscriptionService).should().saveAttrAndNotify(
any(), eq(deviceId), any(), eq(ACTIVITY_STATE), eq(false), any()
);
var msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
then(clusterService).should().pushMsgToRuleEngine(eq(tenantId), eq(deviceId), msgCaptor.capture(), any());
var actualMsg = msgCaptor.getValue();
assertThat(actualMsg.getType()).isEqualTo(TbMsgType.INACTIVITY_EVENT.name());
assertThat(actualMsg.getOriginator()).isEqualTo(deviceId);
var notificationCaptor = ArgumentCaptor.forClass(DeviceActivityTrigger.class);
then(notificationRuleProcessor).should().process(notificationCaptor.capture());
var actualNotification = notificationCaptor.getValue();
assertThat(actualNotification.getTenantId()).isEqualTo(tenantId);
assertThat(actualNotification.getDeviceId()).isEqualTo(deviceId);
assertThat(actualNotification.isActive()).isFalse();
}
@Test
public void givenInactivityTimeoutReached_whenUpdateInactivityStateIfExpired_thenReportsInactivity() {
// GIVEN
var deviceStateData = DeviceStateData.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.state(DeviceState.builder().build())
.metaData(new TbMsgMetaData())
.build();
given(partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId)).willReturn(tpi);
// WHEN
service.updateInactivityStateIfExpired(System.currentTimeMillis(), deviceId, deviceStateData);
// THEN
then(telemetrySubscriptionService).should().saveAttrAndNotify(
any(), eq(deviceId), any(), eq(INACTIVITY_ALARM_TIME), anyLong(), any()
);
then(telemetrySubscriptionService).should().saveAttrAndNotify(
any(), eq(deviceId), any(), eq(ACTIVITY_STATE), eq(false), any()
);
var msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
then(clusterService).should().pushMsgToRuleEngine(eq(tenantId), eq(deviceId), msgCaptor.capture(), any());
var actualMsg = msgCaptor.getValue();
assertThat(actualMsg.getType()).isEqualTo(TbMsgType.INACTIVITY_EVENT.name());
assertThat(actualMsg.getOriginator()).isEqualTo(deviceId);
var notificationCaptor = ArgumentCaptor.forClass(DeviceActivityTrigger.class);
then(notificationRuleProcessor).should().process(notificationCaptor.capture());
var actualNotification = notificationCaptor.getValue();
assertThat(actualNotification.getTenantId()).isEqualTo(tenantId);
assertThat(actualNotification.getDeviceId()).isEqualTo(deviceId);
assertThat(actualNotification.isActive()).isFalse();
}
@Test @Test
public void givenDeviceIdFromDeviceStatesMap_whenGetOrFetchDeviceStateData_thenNoStackOverflow() { public void givenDeviceIdFromDeviceStatesMap_whenGetOrFetchDeviceStateData_thenNoStackOverflow() {
service.deviceStates.put(deviceId, deviceStateDataMock); service.deviceStates.put(deviceId, deviceStateDataMock);
DeviceStateData deviceStateData = service.getOrFetchDeviceStateData(deviceId); DeviceStateData deviceStateData = service.getOrFetchDeviceStateData(deviceId);
assertThat(deviceStateData, is(deviceStateDataMock)); assertThat(deviceStateData).isEqualTo(deviceStateDataMock);
Mockito.verify(service, never()).fetchDeviceStateDataUsingEntityDataQuery(deviceId); Mockito.verify(service, never()).fetchDeviceStateDataUsingSeparateRequests(deviceId);
} }
@Test @Test
public void givenDeviceIdWithoutDeviceStateInMap_whenGetOrFetchDeviceStateData_thenFetchDeviceStateData() { public void givenDeviceIdWithoutDeviceStateInMap_whenGetOrFetchDeviceStateData_thenFetchDeviceStateData() {
service.deviceStates.clear(); service.deviceStates.clear();
willReturn(deviceStateDataMock).given(service).fetchDeviceStateDataUsingEntityDataQuery(deviceId); willReturn(deviceStateDataMock).given(service).fetchDeviceStateDataUsingSeparateRequests(deviceId);
DeviceStateData deviceStateData = service.getOrFetchDeviceStateData(deviceId); DeviceStateData deviceStateData = service.getOrFetchDeviceStateData(deviceId);
assertThat(deviceStateData, is(deviceStateDataMock)); assertThat(deviceStateData).isEqualTo(deviceStateDataMock);
Mockito.verify(service, times(1)).fetchDeviceStateDataUsingEntityDataQuery(deviceId); Mockito.verify(service, times(1)).fetchDeviceStateDataUsingSeparateRequests(deviceId);
} }
@Test @Test
@ -341,4 +459,37 @@ public class DefaultDeviceStateServiceTest {
Mockito.verify(telemetrySubscriptionService, Mockito.times(1)).saveAttrAndNotify(Mockito.any(), Mockito.eq(deviceId), Mockito.any(), Mockito.eq("active"), Mockito.eq(isActive), Mockito.any()); Mockito.verify(telemetrySubscriptionService, Mockito.times(1)).saveAttrAndNotify(Mockito.any(), Mockito.eq(deviceId), Mockito.any(), Mockito.eq("active"), Mockito.eq(isActive), Mockito.any());
} }
} @Test
public void givenConcurrentAccess_whenGetOrFetchDeviceStateData_thenFetchDeviceStateDataInvokedOnce() {
var deviceStateData = DeviceStateData.builder().build();
var getOrFetchInvocationCounter = new AtomicInteger();
doAnswer(invocation -> {
getOrFetchInvocationCounter.incrementAndGet();
Thread.sleep(100);
return deviceStateData;
}).when(service).fetchDeviceStateDataUsingSeparateRequests(deviceId);
int numberOfThreads = 10;
var allThreadsReadyLatch = new CountDownLatch(numberOfThreads);
var executor = Executors.newFixedThreadPool(numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
executor.submit(() -> {
allThreadsReadyLatch.countDown();
try {
allThreadsReadyLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
service.getOrFetchDeviceStateData(deviceId);
});
}
executor.shutdown();
await().atMost(10, TimeUnit.SECONDS).until(executor::isTerminated);
assertThat(getOrFetchInvocationCounter.get()).isEqualTo(1);
}
}

View File

@ -480,6 +480,13 @@ message GetOtaPackageResponseMsg {
string fileName = 8; string fileName = 8;
} }
message DeviceConnectProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
int64 deviceIdMSB = 3;
int64 deviceIdLSB = 4;
}
message DeviceActivityProto { message DeviceActivityProto {
int64 tenantIdMSB = 1; int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2; int64 tenantIdLSB = 2;
@ -488,6 +495,20 @@ message DeviceActivityProto {
int64 lastActivityTime = 5; int64 lastActivityTime = 5;
} }
message DeviceDisconnectProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
int64 deviceIdMSB = 3;
int64 deviceIdLSB = 4;
}
message DeviceInactivityProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
int64 deviceIdMSB = 3;
int64 deviceIdLSB = 4;
}
//Used to report session state to tb-Service and persist this state in the cache on the tb-Service level. //Used to report session state to tb-Service and persist this state in the cache on the tb-Service level.
message SubscriptionInfoProto { message SubscriptionInfoProto {
int64 lastActivityTime = 1; int64 lastActivityTime = 1;
@ -977,6 +998,9 @@ message ToCoreMsg {
NotificationSchedulerServiceMsg notificationSchedulerServiceMsg = 7; NotificationSchedulerServiceMsg notificationSchedulerServiceMsg = 7;
LifecycleEventProto lifecycleEventMsg = 8; LifecycleEventProto lifecycleEventMsg = 8;
ErrorEventProto errorEventMsg = 9; ErrorEventProto errorEventMsg = 9;
DeviceConnectProto deviceConnectMsg = 10;
DeviceDisconnectProto deviceDisconnectMsg = 11;
DeviceInactivityProto deviceInactivityMsg = 12;
} }
/* High priority messages with low latency are handled by ThingsBoard Core Service separately */ /* High priority messages with low latency are handled by ThingsBoard Core Service separately */

View File

@ -0,0 +1,178 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.action;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import java.util.Map;
@Slf4j
@RuleNode(
type = ComponentType.ACTION,
name = "device state",
nodeDescription = "Triggers device connectivity events",
nodeDetails = "If incoming message originator is a device," +
" registers configured event for that device in the Device State Service," +
" which sends appropriate message to the Rule Engine. " +
"Incoming message is forwarded using the <code>Success</code> chain," +
" unless an unexpected error occurs during message processing" +
" then incoming message is forwarded using the <code>Failure</code> chain." +
"<br>" +
"Supported device connectivity events are:" +
"<ul>" +
"<li>Connect event</li>" +
"<li>Disconnect event</li>" +
"<li>Activity event</li>" +
"<li>Inactivity event</li>" +
"</ul>" +
"This node is particularly useful when device isn't using transports to receive data," +
" such as when fetching data from external API or computing new data within the rule chain.",
configClazz = TbDeviceStateNodeConfiguration.class,
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbActionNodeDeviceStateConfig"
)
public class TbDeviceStateNode implements TbNode {
private static final TbQueueCallback EMPTY_CALLBACK = new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
}
@Override
public void onFailure(Throwable t) {
}
};
private final Map<TbMsgType, ConnectivityEvent> SUPPORTED_EVENTS = Map.of(
TbMsgType.CONNECT_EVENT, this::sendDeviceConnectMsg,
TbMsgType.ACTIVITY_EVENT, this::sendDeviceActivityMsg,
TbMsgType.DISCONNECT_EVENT, this::sendDeviceDisconnectMsg,
TbMsgType.INACTIVITY_EVENT, this::sendDeviceInactivityMsg
);
private interface ConnectivityEvent {
void sendEvent(TbContext ctx, TbMsg msg);
}
private TbMsgType event;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
TbMsgType event = TbNodeUtils.convert(configuration, TbDeviceStateNodeConfiguration.class).getEvent();
if (event == null) {
throw new TbNodeException("Event cannot be null!", true);
}
if (!SUPPORTED_EVENTS.containsKey(event)) {
throw new TbNodeException("Unsupported event: " + event, true);
}
this.event = event;
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
var originator = msg.getOriginator();
if (!ctx.isLocalEntity(originator)) {
log.warn("[{}][device-state-node] Received message from non-local entity [{}]!", ctx.getSelfId(), originator);
return;
}
if (!EntityType.DEVICE.equals(originator.getEntityType())) {
ctx.tellSuccess(msg);
return;
}
SUPPORTED_EVENTS.get(event).sendEvent(ctx, msg);
ctx.tellSuccess(msg);
}
private void sendDeviceConnectMsg(TbContext ctx, TbMsg msg) {
var tenantUuid = ctx.getTenantId().getId();
var deviceUuid = msg.getOriginator().getId();
var deviceConnectMsg = TransportProtos.DeviceConnectProto.newBuilder()
.setTenantIdMSB(tenantUuid.getMostSignificantBits())
.setTenantIdLSB(tenantUuid.getLeastSignificantBits())
.setDeviceIdMSB(deviceUuid.getMostSignificantBits())
.setDeviceIdLSB(deviceUuid.getLeastSignificantBits())
.build();
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceConnectMsg(deviceConnectMsg)
.build();
ctx.getClusterService().pushMsgToCore(ctx.getTenantId(), msg.getOriginator(), toCoreMsg, EMPTY_CALLBACK);
}
private void sendDeviceActivityMsg(TbContext ctx, TbMsg msg) {
var tenantUuid = ctx.getTenantId().getId();
var deviceUuid = msg.getOriginator().getId();
var deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder()
.setTenantIdMSB(tenantUuid.getMostSignificantBits())
.setTenantIdLSB(tenantUuid.getLeastSignificantBits())
.setDeviceIdMSB(deviceUuid.getMostSignificantBits())
.setDeviceIdLSB(deviceUuid.getLeastSignificantBits())
.setLastActivityTime(System.currentTimeMillis())
.build();
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceActivityMsg(deviceActivityMsg)
.build();
ctx.getClusterService().pushMsgToCore(ctx.getTenantId(), msg.getOriginator(), toCoreMsg, EMPTY_CALLBACK);
}
private void sendDeviceDisconnectMsg(TbContext ctx, TbMsg msg) {
var tenantUuid = ctx.getTenantId().getId();
var deviceUuid = msg.getOriginator().getId();
var deviceDisconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder()
.setTenantIdMSB(tenantUuid.getMostSignificantBits())
.setTenantIdLSB(tenantUuid.getLeastSignificantBits())
.setDeviceIdMSB(deviceUuid.getMostSignificantBits())
.setDeviceIdLSB(deviceUuid.getLeastSignificantBits())
.build();
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceDisconnectMsg(deviceDisconnectMsg)
.build();
ctx.getClusterService().pushMsgToCore(ctx.getTenantId(), msg.getOriginator(), toCoreMsg, EMPTY_CALLBACK);
}
private void sendDeviceInactivityMsg(TbContext ctx, TbMsg msg) {
var tenantUuid = ctx.getTenantId().getId();
var deviceUuid = msg.getOriginator().getId();
var deviceInactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder()
.setTenantIdMSB(tenantUuid.getMostSignificantBits())
.setTenantIdLSB(tenantUuid.getLeastSignificantBits())
.setDeviceIdMSB(deviceUuid.getMostSignificantBits())
.setDeviceIdLSB(deviceUuid.getLeastSignificantBits())
.build();
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceInactivityMsg(deviceInactivityMsg)
.build();
ctx.getClusterService().pushMsgToCore(ctx.getTenantId(), msg.getOriginator(), toCoreMsg, EMPTY_CALLBACK);
}
}

View File

@ -0,0 +1,34 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.action;
import lombok.Data;
import org.thingsboard.rule.engine.api.NodeConfiguration;
import org.thingsboard.server.common.data.msg.TbMsgType;
@Data
public class TbDeviceStateNodeConfiguration implements NodeConfiguration<TbDeviceStateNodeConfiguration> {
private TbMsgType event;
@Override
public TbDeviceStateNodeConfiguration defaultConfiguration() {
var config = new TbDeviceStateNodeConfiguration();
config.setEvent(TbMsgType.ACTIVITY_EVENT);
return config;
}
}

View File

@ -0,0 +1,313 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.action;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.only;
import static org.mockito.Mockito.times;
@ExtendWith(MockitoExtension.class)
public class TbDeviceStateNodeTest {
private static TenantId DUMMY_TENANT_ID;
private static TbMsg DUMMY_MSG;
private static DeviceId DUMMY_MSG_ORIGINATOR;
@Mock
private TbContext ctxMock;
@Mock
private TbClusterService tbClusterServiceMock;
private TbDeviceStateNode node;
private TbDeviceStateNodeConfiguration config;
@BeforeAll
public static void init() {
DUMMY_TENANT_ID = TenantId.fromUUID(UUID.randomUUID());
var device = new Device();
device.setTenantId(DUMMY_TENANT_ID);
device.setId(new DeviceId(UUID.randomUUID()));
device.setName("My humidity sensor");
device.setType("Humidity sensor");
device.setDeviceProfileId(new DeviceProfileId(UUID.randomUUID()));
var metaData = new TbMsgMetaData();
metaData.putValue("deviceName", device.getName());
metaData.putValue("deviceType", device.getType());
metaData.putValue("ts", String.valueOf(System.currentTimeMillis()));
var data = JacksonUtil.newObjectNode();
data.put("humidity", 58.3);
DUMMY_MSG = TbMsg.newMsg(
TbMsgType.POST_TELEMETRY_REQUEST, device.getId(), metaData, JacksonUtil.toString(data)
);
DUMMY_MSG_ORIGINATOR = device.getId();
}
@BeforeEach
public void setUp() {
node = new TbDeviceStateNode();
config = new TbDeviceStateNodeConfiguration();
}
@Test
public void givenDefaultConfiguration_whenInvoked_thenCorrectValuesAreSet() {
// GIVEN-WHEN
config = config.defaultConfiguration();
// THEN
assertThat(config.getEvent()).isEqualTo(TbMsgType.ACTIVITY_EVENT);
}
@Test
public void givenNullEventInConfig_whenInit_thenThrowsUnrecoverableTbNodeException() {
// GIVEN
config.setEvent(null);
var nodeConfig = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
// WHEN-THEN
assertThatThrownBy(() -> node.init(ctxMock, nodeConfig))
.isInstanceOf(TbNodeException.class)
.hasMessage("Event cannot be null!")
.matches(e -> ((TbNodeException) e).isUnrecoverable());
}
@Test
public void givenUnsupportedEventInConfig_whenInit_thenThrowsUnrecoverableTbNodeException() {
// GIVEN
var unsupportedEvent = TbMsgType.TO_SERVER_RPC_REQUEST;
config.setEvent(unsupportedEvent);
var nodeConfig = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
// WHEN-THEN
assertThatThrownBy(() -> node.init(ctxMock, nodeConfig))
.isInstanceOf(TbNodeException.class)
.hasMessage("Unsupported event: " + unsupportedEvent)
.matches(e -> ((TbNodeException) e).isUnrecoverable());
}
@Test
public void givenNonDeviceOriginator_whenOnMsg_thenTellsSuccessAndNoActivityActionsTriggered() {
// GIVEN
var msg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, new AssetId(UUID.randomUUID()), new TbMsgMetaData(), "{}");
// WHEN
node.onMsg(ctxMock, msg);
// THEN
then(ctxMock).should(only()).tellSuccess(msg);
}
@Test
public void givenNonLocalOriginator_whenOnMsg_thenTellsSuccessAndNoActivityActionsTriggered() {
// GIVEN
given(ctxMock.isLocalEntity(DUMMY_MSG.getOriginator())).willReturn(false);
// WHEN
node.onMsg(ctxMock, DUMMY_MSG);
// THEN
then(ctxMock).should(times(1)).isLocalEntity(DUMMY_MSG_ORIGINATOR);
then(ctxMock).should(times(1)).tellSuccess(DUMMY_MSG);
then(ctxMock).shouldHaveNoMoreInteractions();
}
@Test
public void givenConnectEventInConfig_whenOnMsg_thenOnDeviceConnectCalledAndTellsSuccess() throws TbNodeException {
// GIVEN
config.setEvent(TbMsgType.CONNECT_EVENT);
var nodeConfig = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
node.init(ctxMock, nodeConfig);
given(ctxMock.getTenantId()).willReturn(DUMMY_TENANT_ID);
given(ctxMock.getClusterService()).willReturn(tbClusterServiceMock);
given(ctxMock.isLocalEntity(DUMMY_MSG.getOriginator())).willReturn(true);
// WHEN
node.onMsg(ctxMock, DUMMY_MSG);
// THEN
var protoCaptor = ArgumentCaptor.forClass(TransportProtos.ToCoreMsg.class);
then(tbClusterServiceMock).should(times(1))
.pushMsgToCore(eq(DUMMY_TENANT_ID), eq(DUMMY_MSG_ORIGINATOR), protoCaptor.capture(), any());
TransportProtos.DeviceConnectProto expectedDeviceConnectMsg = TransportProtos.DeviceConnectProto.newBuilder()
.setTenantIdMSB(DUMMY_TENANT_ID.getId().getMostSignificantBits())
.setTenantIdLSB(DUMMY_TENANT_ID.getId().getLeastSignificantBits())
.setDeviceIdMSB(DUMMY_MSG_ORIGINATOR.getId().getMostSignificantBits())
.setDeviceIdLSB(DUMMY_MSG_ORIGINATOR.getId().getLeastSignificantBits())
.build();
TransportProtos.ToCoreMsg expectedToCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceConnectMsg(expectedDeviceConnectMsg)
.build();
assertThat(expectedToCoreMsg).isEqualTo(protoCaptor.getValue());
then(ctxMock).should(times(1)).tellSuccess(DUMMY_MSG);
then(tbClusterServiceMock).shouldHaveNoMoreInteractions();
then(ctxMock).shouldHaveNoMoreInteractions();
}
@Test
public void givenActivityEventInConfig_whenOnMsg_thenOnDeviceActivityCalledWithCorrectTimeAndTellsSuccess()
throws TbNodeException {
// GIVEN
config.setEvent(TbMsgType.ACTIVITY_EVENT);
var nodeConfig = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
node.init(ctxMock, nodeConfig);
given(ctxMock.getTenantId()).willReturn(DUMMY_TENANT_ID);
given(ctxMock.getClusterService()).willReturn(tbClusterServiceMock);
given(ctxMock.isLocalEntity(DUMMY_MSG.getOriginator())).willReturn(true);
// WHEN
long timeBeforeCall = System.currentTimeMillis();
node.onMsg(ctxMock, DUMMY_MSG);
long timeAfterCall = System.currentTimeMillis();
// THEN
var protoCaptor = ArgumentCaptor.forClass(TransportProtos.ToCoreMsg.class);
then(tbClusterServiceMock).should(times(1))
.pushMsgToCore(eq(DUMMY_TENANT_ID), eq(DUMMY_MSG_ORIGINATOR), protoCaptor.capture(), any());
TransportProtos.ToCoreMsg actualToCoreMsg = protoCaptor.getValue();
long actualLastActivityTime = actualToCoreMsg.getDeviceActivityMsg().getLastActivityTime();
assertThat(actualLastActivityTime).isGreaterThanOrEqualTo(timeBeforeCall);
assertThat(actualLastActivityTime).isLessThanOrEqualTo(timeAfterCall);
TransportProtos.DeviceActivityProto updatedActivityMsg = actualToCoreMsg.getDeviceActivityMsg().toBuilder()
.setLastActivityTime(123L)
.build();
TransportProtos.ToCoreMsg updatedToCoreMsg = actualToCoreMsg.toBuilder()
.setDeviceActivityMsg(updatedActivityMsg)
.build();
TransportProtos.DeviceActivityProto expectedDeviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder()
.setTenantIdMSB(DUMMY_TENANT_ID.getId().getMostSignificantBits())
.setTenantIdLSB(DUMMY_TENANT_ID.getId().getLeastSignificantBits())
.setDeviceIdMSB(DUMMY_MSG_ORIGINATOR.getId().getMostSignificantBits())
.setDeviceIdLSB(DUMMY_MSG_ORIGINATOR.getId().getLeastSignificantBits())
.setLastActivityTime(123L)
.build();
TransportProtos.ToCoreMsg expectedToCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceActivityMsg(expectedDeviceActivityMsg)
.build();
assertThat(updatedToCoreMsg).isEqualTo(expectedToCoreMsg);
then(ctxMock).should(times(1)).tellSuccess(DUMMY_MSG);
then(tbClusterServiceMock).shouldHaveNoMoreInteractions();
then(ctxMock).shouldHaveNoMoreInteractions();
}
@Test
public void givenInactivityEventInConfig_whenOnMsg_thenOnDeviceInactivityCalledAndTellsSuccess()
throws TbNodeException {
// GIVEN
config.setEvent(TbMsgType.INACTIVITY_EVENT);
var nodeConfig = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
node.init(ctxMock, nodeConfig);
given(ctxMock.getTenantId()).willReturn(DUMMY_TENANT_ID);
given(ctxMock.getClusterService()).willReturn(tbClusterServiceMock);
given(ctxMock.isLocalEntity(DUMMY_MSG.getOriginator())).willReturn(true);
// WHEN
node.onMsg(ctxMock, DUMMY_MSG);
// THEN
var protoCaptor = ArgumentCaptor.forClass(TransportProtos.ToCoreMsg.class);
then(tbClusterServiceMock).should(times(1))
.pushMsgToCore(eq(DUMMY_TENANT_ID), eq(DUMMY_MSG_ORIGINATOR), protoCaptor.capture(), any());
TransportProtos.DeviceInactivityProto expectedDeviceInactivityMsg =
TransportProtos.DeviceInactivityProto.newBuilder()
.setTenantIdMSB(DUMMY_TENANT_ID.getId().getMostSignificantBits())
.setTenantIdLSB(DUMMY_TENANT_ID.getId().getLeastSignificantBits())
.setDeviceIdMSB(DUMMY_MSG_ORIGINATOR.getId().getMostSignificantBits())
.setDeviceIdLSB(DUMMY_MSG_ORIGINATOR.getId().getLeastSignificantBits())
.build();
TransportProtos.ToCoreMsg expectedToCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceInactivityMsg(expectedDeviceInactivityMsg)
.build();
assertThat(expectedToCoreMsg).isEqualTo(protoCaptor.getValue());
then(ctxMock).should(times(1)).tellSuccess(DUMMY_MSG);
then(tbClusterServiceMock).shouldHaveNoMoreInteractions();
then(ctxMock).shouldHaveNoMoreInteractions();
}
@Test
public void givenDisconnectEventInConfig_whenOnMsg_thenOnDeviceDisconnectCalledAndTellsSuccess()
throws TbNodeException {
// GIVEN
config.setEvent(TbMsgType.DISCONNECT_EVENT);
var nodeConfig = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
node.init(ctxMock, nodeConfig);
given(ctxMock.getTenantId()).willReturn(DUMMY_TENANT_ID);
given(ctxMock.getClusterService()).willReturn(tbClusterServiceMock);
given(ctxMock.isLocalEntity(DUMMY_MSG.getOriginator())).willReturn(true);
// WHEN
node.onMsg(ctxMock, DUMMY_MSG);
// THEN
var protoCaptor = ArgumentCaptor.forClass(TransportProtos.ToCoreMsg.class);
then(tbClusterServiceMock).should(times(1))
.pushMsgToCore(eq(DUMMY_TENANT_ID), eq(DUMMY_MSG_ORIGINATOR), protoCaptor.capture(), any());
TransportProtos.DeviceDisconnectProto expectedDeviceDisconnectMsg =
TransportProtos.DeviceDisconnectProto.newBuilder()
.setTenantIdMSB(DUMMY_TENANT_ID.getId().getMostSignificantBits())
.setTenantIdLSB(DUMMY_TENANT_ID.getId().getLeastSignificantBits())
.setDeviceIdMSB(DUMMY_MSG_ORIGINATOR.getId().getMostSignificantBits())
.setDeviceIdLSB(DUMMY_MSG_ORIGINATOR.getId().getLeastSignificantBits())
.build();
TransportProtos.ToCoreMsg expectedToCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceDisconnectMsg(expectedDeviceDisconnectMsg)
.build();
assertThat(expectedToCoreMsg).isEqualTo(protoCaptor.getValue());
then(ctxMock).should(times(1)).tellSuccess(DUMMY_MSG);
then(tbClusterServiceMock).shouldHaveNoMoreInteractions();
then(ctxMock).shouldHaveNoMoreInteractions();
}
}