diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index fd5a252cc5..02850e9d8c 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -256,14 +256,23 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray()); if (actorMsg.isPresent()) { @@ -592,17 +601,54 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService> stats = new HashMap<>(); for (DeviceStateData stateData : deviceStates.values()) { Pair tenantDevicesActivity = stats.computeIfAbsent(stateData.getTenantId(), @@ -489,10 +500,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService(List.of(deviceIdInfo), 0, 1, false)); } + @Test + public void givenDeviceBelongsToExternalPartition_whenOnDeviceInactivity_thenCleansStateAndDoesNotReportInactivity() { + // GIVEN + service.deviceStates.put(deviceId, DeviceStateData.builder().build()); + given(partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId)) + .willReturn(TopicPartitionInfo.builder().build()); + + // WHEN + service.onDeviceInactivity(tenantId, deviceId); + + // THEN + assertThat(service.deviceStates).isEmpty(); + then(service).should(never()).fetchDeviceStateDataUsingSeparateRequests(deviceId); + then(clusterService).shouldHaveNoInteractions(); + then(notificationRuleProcessor).shouldHaveNoInteractions(); + then(telemetrySubscriptionService).shouldHaveNoInteractions(); + } + + @Test + public void givenDeviceBelongsToMyPartition_whenOnDeviceInactivity_thenReportsInactivity() throws InterruptedException { + // GIVEN + initStateService(10000); + var deviceStateData = DeviceStateData.builder() + .tenantId(tenantId) + .deviceId(deviceId) + .state(DeviceState.builder().build()) + .metaData(new TbMsgMetaData()) + .build(); + + service.deviceStates.put(deviceId, deviceStateData); + + // WHEN + long timeBeforeCall = System.currentTimeMillis(); + service.onDeviceInactivity(tenantId, deviceId); + long timeAfterCall = System.currentTimeMillis(); + + // THEN + var inactivityTimeCaptor = ArgumentCaptor.forClass(Long.class); + then(telemetrySubscriptionService).should().saveAttrAndNotify( + any(), eq(deviceId), any(), eq(INACTIVITY_ALARM_TIME), inactivityTimeCaptor.capture(), any() + ); + var actualInactivityTime = inactivityTimeCaptor.getValue(); + assertThat(actualInactivityTime).isGreaterThanOrEqualTo(timeBeforeCall); + assertThat(actualInactivityTime).isLessThanOrEqualTo(timeAfterCall); + + then(telemetrySubscriptionService).should().saveAttrAndNotify( + any(), eq(deviceId), any(), eq(ACTIVITY_STATE), eq(false), any() + ); + + var msgCaptor = ArgumentCaptor.forClass(TbMsg.class); + then(clusterService).should().pushMsgToRuleEngine(eq(tenantId), eq(deviceId), msgCaptor.capture(), any()); + var actualMsg = msgCaptor.getValue(); + assertThat(actualMsg.getType()).isEqualTo(TbMsgType.INACTIVITY_EVENT.name()); + assertThat(actualMsg.getOriginator()).isEqualTo(deviceId); + + var notificationCaptor = ArgumentCaptor.forClass(DeviceActivityTrigger.class); + then(notificationRuleProcessor).should().process(notificationCaptor.capture()); + var actualNotification = notificationCaptor.getValue(); + assertThat(actualNotification.getTenantId()).isEqualTo(tenantId); + assertThat(actualNotification.getDeviceId()).isEqualTo(deviceId); + assertThat(actualNotification.isActive()).isFalse(); + } + + @Test + public void givenInactivityTimeoutReached_whenUpdateInactivityStateIfExpired_thenReportsInactivity() { + // GIVEN + var deviceStateData = DeviceStateData.builder() + .tenantId(tenantId) + .deviceId(deviceId) + .state(DeviceState.builder().build()) + .metaData(new TbMsgMetaData()) + .build(); + + given(partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId)).willReturn(tpi); + + // WHEN + service.updateInactivityStateIfExpired(System.currentTimeMillis(), deviceId, deviceStateData); + + // THEN + then(telemetrySubscriptionService).should().saveAttrAndNotify( + any(), eq(deviceId), any(), eq(INACTIVITY_ALARM_TIME), anyLong(), any() + ); + then(telemetrySubscriptionService).should().saveAttrAndNotify( + any(), eq(deviceId), any(), eq(ACTIVITY_STATE), eq(false), any() + ); + + var msgCaptor = ArgumentCaptor.forClass(TbMsg.class); + then(clusterService).should().pushMsgToRuleEngine(eq(tenantId), eq(deviceId), msgCaptor.capture(), any()); + var actualMsg = msgCaptor.getValue(); + assertThat(actualMsg.getType()).isEqualTo(TbMsgType.INACTIVITY_EVENT.name()); + assertThat(actualMsg.getOriginator()).isEqualTo(deviceId); + + var notificationCaptor = ArgumentCaptor.forClass(DeviceActivityTrigger.class); + then(notificationRuleProcessor).should().process(notificationCaptor.capture()); + var actualNotification = notificationCaptor.getValue(); + assertThat(actualNotification.getTenantId()).isEqualTo(tenantId); + assertThat(actualNotification.getDeviceId()).isEqualTo(deviceId); + assertThat(actualNotification.isActive()).isFalse(); + } + @Test public void givenDeviceIdFromDeviceStatesMap_whenGetOrFetchDeviceStateData_thenNoStackOverflow() { service.deviceStates.put(deviceId, deviceStateDataMock); DeviceStateData deviceStateData = service.getOrFetchDeviceStateData(deviceId); - assertThat(deviceStateData, is(deviceStateDataMock)); - Mockito.verify(service, never()).fetchDeviceStateDataUsingEntityDataQuery(deviceId); + assertThat(deviceStateData).isEqualTo(deviceStateDataMock); + Mockito.verify(service, never()).fetchDeviceStateDataUsingSeparateRequests(deviceId); } @Test public void givenDeviceIdWithoutDeviceStateInMap_whenGetOrFetchDeviceStateData_thenFetchDeviceStateData() { service.deviceStates.clear(); - willReturn(deviceStateDataMock).given(service).fetchDeviceStateDataUsingEntityDataQuery(deviceId); + willReturn(deviceStateDataMock).given(service).fetchDeviceStateDataUsingSeparateRequests(deviceId); DeviceStateData deviceStateData = service.getOrFetchDeviceStateData(deviceId); - assertThat(deviceStateData, is(deviceStateDataMock)); - Mockito.verify(service, times(1)).fetchDeviceStateDataUsingEntityDataQuery(deviceId); + assertThat(deviceStateData).isEqualTo(deviceStateDataMock); + Mockito.verify(service, times(1)).fetchDeviceStateDataUsingSeparateRequests(deviceId); } @Test @@ -341,4 +459,37 @@ public class DefaultDeviceStateServiceTest { Mockito.verify(telemetrySubscriptionService, Mockito.times(1)).saveAttrAndNotify(Mockito.any(), Mockito.eq(deviceId), Mockito.any(), Mockito.eq("active"), Mockito.eq(isActive), Mockito.any()); } -} \ No newline at end of file + @Test + public void givenConcurrentAccess_whenGetOrFetchDeviceStateData_thenFetchDeviceStateDataInvokedOnce() { + var deviceStateData = DeviceStateData.builder().build(); + var getOrFetchInvocationCounter = new AtomicInteger(); + + doAnswer(invocation -> { + getOrFetchInvocationCounter.incrementAndGet(); + Thread.sleep(100); + return deviceStateData; + }).when(service).fetchDeviceStateDataUsingSeparateRequests(deviceId); + + int numberOfThreads = 10; + var allThreadsReadyLatch = new CountDownLatch(numberOfThreads); + var executor = Executors.newFixedThreadPool(numberOfThreads); + + for (int i = 0; i < numberOfThreads; i++) { + executor.submit(() -> { + allThreadsReadyLatch.countDown(); + try { + allThreadsReadyLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + service.getOrFetchDeviceStateData(deviceId); + }); + } + + executor.shutdown(); + await().atMost(10, TimeUnit.SECONDS).until(executor::isTerminated); + + assertThat(getOrFetchInvocationCounter.get()).isEqualTo(1); + } + +} diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index 60dfcae1de..5260a4fa26 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -480,6 +480,13 @@ message GetOtaPackageResponseMsg { string fileName = 8; } +message DeviceConnectProto { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + int64 deviceIdMSB = 3; + int64 deviceIdLSB = 4; +} + message DeviceActivityProto { int64 tenantIdMSB = 1; int64 tenantIdLSB = 2; @@ -488,6 +495,20 @@ message DeviceActivityProto { int64 lastActivityTime = 5; } +message DeviceDisconnectProto { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + int64 deviceIdMSB = 3; + int64 deviceIdLSB = 4; +} + +message DeviceInactivityProto { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + int64 deviceIdMSB = 3; + int64 deviceIdLSB = 4; +} + //Used to report session state to tb-Service and persist this state in the cache on the tb-Service level. message SubscriptionInfoProto { int64 lastActivityTime = 1; @@ -977,6 +998,9 @@ message ToCoreMsg { NotificationSchedulerServiceMsg notificationSchedulerServiceMsg = 7; LifecycleEventProto lifecycleEventMsg = 8; ErrorEventProto errorEventMsg = 9; + DeviceConnectProto deviceConnectMsg = 10; + DeviceDisconnectProto deviceDisconnectMsg = 11; + DeviceInactivityProto deviceInactivityMsg = 12; } /* High priority messages with low latency are handled by ThingsBoard Core Service separately */ diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbDeviceStateNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbDeviceStateNode.java new file mode 100644 index 0000000000..7003f78a00 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbDeviceStateNode.java @@ -0,0 +1,178 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.action; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.rule.engine.api.RuleNode; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.TbQueueCallback; +import org.thingsboard.server.queue.TbQueueMsgMetadata; + +import java.util.Map; + +@Slf4j +@RuleNode( + type = ComponentType.ACTION, + name = "device state", + nodeDescription = "Triggers device connectivity events", + nodeDetails = "If incoming message originator is a device," + + " registers configured event for that device in the Device State Service," + + " which sends appropriate message to the Rule Engine. " + + "Incoming message is forwarded using the Success chain," + + " unless an unexpected error occurs during message processing" + + " then incoming message is forwarded using the Failure chain." + + "
" + + "Supported device connectivity events are:" + + "
    " + + "
  • Connect event
  • " + + "
  • Disconnect event
  • " + + "
  • Activity event
  • " + + "
  • Inactivity event
  • " + + "
" + + "This node is particularly useful when device isn't using transports to receive data," + + " such as when fetching data from external API or computing new data within the rule chain.", + configClazz = TbDeviceStateNodeConfiguration.class, + uiResources = {"static/rulenode/rulenode-core-config.js"}, + configDirective = "tbActionNodeDeviceStateConfig" +) +public class TbDeviceStateNode implements TbNode { + + private static final TbQueueCallback EMPTY_CALLBACK = new TbQueueCallback() { + + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + } + + @Override + public void onFailure(Throwable t) { + } + + }; + + private final Map SUPPORTED_EVENTS = Map.of( + TbMsgType.CONNECT_EVENT, this::sendDeviceConnectMsg, + TbMsgType.ACTIVITY_EVENT, this::sendDeviceActivityMsg, + TbMsgType.DISCONNECT_EVENT, this::sendDeviceDisconnectMsg, + TbMsgType.INACTIVITY_EVENT, this::sendDeviceInactivityMsg + ); + + private interface ConnectivityEvent { + + void sendEvent(TbContext ctx, TbMsg msg); + + } + + private TbMsgType event; + + @Override + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + TbMsgType event = TbNodeUtils.convert(configuration, TbDeviceStateNodeConfiguration.class).getEvent(); + if (event == null) { + throw new TbNodeException("Event cannot be null!", true); + } + if (!SUPPORTED_EVENTS.containsKey(event)) { + throw new TbNodeException("Unsupported event: " + event, true); + } + this.event = event; + + } + + @Override + public void onMsg(TbContext ctx, TbMsg msg) { + var originator = msg.getOriginator(); + if (!ctx.isLocalEntity(originator)) { + log.warn("[{}][device-state-node] Received message from non-local entity [{}]!", ctx.getSelfId(), originator); + return; + } + if (!EntityType.DEVICE.equals(originator.getEntityType())) { + ctx.tellSuccess(msg); + return; + } + SUPPORTED_EVENTS.get(event).sendEvent(ctx, msg); + ctx.tellSuccess(msg); + } + + private void sendDeviceConnectMsg(TbContext ctx, TbMsg msg) { + var tenantUuid = ctx.getTenantId().getId(); + var deviceUuid = msg.getOriginator().getId(); + var deviceConnectMsg = TransportProtos.DeviceConnectProto.newBuilder() + .setTenantIdMSB(tenantUuid.getMostSignificantBits()) + .setTenantIdLSB(tenantUuid.getLeastSignificantBits()) + .setDeviceIdMSB(deviceUuid.getMostSignificantBits()) + .setDeviceIdLSB(deviceUuid.getLeastSignificantBits()) + .build(); + var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder() + .setDeviceConnectMsg(deviceConnectMsg) + .build(); + ctx.getClusterService().pushMsgToCore(ctx.getTenantId(), msg.getOriginator(), toCoreMsg, EMPTY_CALLBACK); + } + + private void sendDeviceActivityMsg(TbContext ctx, TbMsg msg) { + var tenantUuid = ctx.getTenantId().getId(); + var deviceUuid = msg.getOriginator().getId(); + var deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder() + .setTenantIdMSB(tenantUuid.getMostSignificantBits()) + .setTenantIdLSB(tenantUuid.getLeastSignificantBits()) + .setDeviceIdMSB(deviceUuid.getMostSignificantBits()) + .setDeviceIdLSB(deviceUuid.getLeastSignificantBits()) + .setLastActivityTime(System.currentTimeMillis()) + .build(); + var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder() + .setDeviceActivityMsg(deviceActivityMsg) + .build(); + ctx.getClusterService().pushMsgToCore(ctx.getTenantId(), msg.getOriginator(), toCoreMsg, EMPTY_CALLBACK); + } + + private void sendDeviceDisconnectMsg(TbContext ctx, TbMsg msg) { + var tenantUuid = ctx.getTenantId().getId(); + var deviceUuid = msg.getOriginator().getId(); + var deviceDisconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder() + .setTenantIdMSB(tenantUuid.getMostSignificantBits()) + .setTenantIdLSB(tenantUuid.getLeastSignificantBits()) + .setDeviceIdMSB(deviceUuid.getMostSignificantBits()) + .setDeviceIdLSB(deviceUuid.getLeastSignificantBits()) + .build(); + var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder() + .setDeviceDisconnectMsg(deviceDisconnectMsg) + .build(); + ctx.getClusterService().pushMsgToCore(ctx.getTenantId(), msg.getOriginator(), toCoreMsg, EMPTY_CALLBACK); + } + + private void sendDeviceInactivityMsg(TbContext ctx, TbMsg msg) { + var tenantUuid = ctx.getTenantId().getId(); + var deviceUuid = msg.getOriginator().getId(); + var deviceInactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder() + .setTenantIdMSB(tenantUuid.getMostSignificantBits()) + .setTenantIdLSB(tenantUuid.getLeastSignificantBits()) + .setDeviceIdMSB(deviceUuid.getMostSignificantBits()) + .setDeviceIdLSB(deviceUuid.getLeastSignificantBits()) + .build(); + var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder() + .setDeviceInactivityMsg(deviceInactivityMsg) + .build(); + ctx.getClusterService().pushMsgToCore(ctx.getTenantId(), msg.getOriginator(), toCoreMsg, EMPTY_CALLBACK); + } + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbDeviceStateNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbDeviceStateNodeConfiguration.java new file mode 100644 index 0000000000..c89fcb076f --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbDeviceStateNodeConfiguration.java @@ -0,0 +1,34 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.action; + +import lombok.Data; +import org.thingsboard.rule.engine.api.NodeConfiguration; +import org.thingsboard.server.common.data.msg.TbMsgType; + +@Data +public class TbDeviceStateNodeConfiguration implements NodeConfiguration { + + private TbMsgType event; + + @Override + public TbDeviceStateNodeConfiguration defaultConfiguration() { + var config = new TbDeviceStateNodeConfiguration(); + config.setEvent(TbMsgType.ACTIVITY_EVENT); + return config; + } + +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbDeviceStateNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbDeviceStateNodeTest.java new file mode 100644 index 0000000000..1aaabce61d --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbDeviceStateNodeTest.java @@ -0,0 +1,313 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.action; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.id.AssetId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.gen.transport.TransportProtos; + +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.Mockito.only; +import static org.mockito.Mockito.times; + +@ExtendWith(MockitoExtension.class) +public class TbDeviceStateNodeTest { + + private static TenantId DUMMY_TENANT_ID; + private static TbMsg DUMMY_MSG; + private static DeviceId DUMMY_MSG_ORIGINATOR; + @Mock + private TbContext ctxMock; + @Mock + private TbClusterService tbClusterServiceMock; + private TbDeviceStateNode node; + private TbDeviceStateNodeConfiguration config; + + @BeforeAll + public static void init() { + DUMMY_TENANT_ID = TenantId.fromUUID(UUID.randomUUID()); + + var device = new Device(); + device.setTenantId(DUMMY_TENANT_ID); + device.setId(new DeviceId(UUID.randomUUID())); + device.setName("My humidity sensor"); + device.setType("Humidity sensor"); + device.setDeviceProfileId(new DeviceProfileId(UUID.randomUUID())); + var metaData = new TbMsgMetaData(); + metaData.putValue("deviceName", device.getName()); + metaData.putValue("deviceType", device.getType()); + metaData.putValue("ts", String.valueOf(System.currentTimeMillis())); + var data = JacksonUtil.newObjectNode(); + data.put("humidity", 58.3); + DUMMY_MSG = TbMsg.newMsg( + TbMsgType.POST_TELEMETRY_REQUEST, device.getId(), metaData, JacksonUtil.toString(data) + ); + DUMMY_MSG_ORIGINATOR = device.getId(); + } + + @BeforeEach + public void setUp() { + node = new TbDeviceStateNode(); + config = new TbDeviceStateNodeConfiguration(); + } + + @Test + public void givenDefaultConfiguration_whenInvoked_thenCorrectValuesAreSet() { + // GIVEN-WHEN + config = config.defaultConfiguration(); + + // THEN + assertThat(config.getEvent()).isEqualTo(TbMsgType.ACTIVITY_EVENT); + } + + @Test + public void givenNullEventInConfig_whenInit_thenThrowsUnrecoverableTbNodeException() { + // GIVEN + config.setEvent(null); + var nodeConfig = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + + // WHEN-THEN + assertThatThrownBy(() -> node.init(ctxMock, nodeConfig)) + .isInstanceOf(TbNodeException.class) + .hasMessage("Event cannot be null!") + .matches(e -> ((TbNodeException) e).isUnrecoverable()); + } + + @Test + public void givenUnsupportedEventInConfig_whenInit_thenThrowsUnrecoverableTbNodeException() { + // GIVEN + var unsupportedEvent = TbMsgType.TO_SERVER_RPC_REQUEST; + config.setEvent(unsupportedEvent); + var nodeConfig = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + + // WHEN-THEN + assertThatThrownBy(() -> node.init(ctxMock, nodeConfig)) + .isInstanceOf(TbNodeException.class) + .hasMessage("Unsupported event: " + unsupportedEvent) + .matches(e -> ((TbNodeException) e).isUnrecoverable()); + } + + @Test + public void givenNonDeviceOriginator_whenOnMsg_thenTellsSuccessAndNoActivityActionsTriggered() { + // GIVEN + var msg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, new AssetId(UUID.randomUUID()), new TbMsgMetaData(), "{}"); + + // WHEN + node.onMsg(ctxMock, msg); + + // THEN + then(ctxMock).should(only()).tellSuccess(msg); + } + + @Test + public void givenNonLocalOriginator_whenOnMsg_thenTellsSuccessAndNoActivityActionsTriggered() { + // GIVEN + given(ctxMock.isLocalEntity(DUMMY_MSG.getOriginator())).willReturn(false); + + // WHEN + node.onMsg(ctxMock, DUMMY_MSG); + + // THEN + then(ctxMock).should(times(1)).isLocalEntity(DUMMY_MSG_ORIGINATOR); + then(ctxMock).should(times(1)).tellSuccess(DUMMY_MSG); + then(ctxMock).shouldHaveNoMoreInteractions(); + } + + @Test + public void givenConnectEventInConfig_whenOnMsg_thenOnDeviceConnectCalledAndTellsSuccess() throws TbNodeException { + // GIVEN + config.setEvent(TbMsgType.CONNECT_EVENT); + var nodeConfig = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + node.init(ctxMock, nodeConfig); + given(ctxMock.getTenantId()).willReturn(DUMMY_TENANT_ID); + given(ctxMock.getClusterService()).willReturn(tbClusterServiceMock); + given(ctxMock.isLocalEntity(DUMMY_MSG.getOriginator())).willReturn(true); + + // WHEN + node.onMsg(ctxMock, DUMMY_MSG); + + // THEN + var protoCaptor = ArgumentCaptor.forClass(TransportProtos.ToCoreMsg.class); + then(tbClusterServiceMock).should(times(1)) + .pushMsgToCore(eq(DUMMY_TENANT_ID), eq(DUMMY_MSG_ORIGINATOR), protoCaptor.capture(), any()); + + TransportProtos.DeviceConnectProto expectedDeviceConnectMsg = TransportProtos.DeviceConnectProto.newBuilder() + .setTenantIdMSB(DUMMY_TENANT_ID.getId().getMostSignificantBits()) + .setTenantIdLSB(DUMMY_TENANT_ID.getId().getLeastSignificantBits()) + .setDeviceIdMSB(DUMMY_MSG_ORIGINATOR.getId().getMostSignificantBits()) + .setDeviceIdLSB(DUMMY_MSG_ORIGINATOR.getId().getLeastSignificantBits()) + .build(); + TransportProtos.ToCoreMsg expectedToCoreMsg = TransportProtos.ToCoreMsg.newBuilder() + .setDeviceConnectMsg(expectedDeviceConnectMsg) + .build(); + assertThat(expectedToCoreMsg).isEqualTo(protoCaptor.getValue()); + + then(ctxMock).should(times(1)).tellSuccess(DUMMY_MSG); + then(tbClusterServiceMock).shouldHaveNoMoreInteractions(); + then(ctxMock).shouldHaveNoMoreInteractions(); + } + + @Test + public void givenActivityEventInConfig_whenOnMsg_thenOnDeviceActivityCalledWithCorrectTimeAndTellsSuccess() + throws TbNodeException { + // GIVEN + config.setEvent(TbMsgType.ACTIVITY_EVENT); + var nodeConfig = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + node.init(ctxMock, nodeConfig); + given(ctxMock.getTenantId()).willReturn(DUMMY_TENANT_ID); + given(ctxMock.getClusterService()).willReturn(tbClusterServiceMock); + given(ctxMock.isLocalEntity(DUMMY_MSG.getOriginator())).willReturn(true); + + + // WHEN + long timeBeforeCall = System.currentTimeMillis(); + node.onMsg(ctxMock, DUMMY_MSG); + long timeAfterCall = System.currentTimeMillis(); + + // THEN + var protoCaptor = ArgumentCaptor.forClass(TransportProtos.ToCoreMsg.class); + then(tbClusterServiceMock).should(times(1)) + .pushMsgToCore(eq(DUMMY_TENANT_ID), eq(DUMMY_MSG_ORIGINATOR), protoCaptor.capture(), any()); + + TransportProtos.ToCoreMsg actualToCoreMsg = protoCaptor.getValue(); + long actualLastActivityTime = actualToCoreMsg.getDeviceActivityMsg().getLastActivityTime(); + + assertThat(actualLastActivityTime).isGreaterThanOrEqualTo(timeBeforeCall); + assertThat(actualLastActivityTime).isLessThanOrEqualTo(timeAfterCall); + + TransportProtos.DeviceActivityProto updatedActivityMsg = actualToCoreMsg.getDeviceActivityMsg().toBuilder() + .setLastActivityTime(123L) + .build(); + TransportProtos.ToCoreMsg updatedToCoreMsg = actualToCoreMsg.toBuilder() + .setDeviceActivityMsg(updatedActivityMsg) + .build(); + + TransportProtos.DeviceActivityProto expectedDeviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder() + .setTenantIdMSB(DUMMY_TENANT_ID.getId().getMostSignificantBits()) + .setTenantIdLSB(DUMMY_TENANT_ID.getId().getLeastSignificantBits()) + .setDeviceIdMSB(DUMMY_MSG_ORIGINATOR.getId().getMostSignificantBits()) + .setDeviceIdLSB(DUMMY_MSG_ORIGINATOR.getId().getLeastSignificantBits()) + .setLastActivityTime(123L) + .build(); + TransportProtos.ToCoreMsg expectedToCoreMsg = TransportProtos.ToCoreMsg.newBuilder() + .setDeviceActivityMsg(expectedDeviceActivityMsg) + .build(); + + assertThat(updatedToCoreMsg).isEqualTo(expectedToCoreMsg); + + then(ctxMock).should(times(1)).tellSuccess(DUMMY_MSG); + then(tbClusterServiceMock).shouldHaveNoMoreInteractions(); + then(ctxMock).shouldHaveNoMoreInteractions(); + } + + @Test + public void givenInactivityEventInConfig_whenOnMsg_thenOnDeviceInactivityCalledAndTellsSuccess() + throws TbNodeException { + // GIVEN + config.setEvent(TbMsgType.INACTIVITY_EVENT); + var nodeConfig = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + node.init(ctxMock, nodeConfig); + given(ctxMock.getTenantId()).willReturn(DUMMY_TENANT_ID); + given(ctxMock.getClusterService()).willReturn(tbClusterServiceMock); + given(ctxMock.isLocalEntity(DUMMY_MSG.getOriginator())).willReturn(true); + + // WHEN + node.onMsg(ctxMock, DUMMY_MSG); + + // THEN + var protoCaptor = ArgumentCaptor.forClass(TransportProtos.ToCoreMsg.class); + then(tbClusterServiceMock).should(times(1)) + .pushMsgToCore(eq(DUMMY_TENANT_ID), eq(DUMMY_MSG_ORIGINATOR), protoCaptor.capture(), any()); + + TransportProtos.DeviceInactivityProto expectedDeviceInactivityMsg = + TransportProtos.DeviceInactivityProto.newBuilder() + .setTenantIdMSB(DUMMY_TENANT_ID.getId().getMostSignificantBits()) + .setTenantIdLSB(DUMMY_TENANT_ID.getId().getLeastSignificantBits()) + .setDeviceIdMSB(DUMMY_MSG_ORIGINATOR.getId().getMostSignificantBits()) + .setDeviceIdLSB(DUMMY_MSG_ORIGINATOR.getId().getLeastSignificantBits()) + .build(); + TransportProtos.ToCoreMsg expectedToCoreMsg = TransportProtos.ToCoreMsg.newBuilder() + .setDeviceInactivityMsg(expectedDeviceInactivityMsg) + .build(); + assertThat(expectedToCoreMsg).isEqualTo(protoCaptor.getValue()); + + then(ctxMock).should(times(1)).tellSuccess(DUMMY_MSG); + then(tbClusterServiceMock).shouldHaveNoMoreInteractions(); + then(ctxMock).shouldHaveNoMoreInteractions(); + } + + @Test + public void givenDisconnectEventInConfig_whenOnMsg_thenOnDeviceDisconnectCalledAndTellsSuccess() + throws TbNodeException { + // GIVEN + config.setEvent(TbMsgType.DISCONNECT_EVENT); + var nodeConfig = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + node.init(ctxMock, nodeConfig); + given(ctxMock.getTenantId()).willReturn(DUMMY_TENANT_ID); + given(ctxMock.getClusterService()).willReturn(tbClusterServiceMock); + given(ctxMock.isLocalEntity(DUMMY_MSG.getOriginator())).willReturn(true); + + // WHEN + node.onMsg(ctxMock, DUMMY_MSG); + + // THEN + var protoCaptor = ArgumentCaptor.forClass(TransportProtos.ToCoreMsg.class); + then(tbClusterServiceMock).should(times(1)) + .pushMsgToCore(eq(DUMMY_TENANT_ID), eq(DUMMY_MSG_ORIGINATOR), protoCaptor.capture(), any()); + + TransportProtos.DeviceDisconnectProto expectedDeviceDisconnectMsg = + TransportProtos.DeviceDisconnectProto.newBuilder() + .setTenantIdMSB(DUMMY_TENANT_ID.getId().getMostSignificantBits()) + .setTenantIdLSB(DUMMY_TENANT_ID.getId().getLeastSignificantBits()) + .setDeviceIdMSB(DUMMY_MSG_ORIGINATOR.getId().getMostSignificantBits()) + .setDeviceIdLSB(DUMMY_MSG_ORIGINATOR.getId().getLeastSignificantBits()) + .build(); + TransportProtos.ToCoreMsg expectedToCoreMsg = TransportProtos.ToCoreMsg.newBuilder() + .setDeviceDisconnectMsg(expectedDeviceDisconnectMsg) + .build(); + assertThat(expectedToCoreMsg).isEqualTo(protoCaptor.getValue()); + + then(ctxMock).should(times(1)).tellSuccess(DUMMY_MSG); + then(tbClusterServiceMock).shouldHaveNoMoreInteractions(); + then(ctxMock).shouldHaveNoMoreInteractions(); + } + +}