Fixed incorrect display of device state (#11536)

* changed logic to save to db activity value from cache

* changed check if partition belongs
This commit is contained in:
Iryna Matveieva 2024-09-05 12:38:52 +03:00 committed by GitHub
parent 5631b78656
commit 6f2801db0a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 141 additions and 30 deletions

View File

@ -157,7 +157,8 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
private final DbTypeInfoComponent dbTypeInfoComponent;
private final TbApiUsageReportClient apiUsageReportClient;
private final NotificationRuleProcessor notificationRuleProcessor;
@Autowired @Lazy
@Autowired
@Lazy
private TelemetrySubscriptionService tsSubService;
@Value("${state.defaultInactivityTimeoutInSec}")
@ -362,14 +363,16 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
if (proto.getAdded()) {
Futures.addCallback(fetchDeviceState(device), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable DeviceStateData state) {
public void onSuccess(DeviceStateData state) {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, device.getId());
if (addDeviceUsingState(tpi, state)) {
save(deviceId, ACTIVITY_STATE, false);
Set<DeviceId> deviceIds = partitionedEntities.get(tpi);
boolean isMyPartition = deviceIds != null;
if (isMyPartition) {
deviceIds.add(state.getDeviceId());
initializeActivityState(deviceId, state);
callback.onSuccess();
} else {
log.debug("[{}][{}] Device belongs to external partition. Probably rebalancing is in progress. Topic: {}"
, tenantId, deviceId, tpi.getFullTopicName());
log.debug("[{}][{}] Device belongs to external partition. Probably rebalancing is in progress. Topic: {}", tenantId, deviceId, tpi.getFullTopicName());
callback.onFailure(new RuntimeException("Device belongs to external partition " + tpi.getFullTopicName() + "!"));
}
}
@ -400,6 +403,21 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
}
}
private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) {
cleanupEntity(deviceId);
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
Set<DeviceId> deviceIdSet = partitionedEntities.get(tpi);
if (deviceIdSet != null) {
deviceIdSet.remove(deviceId);
}
}
private void initializeActivityState(DeviceId deviceId, DeviceStateData fetchedState) {
DeviceStateData cachedState = deviceStates.putIfAbsent(fetchedState.getDeviceId(), fetchedState);
boolean activityState = Objects.requireNonNullElse(cachedState, fetchedState).getState().isActive();
save(deviceId, ACTIVITY_STATE, activityState);
}
@Override
protected Map<TopicPartitionInfo, List<ListenableFuture<?>>> onAddedPartitions(Set<TopicPartitionInfo> addedPartitions) {
var result = new HashMap<TopicPartitionInfo, List<ListenableFuture<?>>>();
@ -436,10 +454,16 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
}
if (devicePackFutureHolder.future == null || !devicePackFutureHolder.future.isCancelled()) {
for (var state : states) {
if (!addDeviceUsingState(entry.getKey(), state)) {
return;
TopicPartitionInfo tpi = entry.getKey();
Set<DeviceId> deviceIds = partitionedEntities.get(tpi);
boolean isMyPartition = deviceIds != null;
if (isMyPartition) {
deviceIds.add(state.getDeviceId());
deviceStates.putIfAbsent(state.getDeviceId(), state);
checkAndUpdateState(state.getDeviceId(), state);
} else {
log.debug("[{}] Device belongs to external partition {}", state.getDeviceId(), tpi.getFullTopicName());
}
checkAndUpdateState(state.getDeviceId(), state);
}
log.info("[{}] Initialized {} out of {} device states", entry.getKey().getPartition().orElse(0), counter.addAndGet(states.size()), entry.getValue().size());
}
@ -475,18 +499,6 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
}
}
private boolean addDeviceUsingState(TopicPartitionInfo tpi, DeviceStateData state) {
Set<DeviceId> deviceIds = partitionedEntities.get(tpi);
if (deviceIds != null) {
deviceIds.add(state.getDeviceId());
deviceStates.putIfAbsent(state.getDeviceId(), state);
return true;
} else {
log.debug("[{}] Device belongs to external partition {}", state.getDeviceId(), tpi.getFullTopicName());
return false;
}
}
void checkStates() {
try {
final long ts = getCurrentTimeMillis();
@ -619,15 +631,6 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
return cleanup;
}
private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) {
cleanupEntity(deviceId);
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
Set<DeviceId> deviceIdSet = partitionedEntities.get(tpi);
if (deviceIdSet != null) {
deviceIdSet.remove(deviceId);
}
}
@Override
protected void cleanupEntityOnPartitionRemoval(DeviceId deviceId) {
cleanupEntity(deviceId);

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.service.state;
import com.google.common.util.concurrent.Futures;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ -28,8 +29,10 @@ import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceIdInfo;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.notification.rule.trigger.DeviceActivityTrigger;
@ -41,11 +44,13 @@ import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.sql.query.EntityQueryRepository;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
@ -66,6 +71,7 @@ import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
@ -1070,4 +1076,106 @@ public class DefaultDeviceStateServiceTest {
then(service).should().fetchDeviceStateDataUsingSeparateRequests(deviceId);
}
@Test
public void givenDeviceAdded_whenOnQueueMsg_thenShouldCacheAndSaveActivityToFalse() throws InterruptedException {
// GIVEN
final long defaultTimeout = 1000;
initStateService(defaultTimeout);
given(deviceService.findDeviceById(any(TenantId.class), any(DeviceId.class))).willReturn(new Device(deviceId));
given(attributesService.find(any(TenantId.class), any(EntityId.class), any(AttributeScope.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList()));
TransportProtos.DeviceStateServiceMsgProto proto = TransportProtos.DeviceStateServiceMsgProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setAdded(true)
.setUpdated(false)
.setDeleted(false)
.build();
// WHEN
service.onQueueMsg(proto, TbCallback.EMPTY);
// THEN
await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(service.deviceStates.get(deviceId).getState().isActive()).isEqualTo(false);
then(telemetrySubscriptionService).should().saveAttrAndNotify(eq(TenantId.SYS_TENANT_ID), eq(deviceId), eq(AttributeScope.SERVER_SCOPE), eq(ACTIVITY_STATE), eq(false), any());
});
}
@Test
public void givenDeviceActivityEventHappenedAfterAdded_whenOnDeviceActivity_thenShouldCacheAndSaveActivityToTrue() throws InterruptedException {
// GIVEN
final long defaultTimeout = 1000;
initStateService(defaultTimeout);
long currentTime = System.currentTimeMillis();
DeviceState deviceState = DeviceState.builder()
.active(false)
.inactivityTimeout(service.getDefaultInactivityTimeoutInSec())
.build();
DeviceStateData stateData = DeviceStateData.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.deviceCreationTime(currentTime - 10000)
.state(deviceState)
.metaData(TbMsgMetaData.EMPTY)
.build();
service.deviceStates.put(deviceId, stateData);
// WHEN
service.onDeviceActivity(tenantId, deviceId, currentTime);
// THEN
await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(service.deviceStates.get(deviceId).getState().isActive()).isEqualTo(true);
then(telemetrySubscriptionService).should().saveAttrAndNotify(eq(TenantId.SYS_TENANT_ID), eq(deviceId), eq(AttributeScope.SERVER_SCOPE), eq(LAST_ACTIVITY_TIME), eq(currentTime), any());
then(telemetrySubscriptionService).should().saveAttrAndNotify(eq(TenantId.SYS_TENANT_ID), eq(deviceId), eq(AttributeScope.SERVER_SCOPE), eq(ACTIVITY_STATE), eq(true), any());
});
}
@Test
public void givenDeviceActivityEventHappenedBeforeAdded_whenOnQueueMsg_thenShouldSaveActivityStateUsingValueFromCache() throws InterruptedException {
// GIVEN
final long defaultTimeout = 1000;
initStateService(defaultTimeout);
given(deviceService.findDeviceById(any(TenantId.class), any(DeviceId.class))).willReturn(new Device(deviceId));
given(attributesService.find(any(TenantId.class), any(EntityId.class), any(AttributeScope.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList()));
long currentTime = System.currentTimeMillis();
DeviceState deviceState = DeviceState.builder()
.active(true)
.lastConnectTime(currentTime - 8000)
.lastActivityTime(currentTime - 4000)
.lastDisconnectTime(0)
.lastInactivityAlarmTime(0)
.inactivityTimeout(3000)
.build();
DeviceStateData stateData = DeviceStateData.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.deviceCreationTime(currentTime - 10000)
.state(deviceState)
.build();
service.deviceStates.put(deviceId, stateData);
// WHEN
TransportProtos.DeviceStateServiceMsgProto proto = TransportProtos.DeviceStateServiceMsgProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setAdded(true)
.setUpdated(false)
.setDeleted(false)
.build();
service.onQueueMsg(proto, TbCallback.EMPTY);
// THEN
await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(service.deviceStates.get(deviceId).getState().isActive()).isEqualTo(true);
then(telemetrySubscriptionService).should().saveAttrAndNotify(eq(TenantId.SYS_TENANT_ID), eq(deviceId), eq(AttributeScope.SERVER_SCOPE), eq(ACTIVITY_STATE), eq(true), any());
});
}
}