diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index 1c53920a93..6025e874b9 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -157,7 +157,8 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService() { @Override - public void onSuccess(@Nullable DeviceStateData state) { + public void onSuccess(DeviceStateData state) { TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, device.getId()); - if (addDeviceUsingState(tpi, state)) { - save(deviceId, ACTIVITY_STATE, false); + Set deviceIds = partitionedEntities.get(tpi); + boolean isMyPartition = deviceIds != null; + if (isMyPartition) { + deviceIds.add(state.getDeviceId()); + initializeActivityState(deviceId, state); callback.onSuccess(); } else { - log.debug("[{}][{}] Device belongs to external partition. Probably rebalancing is in progress. Topic: {}" - , tenantId, deviceId, tpi.getFullTopicName()); + log.debug("[{}][{}] Device belongs to external partition. Probably rebalancing is in progress. Topic: {}", tenantId, deviceId, tpi.getFullTopicName()); callback.onFailure(new RuntimeException("Device belongs to external partition " + tpi.getFullTopicName() + "!")); } } @@ -400,6 +403,21 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService deviceIdSet = partitionedEntities.get(tpi); + if (deviceIdSet != null) { + deviceIdSet.remove(deviceId); + } + } + + private void initializeActivityState(DeviceId deviceId, DeviceStateData fetchedState) { + DeviceStateData cachedState = deviceStates.putIfAbsent(fetchedState.getDeviceId(), fetchedState); + boolean activityState = Objects.requireNonNullElse(cachedState, fetchedState).getState().isActive(); + save(deviceId, ACTIVITY_STATE, activityState); + } + @Override protected Map>> onAddedPartitions(Set addedPartitions) { var result = new HashMap>>(); @@ -436,10 +454,16 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService deviceIds = partitionedEntities.get(tpi); + boolean isMyPartition = deviceIds != null; + if (isMyPartition) { + deviceIds.add(state.getDeviceId()); + deviceStates.putIfAbsent(state.getDeviceId(), state); + checkAndUpdateState(state.getDeviceId(), state); + } else { + log.debug("[{}] Device belongs to external partition {}", state.getDeviceId(), tpi.getFullTopicName()); } - checkAndUpdateState(state.getDeviceId(), state); } log.info("[{}] Initialized {} out of {} device states", entry.getKey().getPartition().orElse(0), counter.addAndGet(states.size()), entry.getValue().size()); } @@ -475,18 +499,6 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService deviceIds = partitionedEntities.get(tpi); - if (deviceIds != null) { - deviceIds.add(state.getDeviceId()); - deviceStates.putIfAbsent(state.getDeviceId(), state); - return true; - } else { - log.debug("[{}] Device belongs to external partition {}", state.getDeviceId(), tpi.getFullTopicName()); - return false; - } - } - void checkStates() { try { final long ts = getCurrentTimeMillis(); @@ -619,15 +631,6 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService deviceIdSet = partitionedEntities.get(tpi); - if (deviceIdSet != null) { - deviceIdSet.remove(deviceId); - } - } - @Override protected void cleanupEntityOnPartitionRemoval(DeviceId deviceId) { cleanupEntity(deviceId); diff --git a/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java index 9833396a0a..296191d61b 100644 --- a/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.state; +import com.google.common.util.concurrent.Futures; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -28,8 +29,10 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.AttributeScope; +import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceIdInfo; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.notification.rule.trigger.DeviceActivityTrigger; @@ -41,11 +44,13 @@ import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.sql.query.EntityQueryRepository; import org.thingsboard.server.dao.timeseries.TimeseriesService; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; @@ -66,6 +71,7 @@ import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; @@ -1070,4 +1076,106 @@ public class DefaultDeviceStateServiceTest { then(service).should().fetchDeviceStateDataUsingSeparateRequests(deviceId); } + @Test + public void givenDeviceAdded_whenOnQueueMsg_thenShouldCacheAndSaveActivityToFalse() throws InterruptedException { + // GIVEN + final long defaultTimeout = 1000; + initStateService(defaultTimeout); + given(deviceService.findDeviceById(any(TenantId.class), any(DeviceId.class))).willReturn(new Device(deviceId)); + given(attributesService.find(any(TenantId.class), any(EntityId.class), any(AttributeScope.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList())); + + TransportProtos.DeviceStateServiceMsgProto proto = TransportProtos.DeviceStateServiceMsgProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setAdded(true) + .setUpdated(false) + .setDeleted(false) + .build(); + + // WHEN + service.onQueueMsg(proto, TbCallback.EMPTY); + + // THEN + await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(service.deviceStates.get(deviceId).getState().isActive()).isEqualTo(false); + then(telemetrySubscriptionService).should().saveAttrAndNotify(eq(TenantId.SYS_TENANT_ID), eq(deviceId), eq(AttributeScope.SERVER_SCOPE), eq(ACTIVITY_STATE), eq(false), any()); + }); + } + + @Test + public void givenDeviceActivityEventHappenedAfterAdded_whenOnDeviceActivity_thenShouldCacheAndSaveActivityToTrue() throws InterruptedException { + // GIVEN + final long defaultTimeout = 1000; + initStateService(defaultTimeout); + long currentTime = System.currentTimeMillis(); + DeviceState deviceState = DeviceState.builder() + .active(false) + .inactivityTimeout(service.getDefaultInactivityTimeoutInSec()) + .build(); + DeviceStateData stateData = DeviceStateData.builder() + .tenantId(tenantId) + .deviceId(deviceId) + .deviceCreationTime(currentTime - 10000) + .state(deviceState) + .metaData(TbMsgMetaData.EMPTY) + .build(); + service.deviceStates.put(deviceId, stateData); + + // WHEN + service.onDeviceActivity(tenantId, deviceId, currentTime); + + // THEN + await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(service.deviceStates.get(deviceId).getState().isActive()).isEqualTo(true); + then(telemetrySubscriptionService).should().saveAttrAndNotify(eq(TenantId.SYS_TENANT_ID), eq(deviceId), eq(AttributeScope.SERVER_SCOPE), eq(LAST_ACTIVITY_TIME), eq(currentTime), any()); + then(telemetrySubscriptionService).should().saveAttrAndNotify(eq(TenantId.SYS_TENANT_ID), eq(deviceId), eq(AttributeScope.SERVER_SCOPE), eq(ACTIVITY_STATE), eq(true), any()); + }); + } + + @Test + public void givenDeviceActivityEventHappenedBeforeAdded_whenOnQueueMsg_thenShouldSaveActivityStateUsingValueFromCache() throws InterruptedException { + // GIVEN + final long defaultTimeout = 1000; + initStateService(defaultTimeout); + given(deviceService.findDeviceById(any(TenantId.class), any(DeviceId.class))).willReturn(new Device(deviceId)); + given(attributesService.find(any(TenantId.class), any(EntityId.class), any(AttributeScope.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList())); + + long currentTime = System.currentTimeMillis(); + DeviceState deviceState = DeviceState.builder() + .active(true) + .lastConnectTime(currentTime - 8000) + .lastActivityTime(currentTime - 4000) + .lastDisconnectTime(0) + .lastInactivityAlarmTime(0) + .inactivityTimeout(3000) + .build(); + DeviceStateData stateData = DeviceStateData.builder() + .tenantId(tenantId) + .deviceId(deviceId) + .deviceCreationTime(currentTime - 10000) + .state(deviceState) + .build(); + service.deviceStates.put(deviceId, stateData); + + // WHEN + TransportProtos.DeviceStateServiceMsgProto proto = TransportProtos.DeviceStateServiceMsgProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setAdded(true) + .setUpdated(false) + .setDeleted(false) + .build(); + service.onQueueMsg(proto, TbCallback.EMPTY); + + // THEN + await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(service.deviceStates.get(deviceId).getState().isActive()).isEqualTo(true); + then(telemetrySubscriptionService).should().saveAttrAndNotify(eq(TenantId.SYS_TENANT_ID), eq(deviceId), eq(AttributeScope.SERVER_SCOPE), eq(ACTIVITY_STATE), eq(true), any()); + }); + } + }