diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java index eb86220361..f9ec8087ed 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java @@ -21,6 +21,7 @@ import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.RuleEngineCalculatedFieldQueueService; import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; +import org.thingsboard.server.common.data.kv.AttributesSaveResult; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import java.util.List; @@ -35,7 +36,7 @@ public interface CalculatedFieldQueueService extends RuleEngineCalculatedFieldQu */ void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback callback); - void pushRequestToQueue(AttributesSaveRequest request, List result, FutureCallback callback); + void pushRequestToQueue(AttributesSaveRequest request, AttributesSaveResult result, FutureCallback callback); void pushRequestToQueue(AttributesDeleteRequest request, List result, FutureCallback callback); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java index 8289e4db42..c3185738f9 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java @@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.AttributesSaveResult; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.msg.TbMsgType; @@ -96,7 +97,7 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS } @Override - public void pushRequestToQueue(AttributesSaveRequest request, List result, FutureCallback callback) { + public void pushRequestToQueue(AttributesSaveRequest request, AttributesSaveResult result, FutureCallback callback) { var tenantId = request.getTenantId(); var entityId = request.getEntityId(); checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries(), request.getScope()), cf -> cf.linkMatches(entityId, request.getEntries(), request.getScope()), @@ -186,17 +187,18 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS return msg.build(); } - private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, List versions) { + private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, AttributesSaveResult result) { ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder(); CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds(), request.getTbMsgId(), request.getTbMsgType()); telemetryMsg.setScope(AttributeScopeProto.valueOf(request.getScope().name())); + List entries = request.getEntries(); + List versions = result.versions(); + for (int i = 0; i < entries.size(); i++) { AttributeValueProto.Builder attrProtoBuilder = ProtoUtils.toProto(entries.get(i)).toBuilder(); - if (versions != null) { - attrProtoBuilder.setVersion(versions.get(i)); - } + attrProtoBuilder.setVersion(versions.get(i)); telemetryMsg.addAttrData(attrProtoBuilder.build()); } msg.setTelemetryMsg(telemetryMsg.build()); diff --git a/application/src/main/java/org/thingsboard/server/service/device/DeviceProvisionServiceImpl.java b/application/src/main/java/org/thingsboard/server/service/device/DeviceProvisionServiceImpl.java index 173ba742af..0778d61ee7 100644 --- a/application/src/main/java/org/thingsboard/server/service/device/DeviceProvisionServiceImpl.java +++ b/application/src/main/java/org/thingsboard/server/service/device/DeviceProvisionServiceImpl.java @@ -33,6 +33,7 @@ import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.AttributesSaveResult; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.msg.TbMsgType; @@ -62,8 +63,6 @@ import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.util.TbCoreComponent; -import java.util.Collections; -import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.regex.Matcher; @@ -240,10 +239,11 @@ public class DeviceProvisionServiceImpl implements DeviceProvisionService { return deviceCredentialsService.updateDeviceCredentials(tenantId, deviceCredentials); } - private ListenableFuture> saveProvisionStateAttribute(Device device) { - return attributesService.save(device.getTenantId(), device.getId(), AttributeScope.SERVER_SCOPE, - Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry(DEVICE_PROVISION_STATE, PROVISIONED_STATE), - System.currentTimeMillis()))); + private ListenableFuture saveProvisionStateAttribute(Device device) { + return attributesService.save( + device.getTenantId(), device.getId(), AttributeScope.SERVER_SCOPE, + new BaseAttributeKvEntry(new StringDataEntry(DEVICE_PROVISION_STATE, PROVISIONED_STATE), System.currentTimeMillis()) + ); } private DeviceCredentials getDeviceCredentials(Device device) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index e4b78a5ad8..b6ecd848ad 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.AttributesSaveResult; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; @@ -581,10 +582,10 @@ public abstract class EdgeGrpcSession implements Closeable { @Override public void onSuccess(@Nullable Pair newStartTsAndSeqId) { if (newStartTsAndSeqId != null) { - ListenableFuture> updateFuture = updateQueueStartTsAndSeqId(newStartTsAndSeqId); + ListenableFuture updateFuture = updateQueueStartTsAndSeqId(newStartTsAndSeqId); Futures.addCallback(updateFuture, new FutureCallback<>() { @Override - public void onSuccess(@Nullable List list) { + public void onSuccess(@Nullable AttributesSaveResult saveResult) { log.debug("[{}][{}] queue offset was updated [{}]", tenantId, edge.getId(), newStartTsAndSeqId); boolean newEventsAvailable; if (fetcher.isSeqIdNewCycleStarted()) { @@ -645,8 +646,7 @@ public abstract class EdgeGrpcSession implements Closeable { log.trace("[{}][{}] entity message processed [{}]", tenantId, edge.getId(), downlinkMsg); } } - case ATTRIBUTES_UPDATED, POST_ATTRIBUTES, ATTRIBUTES_DELETED, TIMESERIES_UPDATED -> - downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edge, edgeEvent); + case ATTRIBUTES_UPDATED, POST_ATTRIBUTES, ATTRIBUTES_DELETED, TIMESERIES_UPDATED -> downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edge, edgeEvent); default -> log.warn("[{}][{}] Unsupported action type [{}]", tenantId, edge.getId(), edgeEvent.getAction()); } } catch (Exception e) { @@ -722,13 +722,14 @@ public abstract class EdgeGrpcSession implements Closeable { return startSeqId; } - private ListenableFuture> updateQueueStartTsAndSeqId(Pair pair) { + private ListenableFuture updateQueueStartTsAndSeqId(Pair pair) { newStartTs = pair.getFirst(); newStartSeqId = pair.getSecond(); log.trace("[{}] updateQueueStartTsAndSeqId [{}][{}][{}]", sessionId, edge.getId(), newStartTs, newStartSeqId); - List attributes = Arrays.asList( + List attributes = List.of( new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_TS_ATTR_KEY, newStartTs), System.currentTimeMillis()), - new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_SEQ_ID_ATTR_KEY, newStartSeqId), System.currentTimeMillis())); + new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_SEQ_ID_ATTR_KEY, newStartSeqId), System.currentTimeMillis()) + ); return ctx.getAttributesService().save(edge.getTenantId(), edge.getId(), AttributeScope.SERVER_SCOPE, attributes); } diff --git a/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java b/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java index e9ef8c5ace..d580175aa0 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java @@ -64,6 +64,7 @@ import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.AttributesSaveResult; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BooleanDataEntry; @@ -581,9 +582,9 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService { Collections.singletonList(new BasicTsKvEntry(System.currentTimeMillis(), new BooleanDataEntry(key, value))), 0L); addTsCallback(saveFuture, new TelemetrySaveCallback<>(deviceId, key, value)); } else { - ListenableFuture> saveFuture = attributesService.save(TenantId.SYS_TENANT_ID, deviceId, AttributeScope.SERVER_SCOPE, - Collections.singletonList(new BaseAttributeKvEntry(new BooleanDataEntry(key, value) - , System.currentTimeMillis()))); + ListenableFuture saveFuture = attributesService.save( + TenantId.SYS_TENANT_ID, deviceId, AttributeScope.SERVER_SCOPE, new BaseAttributeKvEntry(new BooleanDataEntry(key, value), System.currentTimeMillis()) + ); addTsCallback(saveFuture, new TelemetrySaveCallback<>(deviceId, key, value)); } } @@ -611,7 +612,7 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService { } private void addTsCallback(ListenableFuture saveFuture, final FutureCallback callback) { - Futures.addCallback(saveFuture, new FutureCallback() { + Futures.addCallback(saveFuture, new FutureCallback<>() { @Override public void onSuccess(@Nullable S result) { callback.onSuccess(result); diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index cc476d377d..23fdc6b8c0 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -27,11 +27,10 @@ import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; -import lombok.Getter; import lombok.RequiredArgsConstructor; -import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; +import org.checkerframework.checker.nullness.qual.NonNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; @@ -170,35 +169,22 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService> stats = new HashMap<>(); for (DeviceStateData stateData : deviceStates.values()) { @@ -574,7 +559,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService() { + @Override + public void onSuccess(Void success) { + stateData.getState().setLastInactivityAlarmTime(ts); + onDeviceActivityStatusChange(false, stateData); + } + + @Override + public void onFailure(@NonNull Throwable t) { + log.error("[{}][{}] Failed to update device last inactivity alarm time to '{}'. Device state data: {}", tenantId, deviceId, ts, stateData, t); + } + }, deviceStateCallbackExecutor); } - boolean isActive(long ts, DeviceState state) { + private static boolean isActive(long ts, DeviceState state) { return ts < state.getLastActivityTime() + state.getInactivityTimeout(); } @@ -616,17 +611,32 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService() { + @Override + public void onSuccess(Void success) { + stateData.getState().setActive(active); + pushRuleEngineMessage(stateData, active ? TbMsgType.ACTIVITY_EVENT : TbMsgType.INACTIVITY_EVENT); + TbMsgMetaData metaData = stateData.getMetaData(); + notificationRuleProcessor.process(DeviceActivityTrigger.builder() + .tenantId(tenantId) + .customerId(stateData.getCustomerId()) + .deviceId(deviceId) + .active(active) + .deviceName(metaData.getValue("deviceName")) + .deviceType(metaData.getValue("deviceType")) + .deviceLabel(metaData.getValue("deviceLabel")) + .build()); + } + + @Override + public void onFailure(@NonNull Throwable t) { + log.error("[{}][{}] Failed to change device activity status to '{}'. Device state data: {}", tenantId, deviceId, active, stateData, t); + } + }, deviceStateCallbackExecutor); } boolean cleanDeviceStateIfBelongsToExternalPartition(TenantId tenantId, final DeviceId deviceId) { @@ -634,8 +644,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService save(TenantId tenantId, DeviceId deviceId, String key, long value) { + return save(tenantId, deviceId, new LongDataEntry(key, value), getCurrentTimeMillis()); } - private void save(TenantId tenantId, DeviceId deviceId, String key, boolean value) { - save(tenantId, deviceId, new BooleanDataEntry(key, value), getCurrentTimeMillis()); + private ListenableFuture save(TenantId tenantId, DeviceId deviceId, String key, boolean value) { + return save(tenantId, deviceId, new BooleanDataEntry(key, value), getCurrentTimeMillis()); } - private void save(TenantId tenantId, DeviceId deviceId, KvEntry kvEntry, long ts) { + private ListenableFuture save(TenantId tenantId, DeviceId deviceId, KvEntry kvEntry, long ts) { + ListenableFuture future; if (persistToTelemetry) { - tsSubService.saveTimeseriesInternal(TimeseriesSaveRequest.builder() + future = tsSubService.saveTimeseriesInternal(TimeseriesSaveRequest.builder() .tenantId(tenantId) .entityId(deviceId) .entry(new BasicTsKvEntry(ts, kvEntry)) @@ -895,7 +908,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService(deviceId, kvEntry)) .build()); } else { - tsSubService.saveAttributes(AttributesSaveRequest.builder() + future = tsSubService.saveAttributesInternal(AttributesSaveRequest.builder() .tenantId(tenantId) .entityId(deviceId) .scope(AttributeScope.SERVER_SCOPE) @@ -903,20 +916,14 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService(deviceId, kvEntry)) .build()); } + return Futures.transform(future, __ -> null, MoreExecutors.directExecutor()); } long getCurrentTimeMillis() { return System.currentTimeMillis(); } - private static class TelemetrySaveCallback implements FutureCallback { - private final DeviceId deviceId; - private final KvEntry kvEntry; - - TelemetrySaveCallback(DeviceId deviceId, KvEntry kvEntry) { - this.deviceId = deviceId; - this.kvEntry = kvEntry; - } + private record TelemetrySaveCallback(DeviceId deviceId, KvEntry kvEntry) implements FutureCallback { @Override public void onSuccess(@Nullable T result) { @@ -924,9 +931,10 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService saveAttributesInternal(AttributesSaveRequest request) { TenantId tenantId = request.getTenantId(); EntityId entityId = request.getEntityId(); AttributesSaveRequest.Strategy strategy = request.getStrategy(); - ListenableFuture> resultFuture; + ListenableFuture resultFuture; if (strategy.saveAttributes()) { resultFuture = attrService.save(tenantId, entityId, request.getScope(), request.getEntries()); } else { - resultFuture = Futures.immediateFuture(Collections.emptyList()); + resultFuture = Futures.immediateFuture(AttributesSaveResult.EMPTY); } addMainCallback(resultFuture, result -> { @@ -228,6 +227,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer if (strategy.sendWsUpdate()) { addWsCallback(resultFuture, success -> onAttributesUpdate(tenantId, entityId, request.getScope().name(), request.getEntries())); } + return resultFuture; } private static boolean shouldSendSharedAttributesUpdatedNotification(AttributesSaveRequest request) { diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java index 8a76aa1d14..2d3be5a0ba 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java @@ -21,6 +21,7 @@ import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; +import org.thingsboard.server.common.data.kv.AttributesSaveResult; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; /** @@ -30,7 +31,7 @@ public interface InternalTelemetryService extends RuleEngineTelemetryService { ListenableFuture saveTimeseriesInternal(TimeseriesSaveRequest request); - void saveAttributesInternal(AttributesSaveRequest request); + ListenableFuture saveAttributesInternal(AttributesSaveRequest request); void deleteTimeseriesInternal(TimeseriesDeleteRequest request); diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 6d27b9bd7e..09820a81cd 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -898,6 +898,8 @@ state: # Used only when state.persistToTelemetry is set to 'true' and Cassandra is used for timeseries data. # 0 means time-to-live mechanism is disabled. telemetryTtl: "${STATE_TELEMETRY_TTL:0}" + # Number of device records to fetch per batch when initializing device activity states + initFetchPackSize: "${TB_DEVICE_STATE_INIT_FETCH_PACK_SIZE:50000}" # Configuration properties for rule nodes related to device activity state rule: node: diff --git a/application/src/test/java/org/thingsboard/server/service/entitiy/EntityServiceTest.java b/application/src/test/java/org/thingsboard/server/service/entitiy/EntityServiceTest.java index 264ac70443..d8bcd42bc8 100644 --- a/application/src/test/java/org/thingsboard/server/service/entitiy/EntityServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/entitiy/EntityServiceTest.java @@ -44,6 +44,7 @@ import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.IdBased; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.AttributesSaveResult; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.DoubleDataEntry; @@ -395,7 +396,7 @@ public class EntityServiceTest extends AbstractControllerTest { List highTemperatures = new ArrayList<>(); createTestHierarchy(tenantId, assets, devices, new ArrayList<>(), new ArrayList<>(), temperatures, highTemperatures); - List>> attributeFutures = new ArrayList<>(); + List> attributeFutures = new ArrayList<>(); for (int i = 0; i < devices.size(); i++) { Device device = devices.get(i); attributeFutures.add(saveLongAttribute(device.getId(), "temperature", temperatures.get(i), AttributeScope.CLIENT_SCOPE)); @@ -545,7 +546,7 @@ public class EntityServiceTest extends AbstractControllerTest { List highTemperatures = new ArrayList<>(); createTestHierarchy(tenantId, assets, devices, new ArrayList<>(), new ArrayList<>(), temperatures, highTemperatures); - List>> attributeFutures = new ArrayList<>(); + List> attributeFutures = new ArrayList<>(); for (int i = 0; i < devices.size(); i++) { Device device = devices.get(i); attributeFutures.add(saveLongAttribute(device.getId(), "temperature", temperatures.get(i), AttributeScope.CLIENT_SCOPE)); @@ -599,7 +600,7 @@ public class EntityServiceTest extends AbstractControllerTest { List highConsumptions = new ArrayList<>(); createTestHierarchy(tenantId, assets, devices, consumptions, highConsumptions, new ArrayList<>(), new ArrayList<>()); - List>> attributeFutures = new ArrayList<>(); + List> attributeFutures = new ArrayList<>(); for (int i = 0; i < assets.size(); i++) { Asset asset = assets.get(i); attributeFutures.add(saveLongAttribute(asset.getId(), "consumption", consumptions.get(i), AttributeScope.SERVER_SCOPE)); @@ -1506,7 +1507,7 @@ public class EntityServiceTest extends AbstractControllerTest { } } - List>> attributeFutures = new ArrayList<>(); + List> attributeFutures = new ArrayList<>(); for (int i = 0; i < devices.size(); i++) { Device device = devices.get(i); for (AttributeScope currentScope : AttributeScope.values()) { @@ -1578,7 +1579,7 @@ public class EntityServiceTest extends AbstractControllerTest { } } - List>> attributeFutures = new ArrayList<>(); + List> attributeFutures = new ArrayList<>(); for (int i = 0; i < devices.size(); i++) { Device device = devices.get(i); attributeFutures.add(saveLongAttribute(device.getId(), "temperature", temperatures.get(i), AttributeScope.CLIENT_SCOPE)); @@ -1808,7 +1809,7 @@ public class EntityServiceTest extends AbstractControllerTest { } } - List>> attributeFutures = new ArrayList<>(); + List> attributeFutures = new ArrayList<>(); for (int i = 0; i < devices.size(); i++) { Device device = devices.get(i); attributeFutures.add(saveStringAttribute(device.getId(), "attributeString", attributeStrings.get(i), AttributeScope.CLIENT_SCOPE)); @@ -2269,16 +2270,16 @@ public class EntityServiceTest extends AbstractControllerTest { return filter; } - private ListenableFuture> saveLongAttribute(EntityId entityId, String key, long value, AttributeScope scope) { + private ListenableFuture saveLongAttribute(EntityId entityId, String key, long value, AttributeScope scope) { KvEntry attrValue = new LongDataEntry(key, value); AttributeKvEntry attr = new BaseAttributeKvEntry(attrValue, 42L); - return attributesService.save(tenantId, entityId, scope, Collections.singletonList(attr)); + return attributesService.save(tenantId, entityId, scope, List.of(attr)); } - private ListenableFuture> saveStringAttribute(EntityId entityId, String key, String value, AttributeScope scope) { + private ListenableFuture saveStringAttribute(EntityId entityId, String key, String value, AttributeScope scope) { KvEntry attrValue = new StringDataEntry(key, value); AttributeKvEntry attr = new BaseAttributeKvEntry(attrValue, 42L); - return attributesService.save(tenantId, entityId, scope, Collections.singletonList(attr)); + return attributesService.save(tenantId, entityId, scope, List.of(attr)); } private ListenableFuture saveTimeseries(EntityId entityId, String key, Double value) { @@ -2294,8 +2295,8 @@ public class EntityServiceTest extends AbstractControllerTest { } protected void createMultiRootHierarchy(List buildings, List apartments, - Map> entityNameByTypeMap, - Map childParentRelationMap) throws InterruptedException { + Map> entityNameByTypeMap, + Map childParentRelationMap) throws InterruptedException { for (int k = 0; k < 3; k++) { Asset building = new Asset(); building.setTenantId(tenantId); diff --git a/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java index 26e913eacc..5bf87137ff 100644 --- a/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java @@ -16,6 +16,9 @@ package org.thingsboard.server.service.state; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -31,13 +34,13 @@ import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.Device; -import org.thingsboard.server.common.data.DeviceIdInfo; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.AttributesSaveResult; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.notification.rule.trigger.DeviceActivityTrigger; -import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; @@ -50,20 +53,22 @@ import org.thingsboard.server.dao.sql.query.EntityQueryRepository; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.discovery.PartitionService; -import org.thingsboard.server.queue.discovery.QueueKey; -import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.usagestats.DefaultTbApiUsageReportClient; import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; +import java.time.Duration; import java.util.Collections; +import java.util.HashSet; import java.util.List; -import java.util.Map; +import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @@ -77,8 +82,8 @@ import static org.mockito.BDDMockito.then; import static org.mockito.BDDMockito.willReturn; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -90,7 +95,10 @@ import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAS import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAST_DISCONNECT_TIME; @ExtendWith(MockitoExtension.class) -public class DefaultDeviceStateServiceTest { +class DefaultDeviceStateServiceTest { + + ListeningExecutorService deviceStateExecutor; + ListeningExecutorService deviceStateCallbackExecutor; @Mock DeviceService deviceService; @@ -113,25 +121,48 @@ public class DefaultDeviceStateServiceTest { @Mock DefaultTbApiUsageReportClient defaultTbApiUsageReportClient; - TenantId tenantId = new TenantId(UUID.fromString("00797a3b-7aeb-4b5b-b57a-c2a810d0f112")); - DeviceId deviceId = DeviceId.fromString("00797a3b-7aeb-4b5b-b57a-c2a810d0f112"); - TopicPartitionInfo tpi; + long defaultInactivityTimeoutMs = Duration.ofMinutes(10L).toMillis(); + + TenantId tenantId = TenantId.fromUUID(UUID.fromString("00797a3b-7aeb-4b5b-b57a-c2a810d0f112")); + DeviceId deviceId = DeviceId.fromString("c209f718-42e5-11f0-9fe2-0242ac120002"); + TopicPartitionInfo tpi = TopicPartitionInfo.builder() + .topic("tb_core") + .partition(0) + .myPartition(true) + .build(); DefaultDeviceStateService service; @BeforeEach - public void setUp() { + void setUp() { service = spy(new DefaultDeviceStateService(deviceService, attributesService, tsService, clusterService, partitionService, entityQueryRepository, null, defaultTbApiUsageReportClient, notificationRuleProcessor)); ReflectionTestUtils.setField(service, "tsSubService", telemetrySubscriptionService); + ReflectionTestUtils.setField(service, "defaultInactivityTimeoutMs", defaultInactivityTimeoutMs); ReflectionTestUtils.setField(service, "defaultStateCheckIntervalInSec", 60); ReflectionTestUtils.setField(service, "defaultActivityStatsIntervalInSec", 60); - ReflectionTestUtils.setField(service, "initFetchPackSize", 10); + ReflectionTestUtils.setField(service, "initFetchPackSize", 50000); - tpi = TopicPartitionInfo.builder().myPartition(true).build(); + deviceStateExecutor = MoreExecutors.newDirectExecutorService(); + ReflectionTestUtils.setField(service, "deviceStateExecutor", deviceStateExecutor); + + deviceStateCallbackExecutor = MoreExecutors.newDirectExecutorService(); + ReflectionTestUtils.setField(service, "deviceStateCallbackExecutor", deviceStateCallbackExecutor); + + lenient().when(partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId)).thenReturn(tpi); + + ConcurrentMap> partitionedEntities = new ConcurrentHashMap<>(); + partitionedEntities.put(tpi, new HashSet<>()); + ReflectionTestUtils.setField(service, "partitionedEntities", partitionedEntities); + } + + @AfterEach + void cleanup() { + deviceStateExecutor.shutdownNow(); + deviceStateCallbackExecutor.shutdownNow(); } @Test - public void givenDeviceBelongsToExternalPartition_whenOnDeviceConnect_thenCleansStateAndDoesNotReportConnect() { + void givenDeviceBelongsToExternalPartition_whenOnDeviceConnect_thenCleansStateAndDoesNotReportConnect() { // GIVEN doReturn(true).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId); @@ -149,7 +180,7 @@ public class DefaultDeviceStateServiceTest { @ParameterizedTest @ValueSource(longs = {Long.MIN_VALUE, -100, -1}) - public void givenNegativeLastConnectTime_whenOnDeviceConnect_thenSkipsThisEvent(long negativeLastConnectTime) { + void givenNegativeLastConnectTime_whenOnDeviceConnect_thenSkipsThisEvent(long negativeLastConnectTime) { // GIVEN doReturn(false).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId); @@ -166,7 +197,7 @@ public class DefaultDeviceStateServiceTest { @ParameterizedTest @MethodSource("provideOutdatedTimestamps") - public void givenOutdatedLastConnectTime_whenOnDeviceDisconnect_thenSkipsThisEvent(long outdatedLastConnectTime, long currentLastConnectTime) { + void givenOutdatedLastConnectTime_whenOnDeviceDisconnect_thenSkipsThisEvent(long outdatedLastConnectTime, long currentLastConnectTime) { // GIVEN doReturn(false).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId); @@ -188,7 +219,7 @@ public class DefaultDeviceStateServiceTest { } @Test - public void givenDeviceBelongsToMyPartition_whenOnDeviceConnect_thenReportsConnect() { + void givenDeviceBelongsToMyPartition_whenOnDeviceConnect_thenReportsConnect() { // GIVEN var deviceStateData = DeviceStateData.builder() .tenantId(tenantId) @@ -202,11 +233,13 @@ public class DefaultDeviceStateServiceTest { service.deviceStates.put(deviceId, deviceStateData); long lastConnectTime = System.currentTimeMillis(); + mockSuccessfulSaveAttributes(); + // WHEN service.onDeviceConnect(tenantId, deviceId, lastConnectTime); // THEN - then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> + then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request -> request.getTenantId().equals(tenantId) && request.getEntityId().equals(deviceId) && request.getScope().equals(AttributeScope.SERVER_SCOPE) && request.getEntries().get(0).getKey().equals(LAST_CONNECT_TIME) && @@ -221,7 +254,7 @@ public class DefaultDeviceStateServiceTest { } @Test - public void givenDeviceBelongsToExternalPartition_whenOnDeviceDisconnect_thenCleansStateAndDoesNotReportDisconnect() { + void givenDeviceBelongsToExternalPartition_whenOnDeviceDisconnect_thenCleansStateAndDoesNotReportDisconnect() { // GIVEN doReturn(true).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId); @@ -238,7 +271,7 @@ public class DefaultDeviceStateServiceTest { @ParameterizedTest @ValueSource(longs = {Long.MIN_VALUE, -100, -1}) - public void givenNegativeLastDisconnectTime_whenOnDeviceDisconnect_thenSkipsThisEvent(long negativeLastDisconnectTime) { + void givenNegativeLastDisconnectTime_whenOnDeviceDisconnect_thenSkipsThisEvent(long negativeLastDisconnectTime) { // GIVEN doReturn(false).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId); @@ -254,7 +287,7 @@ public class DefaultDeviceStateServiceTest { @ParameterizedTest @MethodSource("provideOutdatedTimestamps") - public void givenOutdatedLastDisconnectTime_whenOnDeviceDisconnect_thenSkipsThisEvent(long outdatedLastDisconnectTime, long currentLastDisconnectTime) { + void givenOutdatedLastDisconnectTime_whenOnDeviceDisconnect_thenSkipsThisEvent(long outdatedLastDisconnectTime, long currentLastDisconnectTime) { // GIVEN doReturn(false).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId); @@ -275,7 +308,7 @@ public class DefaultDeviceStateServiceTest { } @Test - public void givenDeviceBelongsToMyPartition_whenOnDeviceDisconnect_thenReportsDisconnect() { + void givenDeviceBelongsToMyPartition_whenOnDeviceDisconnect_thenReportsDisconnect() { // GIVEN var deviceStateData = DeviceStateData.builder() .tenantId(tenantId) @@ -289,11 +322,13 @@ public class DefaultDeviceStateServiceTest { service.deviceStates.put(deviceId, deviceStateData); long lastDisconnectTime = System.currentTimeMillis(); + mockSuccessfulSaveAttributes(); + // WHEN service.onDeviceDisconnect(tenantId, deviceId, lastDisconnectTime); // THEN - then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> + then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request -> request.getTenantId().equals(tenantId) && request.getEntityId().equals(deviceId) && request.getScope().equals(AttributeScope.SERVER_SCOPE) && request.getEntries().get(0).getKey().equals(LAST_DISCONNECT_TIME) && @@ -308,7 +343,7 @@ public class DefaultDeviceStateServiceTest { } @Test - public void givenDeviceBelongsToExternalPartition_whenOnDeviceInactivity_thenCleansStateAndDoesNotReportInactivity() { + void givenDeviceBelongsToExternalPartition_whenOnDeviceInactivity_thenCleansStateAndDoesNotReportInactivity() { // GIVEN doReturn(true).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId); @@ -325,7 +360,7 @@ public class DefaultDeviceStateServiceTest { @ParameterizedTest @ValueSource(longs = {Long.MIN_VALUE, -100, -1}) - public void givenNegativeLastInactivityTime_whenOnDeviceInactivity_thenSkipsThisEvent(long negativeLastInactivityTime) { + void givenNegativeLastInactivityTime_whenOnDeviceInactivity_thenSkipsThisEvent(long negativeLastInactivityTime) { // GIVEN doReturn(false).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId); @@ -341,7 +376,7 @@ public class DefaultDeviceStateServiceTest { @ParameterizedTest @MethodSource("provideOutdatedTimestamps") - public void givenReceivedInactivityTimeIsLessThanOrEqualToCurrentInactivityTime_whenOnDeviceInactivity_thenSkipsThisEvent( + void givenReceivedInactivityTimeIsLessThanOrEqualToCurrentInactivityTime_whenOnDeviceInactivity_thenSkipsThisEvent( long outdatedLastInactivityTime, long currentLastInactivityTime ) { // GIVEN @@ -365,7 +400,7 @@ public class DefaultDeviceStateServiceTest { @ParameterizedTest @MethodSource("provideOutdatedTimestamps") - public void givenReceivedInactivityTimeIsLessThanOrEqualToCurrentActivityTime_whenOnDeviceInactivity_thenSkipsThisEvent( + void givenReceivedInactivityTimeIsLessThanOrEqualToCurrentActivityTime_whenOnDeviceInactivity_thenSkipsThisEvent( long outdatedLastInactivityTime, long currentLastActivityTime ) { // GIVEN @@ -398,7 +433,7 @@ public class DefaultDeviceStateServiceTest { } @Test - public void givenDeviceBelongsToMyPartition_whenOnDeviceInactivity_thenReportsInactivity() { + void givenDeviceBelongsToMyPartition_whenOnDeviceInactivity_thenReportsInactivity() { // GIVEN var deviceStateData = DeviceStateData.builder() .tenantId(tenantId) @@ -412,17 +447,19 @@ public class DefaultDeviceStateServiceTest { service.deviceStates.put(deviceId, deviceStateData); long lastInactivityTime = System.currentTimeMillis(); + mockSuccessfulSaveAttributes(); + // WHEN service.onDeviceInactivity(tenantId, deviceId, lastInactivityTime); // THEN - then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> + then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request -> request.getTenantId().equals(tenantId) && request.getEntityId().equals(deviceId) && request.getScope().equals(AttributeScope.SERVER_SCOPE) && request.getEntries().get(0).getKey().equals(INACTIVITY_ALARM_TIME) && request.getEntries().get(0).getValue().equals(lastInactivityTime) )); - then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> + then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request -> request.getTenantId().equals(tenantId) && request.getEntityId().equals(deviceId) && request.getScope().equals(AttributeScope.SERVER_SCOPE) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) && @@ -445,7 +482,7 @@ public class DefaultDeviceStateServiceTest { } @Test - public void givenInactivityTimeoutReached_whenUpdateInactivityStateIfExpired_thenReportsInactivity() { + void givenInactivityTimeoutReached_whenUpdateInactivityStateIfExpired_thenReportsInactivity() { // GIVEN var deviceStateData = DeviceStateData.builder() .tenantId(tenantId) @@ -456,16 +493,18 @@ public class DefaultDeviceStateServiceTest { given(partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId)).willReturn(tpi); + mockSuccessfulSaveAttributes(); + // WHEN service.updateInactivityStateIfExpired(System.currentTimeMillis(), deviceId, deviceStateData); // THEN - then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> + then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request -> request.getTenantId().equals(tenantId) && request.getEntityId().equals(deviceId) && request.getScope().equals(AttributeScope.SERVER_SCOPE) && request.getEntries().get(0).getKey().equals(INACTIVITY_ALARM_TIME) )); - then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> + then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request -> request.getTenantId().equals(tenantId) && request.getEntityId().equals(deviceId) && request.getScope().equals(AttributeScope.SERVER_SCOPE) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) && @@ -488,7 +527,7 @@ public class DefaultDeviceStateServiceTest { } @Test - public void givenDeviceIdFromDeviceStatesMap_whenGetOrFetchDeviceStateData_thenNoStackOverflow() { + void givenDeviceIdFromDeviceStatesMap_whenGetOrFetchDeviceStateData_thenNoStackOverflow() { service.deviceStates.put(deviceId, deviceStateDataMock); DeviceStateData deviceStateData = service.getOrFetchDeviceStateData(deviceId); assertThat(deviceStateData).isEqualTo(deviceStateDataMock); @@ -496,7 +535,7 @@ public class DefaultDeviceStateServiceTest { } @Test - public void givenDeviceIdWithoutDeviceStateInMap_whenGetOrFetchDeviceStateData_thenFetchDeviceStateData() { + void givenDeviceIdWithoutDeviceStateInMap_whenGetOrFetchDeviceStateData_thenFetchDeviceStateData() { service.deviceStates.clear(); willReturn(deviceStateDataMock).given(service).fetchDeviceStateDataUsingSeparateRequests(deviceId); DeviceStateData deviceStateData = service.getOrFetchDeviceStateData(deviceId); @@ -504,29 +543,18 @@ public class DefaultDeviceStateServiceTest { verify(service).fetchDeviceStateDataUsingSeparateRequests(deviceId); } - private void initStateService(long timeout) throws InterruptedException { - service.stop(); - reset(service, telemetrySubscriptionService); - service.setDefaultInactivityTimeoutMs(timeout); - service.init(); - when(partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId)).thenReturn(tpi); - when(entityQueryRepository.findEntityDataByQueryInternal(any())).thenReturn(new PageData<>()); - var deviceIdInfo = new DeviceIdInfo(tenantId.getId(), null, deviceId.getId()); - when(deviceService.findDeviceIdInfos(any())) - .thenReturn(new PageData<>(List.of(deviceIdInfo), 0, 1, false)); - PartitionChangeEvent event = new PartitionChangeEvent(this, ServiceType.TB_CORE, Map.of( - new QueueKey(ServiceType.TB_CORE), Collections.singleton(tpi) - ), Collections.emptyMap()); - service.onApplicationEvent(event); - Thread.sleep(100); - } + @MethodSource + @ParameterizedTest + void testOnDeviceInactivityTimeoutUpdate(boolean initialActivityStatus, long newInactivityTimeout, boolean expectedActivityStatus) { + // GIVEN + doReturn(200L).when(service).getCurrentTimeMillis(); - @Test - public void increaseInactivityForInactiveDeviceTest() throws Exception { - final long defaultTimeout = 1; - initStateService(defaultTimeout); - DeviceState deviceState = DeviceState.builder().build(); - DeviceStateData deviceStateData = DeviceStateData.builder() + var deviceState = DeviceState.builder() + .active(initialActivityStatus) + .lastActivityTime(100L) + .build(); + + var deviceStateData = DeviceStateData.builder() .tenantId(tenantId) .deviceId(deviceId) .state(deviceState) @@ -536,174 +564,44 @@ public class DefaultDeviceStateServiceTest { service.deviceStates.put(deviceId, deviceStateData); service.getPartitionedEntities(tpi).add(deviceId); - service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis()); - activityVerify(true); - Thread.sleep(defaultTimeout); - service.checkStates(); - activityVerify(false); + mockSuccessfulSaveAttributes(); - reset(telemetrySubscriptionService); + // WHEN + service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, newInactivityTimeout); - long increase = 100; - long newTimeout = System.currentTimeMillis() - deviceState.getLastActivityTime() + increase; + // THEN + long expectedInactivityTimeout = newInactivityTimeout != 0 ? newInactivityTimeout : defaultInactivityTimeoutMs; + assertThat(deviceState.getInactivityTimeout()).isEqualTo(expectedInactivityTimeout); - service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, newTimeout); - activityVerify(true); - Thread.sleep(increase); - service.checkStates(); - activityVerify(false); + assertThat(deviceState.isActive()).isEqualTo(expectedActivityStatus); + if (initialActivityStatus != expectedActivityStatus) { + then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request -> { + AttributeKvEntry entry = request.getEntries().get(0); + return request.getEntityId().equals(deviceId) && entry.getKey().equals(ACTIVITY_STATE) && entry.getValue().equals(expectedActivityStatus); + })); + } + } - reset(telemetrySubscriptionService); - - service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis()); - activityVerify(true); - Thread.sleep(newTimeout + 5); - service.checkStates(); - activityVerify(false); + // to simplify test, these arguments assume that the current time is 200 and the last activity time is 100 + private static Stream testOnDeviceInactivityTimeoutUpdate() { + return Stream.of( + Arguments.of(true, 1L, false), + Arguments.of(true, 50L, false), + Arguments.of(true, 99L, false), + Arguments.of(true, 100L, false), + Arguments.of(true, 101L, true), + Arguments.of(true, 0L, true), // should use default inactivity timeout of 10 minutes + Arguments.of(false, 1L, false), + Arguments.of(false, 50L, false), + Arguments.of(false, 99L, false), + Arguments.of(false, 100L, false), + Arguments.of(false, 101L, true), + Arguments.of(false, 0L, true) // should use default inactivity timeout of 10 minutes + ); } @Test - public void increaseInactivityForActiveDeviceTest() throws Exception { - final long defaultTimeout = 1000; - initStateService(defaultTimeout); - DeviceState deviceState = DeviceState.builder().build(); - DeviceStateData deviceStateData = DeviceStateData.builder() - .tenantId(tenantId) - .deviceId(deviceId) - .state(deviceState) - .metaData(new TbMsgMetaData()) - .build(); - - service.deviceStates.put(deviceId, deviceStateData); - service.getPartitionedEntities(tpi).add(deviceId); - - service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis()); - activityVerify(true); - - reset(telemetrySubscriptionService); - - long increase = 100; - long newTimeout = System.currentTimeMillis() - deviceState.getLastActivityTime() + increase; - - service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, newTimeout); - verify(telemetrySubscriptionService, never()).saveAttributes(argThat(request -> - request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) - )); - Thread.sleep(defaultTimeout + increase); - service.checkStates(); - activityVerify(false); - - reset(telemetrySubscriptionService); - - service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis()); - activityVerify(true); - Thread.sleep(newTimeout); - service.checkStates(); - activityVerify(false); - } - - @Test - public void increaseSmallInactivityForInactiveDeviceTest() throws Exception { - final long defaultTimeout = 1; - initStateService(defaultTimeout); - DeviceState deviceState = DeviceState.builder().build(); - DeviceStateData deviceStateData = DeviceStateData.builder() - .tenantId(tenantId) - .deviceId(deviceId) - .state(deviceState) - .metaData(new TbMsgMetaData()) - .build(); - - service.deviceStates.put(deviceId, deviceStateData); - service.getPartitionedEntities(tpi).add(deviceId); - - service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis()); - activityVerify(true); - Thread.sleep(defaultTimeout); - service.checkStates(); - activityVerify(false); - - reset(telemetrySubscriptionService); - - long newTimeout = 1; - Thread.sleep(newTimeout); - verify(telemetrySubscriptionService, never()).saveAttributes(argThat(request -> - request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) - )); - } - - @Test - public void decreaseInactivityForActiveDeviceTest() throws Exception { - final long defaultTimeout = 1000; - initStateService(defaultTimeout); - DeviceState deviceState = DeviceState.builder().build(); - DeviceStateData deviceStateData = DeviceStateData.builder() - .tenantId(tenantId) - .deviceId(deviceId) - .state(deviceState) - .metaData(new TbMsgMetaData()) - .build(); - - service.deviceStates.put(deviceId, deviceStateData); - service.getPartitionedEntities(tpi).add(deviceId); - - service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis()); - activityVerify(true); - - long newTimeout = 1; - Thread.sleep(newTimeout); - - service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, newTimeout); - activityVerify(false); - reset(telemetrySubscriptionService); - - service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, defaultTimeout); - activityVerify(true); - Thread.sleep(defaultTimeout); - service.checkStates(); - activityVerify(false); - } - - @Test - public void decreaseInactivityForInactiveDeviceTest() throws Exception { - final long defaultTimeout = 1000; - initStateService(defaultTimeout); - DeviceState deviceState = DeviceState.builder().build(); - DeviceStateData deviceStateData = DeviceStateData.builder() - .tenantId(tenantId) - .deviceId(deviceId) - .state(deviceState) - .metaData(new TbMsgMetaData()) - .build(); - - service.deviceStates.put(deviceId, deviceStateData); - service.getPartitionedEntities(tpi).add(deviceId); - - service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis()); - activityVerify(true); - Thread.sleep(defaultTimeout); - service.checkStates(); - activityVerify(false); - reset(telemetrySubscriptionService); - - long newTimeout = 1; - - service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, newTimeout); - verify(telemetrySubscriptionService, never()).saveAttributes(argThat(request -> - request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) - )); - } - - private void activityVerify(boolean isActive) { - verify(telemetrySubscriptionService).saveAttributes(argThat(request -> - request.getEntityId().equals(deviceId) && - request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) && - request.getEntries().get(0).getValue().equals(isActive) - )); - } - - @Test - public void givenStateDataIsNull_whenUpdateActivityState_thenShouldCleanupDevice() { + void givenStateDataIsNull_whenUpdateActivityState_thenShouldCleanupDevice() { // GIVEN service.deviceStates.put(deviceId, deviceStateDataMock); @@ -719,7 +617,7 @@ public class DefaultDeviceStateServiceTest { @ParameterizedTest @MethodSource("provideParametersForUpdateActivityState") - public void givenTestParameters_whenUpdateActivityState_thenShouldBeInTheExpectedStateAndPerformExpectedActions( + void givenTestParameters_whenUpdateActivityState_thenShouldBeInTheExpectedStateAndPerformExpectedActions( boolean activityState, long previousActivityTime, long lastReportedActivity, long inactivityAlarmTime, long expectedInactivityAlarmTime, boolean shouldSetInactivityAlarmTimeToZero, boolean shouldUpdateActivityStateToActive @@ -739,13 +637,15 @@ public class DefaultDeviceStateServiceTest { .metaData(new TbMsgMetaData()) .build(); + mockSuccessfulSaveAttributes(); + // WHEN service.updateActivityState(deviceId, deviceStateData, lastReportedActivity); // THEN assertThat(deviceState.isActive()).isEqualTo(true); assertThat(deviceState.getLastActivityTime()).isEqualTo(lastReportedActivity); - then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> + then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request -> request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(LAST_ACTIVITY_TIME) && request.getEntries().get(0).getValue().equals(lastReportedActivity) @@ -753,7 +653,7 @@ public class DefaultDeviceStateServiceTest { assertThat(deviceState.getLastInactivityAlarmTime()).isEqualTo(expectedInactivityAlarmTime); if (shouldSetInactivityAlarmTimeToZero) { - then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> + then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request -> request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(INACTIVITY_ALARM_TIME) && request.getEntries().get(0).getValue().equals(0L) @@ -761,7 +661,7 @@ public class DefaultDeviceStateServiceTest { } if (shouldUpdateActivityStateToActive) { - then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> + then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request -> request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) && request.getEntries().get(0).getValue().equals(true) @@ -809,59 +709,8 @@ public class DefaultDeviceStateServiceTest { ); } - @ParameterizedTest - @MethodSource("provideParametersForDecreaseInactivityTimeout") - public void givenTestParameters_whenOnDeviceInactivityTimeout_thenShouldBeInTheExpectedStateAndPerformExpectedActions( - boolean activityState, long newInactivityTimeout, long timeIncrement, boolean expectedActivityState - ) throws Exception { - // GIVEN - long defaultInactivityTimeout = 10000; - initStateService(defaultInactivityTimeout); - - var currentTime = new AtomicLong(System.currentTimeMillis()); - - DeviceState deviceState = DeviceState.builder() - .active(activityState) - .lastActivityTime(currentTime.get()) - .inactivityTimeout(defaultInactivityTimeout) - .build(); - - DeviceStateData deviceStateData = DeviceStateData.builder() - .tenantId(tenantId) - .deviceId(deviceId) - .state(deviceState) - .metaData(new TbMsgMetaData()) - .build(); - - service.deviceStates.put(deviceId, deviceStateData); - service.getPartitionedEntities(tpi).add(deviceId); - - given(service.getCurrentTimeMillis()).willReturn(currentTime.addAndGet(timeIncrement)); - - // WHEN - service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, newInactivityTimeout); - - // THEN - assertThat(deviceState.getInactivityTimeout()).isEqualTo(newInactivityTimeout); - assertThat(deviceState.isActive()).isEqualTo(expectedActivityState); - if (activityState && !expectedActivityState) { - then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> - request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) && - request.getEntries().get(0).getValue().equals(false) - )); - } - } - - private static Stream provideParametersForDecreaseInactivityTimeout() { - return Stream.of( - Arguments.of(true, 1, 0, true), - - Arguments.of(true, 1, 1, false) - ); - } - @Test - public void givenStateDataIsNull_whenUpdateInactivityTimeoutIfExpired_thenShouldCleanupDevice() { + void givenStateDataIsNull_whenUpdateInactivityTimeoutIfExpired_thenShouldCleanupDevice() { // GIVEN service.deviceStates.put(deviceId, deviceStateDataMock); @@ -875,7 +724,7 @@ public class DefaultDeviceStateServiceTest { } @Test - public void givenNotMyPartition_whenUpdateInactivityTimeoutIfExpired_thenShouldCleanupDevice() { + void givenNotMyPartition_whenUpdateInactivityTimeoutIfExpired_thenShouldCleanupDevice() { // GIVEN long currentTime = System.currentTimeMillis(); @@ -911,7 +760,7 @@ public class DefaultDeviceStateServiceTest { @ParameterizedTest @MethodSource("provideParametersForUpdateInactivityStateIfExpired") - public void givenTestParameters_whenUpdateInactivityStateIfExpired_thenShouldBeInTheExpectedStateAndPerformExpectedActions( + void givenTestParameters_whenUpdateInactivityStateIfExpired_thenShouldBeInTheExpectedStateAndPerformExpectedActions( boolean activityState, long ts, long lastActivityTime, long lastInactivityAlarmTime, long inactivityTimeout, long deviceCreationTime, boolean expectedActivityState, long expectedLastInactivityAlarmTime, boolean shouldUpdateActivityStateToInactive ) { @@ -933,6 +782,7 @@ public class DefaultDeviceStateServiceTest { if (shouldUpdateActivityStateToInactive) { given(partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId)).willReturn(tpi); + mockSuccessfulSaveAttributes(); } // WHEN @@ -943,7 +793,7 @@ public class DefaultDeviceStateServiceTest { assertThat(state.getLastInactivityAlarmTime()).isEqualTo(expectedLastInactivityAlarmTime); if (shouldUpdateActivityStateToInactive) { - then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> + then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request -> request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) && request.getEntries().get(0).getValue().equals(false) )); @@ -961,7 +811,7 @@ public class DefaultDeviceStateServiceTest { assertThat(actualNotification.getDeviceId()).isEqualTo(deviceId); assertThat(actualNotification.isActive()).isFalse(); - then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> + then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request -> request.getTenantId().equals(tenantId) && request.getEntityId().equals(deviceId) && request.getScope().equals(AttributeScope.SERVER_SCOPE) && request.getEntries().get(0).getKey().equals(INACTIVITY_ALARM_TIME) && @@ -1033,7 +883,79 @@ public class DefaultDeviceStateServiceTest { } @Test - public void givenConcurrentAccess_whenGetOrFetchDeviceStateData_thenFetchDeviceStateDataInvokedOnce() { + void givenInactiveDevice_whenActivityStatusChangesToActiveButFailedToSaveUpdatedActivityStatus_thenShouldNotUpdateCache2() { + // GIVEN + var deviceState = DeviceState.builder() + .active(false) + .lastActivityTime(100L) + .inactivityTimeout(50L) + .build(); + + var deviceStateData = DeviceStateData.builder() + .tenantId(tenantId) + .deviceId(deviceId) + .state(deviceState) + .metaData(TbMsgMetaData.EMPTY) + .build(); + + service.deviceStates.put(deviceId, deviceStateData); + service.getPartitionedEntities(tpi).add(deviceId); + + // WHEN-THEN + + // simulating short DB outage + given(telemetrySubscriptionService.saveAttributesInternal(any())).willReturn(Futures.immediateFailedFuture(new RuntimeException("failed to save"))); + doReturn(200L).when(service).getCurrentTimeMillis(); + service.onDeviceActivity(tenantId, deviceId, 180L); + assertThat(deviceState.isActive()).isFalse(); // still inactive + + // 10 millis pass... and new activity message it received + + // this time DB save is successful + when(telemetrySubscriptionService.saveAttributesInternal(any())).thenReturn(Futures.immediateFuture(AttributesSaveResult.of(generateRandomVersions(1)))); + doReturn(210L).when(service).getCurrentTimeMillis(); + service.onDeviceActivity(tenantId, deviceId, 190L); + assertThat(deviceState.isActive()).isTrue(); + } + + @Test + void givenActiveDevice_whenActivityStatusChangesToInactiveButFailedToSaveUpdatedActivityStatus_thenShouldNotUpdateCache() { + // GIVEN + var deviceState = DeviceState.builder() + .active(true) + .lastActivityTime(100L) + .inactivityTimeout(50L) + .build(); + + var deviceStateData = DeviceStateData.builder() + .tenantId(tenantId) + .deviceId(deviceId) + .state(deviceState) + .metaData(TbMsgMetaData.EMPTY) + .build(); + + service.deviceStates.put(deviceId, deviceStateData); + service.getPartitionedEntities(tpi).add(deviceId); + + // WHEN-THEN (assuming periodic activity states check is done every 100 millis) + + // simulating short DB outage + given(telemetrySubscriptionService.saveAttributesInternal(any())).willReturn(Futures.immediateFailedFuture(new RuntimeException("failed to save"))); + doReturn(200L).when(service).getCurrentTimeMillis(); + service.checkStates(); + assertThat(deviceState.isActive()).isTrue(); // still active + + // waiting 100 millis... periodic activity states check is triggered again + + // this time DB save is successful + when(telemetrySubscriptionService.saveAttributesInternal(any())).thenReturn(Futures.immediateFuture(AttributesSaveResult.of(generateRandomVersions(1)))); + doReturn(300L).when(service).getCurrentTimeMillis(); + service.checkStates(); + assertThat(deviceState.isActive()).isFalse(); + } + + @Test + void givenConcurrentAccess_whenGetOrFetchDeviceStateData_thenFetchDeviceStateDataInvokedOnce() { doAnswer(invocation -> { Thread.sleep(100); return deviceStateDataMock; @@ -1069,10 +991,8 @@ public class DefaultDeviceStateServiceTest { } @Test - public void givenDeviceAdded_whenOnQueueMsg_thenShouldCacheAndSaveActivityToFalse() throws InterruptedException { + void givenDeviceAdded_whenOnQueueMsg_thenShouldCacheAndSaveActivityToFalse() { // GIVEN - final long defaultTimeout = 1000; - initStateService(defaultTimeout); given(deviceService.findDeviceById(any(TenantId.class), any(DeviceId.class))).willReturn(new Device(deviceId)); given(attributesService.find(any(TenantId.class), any(EntityId.class), any(AttributeScope.class), anyCollection())).willReturn(Futures.immediateFuture(Collections.emptyList())); @@ -1086,13 +1006,15 @@ public class DefaultDeviceStateServiceTest { .setDeleted(false) .build(); + mockSuccessfulSaveAttributes(); + // WHEN service.onQueueMsg(proto, TbCallback.EMPTY); // THEN await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { assertThat(service.deviceStates.get(deviceId).getState().isActive()).isEqualTo(false); - then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> + then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request -> request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) && request.getEntries().get(0).getValue().equals(false) )); @@ -1100,14 +1022,12 @@ public class DefaultDeviceStateServiceTest { } @Test - public void givenDeviceActivityEventHappenedAfterAdded_whenOnDeviceActivity_thenShouldCacheAndSaveActivityToTrue() throws InterruptedException { + void givenDeviceActivityEventHappenedAfterAdded_whenOnDeviceActivity_thenShouldCacheAndSaveActivityToTrue() { // GIVEN - final long defaultTimeout = 1000; - initStateService(defaultTimeout); long currentTime = System.currentTimeMillis(); DeviceState deviceState = DeviceState.builder() .active(false) - .inactivityTimeout(service.getDefaultInactivityTimeoutInSec()) + .inactivityTimeout(defaultInactivityTimeoutMs) .build(); DeviceStateData stateData = DeviceStateData.builder() .tenantId(tenantId) @@ -1118,12 +1038,14 @@ public class DefaultDeviceStateServiceTest { .build(); service.deviceStates.put(deviceId, stateData); + mockSuccessfulSaveAttributes(); + // WHEN service.onDeviceActivity(tenantId, deviceId, currentTime); // THEN ArgumentCaptor attributeRequestCaptor = ArgumentCaptor.forClass(AttributesSaveRequest.class); - then(telemetrySubscriptionService).should(times(2)).saveAttributes(attributeRequestCaptor.capture()); + then(telemetrySubscriptionService).should(times(2)).saveAttributesInternal(attributeRequestCaptor.capture()); await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { assertThat(service.deviceStates.get(deviceId).getState().isActive()).isEqualTo(true); @@ -1151,15 +1073,14 @@ public class DefaultDeviceStateServiceTest { } @Test - public void givenDeviceActivityEventHappenedBeforeAdded_whenOnQueueMsg_thenShouldSaveActivityStateUsingValueFromCache() throws InterruptedException { + void givenDeviceActivityEventHappenedBeforeAdded_whenOnQueueMsg_thenShouldSaveActivityStateUsingValueFromCache() { // GIVEN - final long defaultTimeout = 1000; - initStateService(defaultTimeout); given(deviceService.findDeviceById(any(TenantId.class), any(DeviceId.class))).willReturn(new Device(deviceId)); given(attributesService.find(any(TenantId.class), any(EntityId.class), any(AttributeScope.class), anyCollection())).willReturn(Futures.immediateFuture(Collections.emptyList())); long currentTime = System.currentTimeMillis(); - DeviceState deviceState = DeviceState.builder() + + var deviceState = DeviceState.builder() .active(true) .lastConnectTime(currentTime - 8000) .lastActivityTime(currentTime - 4000) @@ -1167,16 +1088,20 @@ public class DefaultDeviceStateServiceTest { .lastInactivityAlarmTime(0) .inactivityTimeout(3000) .build(); - DeviceStateData stateData = DeviceStateData.builder() + + var stateData = DeviceStateData.builder() .tenantId(tenantId) .deviceId(deviceId) .deviceCreationTime(currentTime - 10000) .state(deviceState) .build(); + service.deviceStates.put(deviceId, stateData); + mockSuccessfulSaveAttributes(); + // WHEN - TransportProtos.DeviceStateServiceMsgProto proto = TransportProtos.DeviceStateServiceMsgProto.newBuilder() + var proto = TransportProtos.DeviceStateServiceMsgProto.newBuilder() .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) @@ -1190,11 +1115,25 @@ public class DefaultDeviceStateServiceTest { // THEN await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { assertThat(service.deviceStates.get(deviceId).getState().isActive()).isEqualTo(true); - then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> + then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request -> request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) && request.getEntries().get(0).getValue().equals(true) )); }); } + private void mockSuccessfulSaveAttributes() { + lenient().when(telemetrySubscriptionService.saveAttributesInternal(any())).thenAnswer(invocation -> { + AttributesSaveRequest request = invocation.getArgument(0); + return Futures.immediateFuture(generateRandomVersions(request.getEntries().size())); + }); + } + + private static List generateRandomVersions(int n) { + return ThreadLocalRandom.current() + .longs(n) + .boxed() + .toList(); + } + } diff --git a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java index 2b4d9f38e5..153228a865 100644 --- a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java @@ -48,6 +48,7 @@ import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.AttributesSaveResult; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.DoubleDataEntry; @@ -472,7 +473,8 @@ class DefaultTelemetrySubscriptionServiceTest { .strategy(new AttributesSaveRequest.Strategy(saveAttributes, sendWsUpdate, processCalculatedFields)) .build(); - lenient().when(attrService.save(tenantId, entityId, request.getScope(), request.getEntries())).thenReturn(immediateFuture(listOfNNumbers(request.getEntries().size()))); + lenient().when(attrService.save(tenantId, entityId, request.getScope(), request.getEntries())) + .thenReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size())))); // WHEN telemetryService.saveAttributes(request); @@ -547,7 +549,8 @@ class DefaultTelemetrySubscriptionServiceTest { .strategy(new AttributesSaveRequest.Strategy(true, false, false)) .build(); - given(attrService.save(tenantId, deviceId, request.getScope(), entries)).willReturn(immediateFuture(listOfNNumbers(entries.size()))); + given(attrService.save(tenantId, deviceId, request.getScope(), entries)) + .willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(entries.size())))); // WHEN telemetryService.saveAttributes(request); @@ -581,7 +584,8 @@ class DefaultTelemetrySubscriptionServiceTest { .strategy(new AttributesSaveRequest.Strategy(true, false, false)) .build(); - given(attrService.save(tenantId, nonDeviceId, request.getScope(), entries)).willReturn(immediateFuture(listOfNNumbers(entries.size()))); + given(attrService.save(tenantId, nonDeviceId, request.getScope(), entries)) + .willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(entries.size())))); // WHEN telemetryService.saveAttributes(request); @@ -613,7 +617,8 @@ class DefaultTelemetrySubscriptionServiceTest { .strategy(new AttributesSaveRequest.Strategy(true, false, false)) .build(); - given(attrService.save(tenantId, deviceId, request.getScope(), entries)).willReturn(immediateFuture(listOfNNumbers(entries.size()))); + given(attrService.save(tenantId, deviceId, request.getScope(), entries)) + .willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(entries.size())))); // WHEN telemetryService.saveAttributes(request); @@ -640,7 +645,8 @@ class DefaultTelemetrySubscriptionServiceTest { .strategy(new AttributesSaveRequest.Strategy(true, false, false)) .build(); - given(attrService.save(tenantId, deviceId, request.getScope(), entries)).willReturn(immediateFuture(listOfNNumbers(entries.size()))); + given(attrService.save(tenantId, deviceId, request.getScope(), entries)) + .willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(entries.size())))); // WHEN telemetryService.saveAttributes(request); @@ -715,7 +721,8 @@ class DefaultTelemetrySubscriptionServiceTest { .strategy(new AttributesSaveRequest.Strategy(true, false, false)) .build(); - given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())).willReturn(immediateFuture(listOfNNumbers(request.getEntries().size()))); + given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())) + .willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size())))); // WHEN telemetryService.saveAttributes(request); @@ -764,7 +771,8 @@ class DefaultTelemetrySubscriptionServiceTest { .strategy(new AttributesSaveRequest.Strategy(true, false, false)) .build(); - given(attrService.save(tenantId, nonDeviceId, request.getScope(), request.getEntries())).willReturn(immediateFuture(listOfNNumbers(request.getEntries().size()))); + given(attrService.save(tenantId, nonDeviceId, request.getScope(), request.getEntries())) + .willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size())))); // WHEN telemetryService.saveAttributes(request); @@ -792,7 +800,8 @@ class DefaultTelemetrySubscriptionServiceTest { .strategy(new AttributesSaveRequest.Strategy(true, false, false)) .build(); - given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())).willReturn(immediateFuture(listOfNNumbers(request.getEntries().size()))); + given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())) + .willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size())))); // WHEN telemetryService.saveAttributes(request); @@ -815,7 +824,8 @@ class DefaultTelemetrySubscriptionServiceTest { .strategy(new AttributesSaveRequest.Strategy(true, false, false)) .build(); - given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())).willReturn(immediateFuture(listOfNNumbers(request.getEntries().size()))); + given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())) + .willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size())))); // WHEN telemetryService.saveAttributes(request); @@ -843,7 +853,8 @@ class DefaultTelemetrySubscriptionServiceTest { .strategy(new AttributesSaveRequest.Strategy(true, false, false)) .build(); - given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())).willReturn(immediateFuture(listOfNNumbers(request.getEntries().size()))); + given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())) + .willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size())))); // WHEN telemetryService.saveAttributes(request); @@ -870,7 +881,8 @@ class DefaultTelemetrySubscriptionServiceTest { .strategy(new AttributesSaveRequest.Strategy(true, false, false)) .build(); - given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())).willReturn(immediateFuture(listOfNNumbers(request.getEntries().size()))); + given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())) + .willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size())))); // WHEN telemetryService.saveAttributes(request); diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java index 718c574c3c..0d5d3dcd13 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java @@ -21,6 +21,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.AttributesSaveResult; import java.util.Collection; import java.util.List; @@ -37,9 +38,9 @@ public interface AttributesService { ListenableFuture> findAll(TenantId tenantId, EntityId entityId, AttributeScope scope); - ListenableFuture> save(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributes); + ListenableFuture save(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributes); - ListenableFuture save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute); + ListenableFuture save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute); ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributeKeys); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/AttributesSaveResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/AttributesSaveResult.java new file mode 100644 index 0000000000..711a3e06a4 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/AttributesSaveResult.java @@ -0,0 +1,32 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.kv; + +import java.util.Collections; +import java.util.List; + +public record AttributesSaveResult(List versions) { + + public static final AttributesSaveResult EMPTY = new AttributesSaveResult(Collections.emptyList()); + + public static AttributesSaveResult of(List versions) { + if (versions == null) { + return EMPTY; + } + return new AttributesSaveResult(versions); + } + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java index 777a77d054..9803670d4b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java @@ -33,6 +33,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.AttributesSaveResult; import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.edqs.EdqsService; import org.thingsboard.server.dao.service.Validator; @@ -41,7 +42,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; import static org.thingsboard.server.dao.attributes.AttributeUtils.validate; @@ -101,26 +101,29 @@ public class BaseAttributesService implements AttributesService { } @Override - public ListenableFuture save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) { + public ListenableFuture save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) { validate(entityId, scope); AttributeUtils.validate(attribute, valueNoXssValidation); - return doSave(tenantId, entityId, scope, attribute); + return doSave(tenantId, entityId, scope, List.of(attribute)); } @Override - public ListenableFuture> save(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributes) { + public ListenableFuture save(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributes) { validate(entityId, scope); AttributeUtils.validate(attributes, valueNoXssValidation); - List> saveFutures = attributes.stream().map(attribute -> doSave(tenantId, entityId, scope, attribute)).collect(Collectors.toList()); - return Futures.allAsList(saveFutures); + return doSave(tenantId, entityId, scope, attributes); } - private ListenableFuture doSave(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) { - ListenableFuture future = attributesDao.save(tenantId, entityId, scope, attribute); - return Futures.transform(future, version -> { - edqsService.onUpdate(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attribute, version)); - return version; - }, MoreExecutors.directExecutor()); + private ListenableFuture doSave(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributes) { + List> futures = new ArrayList<>(attributes.size()); + for (AttributeKvEntry attribute : attributes) { + ListenableFuture future = Futures.transform(attributesDao.save(tenantId, entityId, scope, attribute), version -> { + edqsService.onUpdate(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attribute, version)); + return version; + }, MoreExecutors.directExecutor()); + futures.add(future); + } + return Futures.transform(Futures.allAsList(futures), AttributesSaveResult::of, MoreExecutors.directExecutor()); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java index 559828911f..d99413f13a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java @@ -37,6 +37,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.AttributesSaveResult; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.edqs.EdqsService; @@ -56,7 +57,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; import static org.thingsboard.server.dao.attributes.AttributeUtils.validate; @@ -150,7 +150,7 @@ public class CachedAttributesService implements AttributesService { List cachedAttributes = wrappedCachedAttributes.values().stream() .map(TbCacheValueWrapper::get) .filter(Objects::nonNull) - .collect(Collectors.toList()); + .toList(); if (wrappedCachedAttributes.size() == attributeKeys.size()) { log.trace("[{}][{}] Found all attributes from cache: {}", entityId, scope, attributeKeys); return Futures.immediateFuture(cachedAttributes); @@ -159,8 +159,6 @@ public class CachedAttributesService implements AttributesService { Set notFoundAttributeKeys = new HashSet<>(attributeKeys); notFoundAttributeKeys.removeAll(wrappedCachedAttributes.keySet()); - List notFoundKeys = notFoundAttributeKeys.stream().map(k -> new AttributeCacheKey(scope, entityId, k)).collect(Collectors.toList()); - // DB call should run in DB executor, not in cache-related executor return jpaExecutorService.submit(() -> { log.trace("[{}][{}] Lookup attributes from db: {}", entityId, scope, notFoundAttributeKeys); @@ -222,33 +220,31 @@ public class CachedAttributesService implements AttributesService { } @Override - public ListenableFuture save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) { + public ListenableFuture save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) { validate(entityId, scope); AttributeUtils.validate(attribute, valueNoXssValidation); - return doSave(tenantId, entityId, scope, attribute); + return doSave(tenantId, entityId, scope, List.of(attribute)); } @Override - public ListenableFuture> save(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributes) { + public ListenableFuture save(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributes) { validate(entityId, scope); AttributeUtils.validate(attributes, valueNoXssValidation); - - List> futures = new ArrayList<>(attributes.size()); - for (var attribute : attributes) { - futures.add(doSave(tenantId, entityId, scope, attribute)); - } - - return Futures.allAsList(futures); + return doSave(tenantId, entityId, scope, attributes); } - private ListenableFuture doSave(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) { - ListenableFuture future = attributesDao.save(tenantId, entityId, scope, attribute); - return Futures.transform(future, version -> { - BaseAttributeKvEntry attributeKvEntry = new BaseAttributeKvEntry(((BaseAttributeKvEntry) attribute).getKv(), attribute.getLastUpdateTs(), version); - put(entityId, scope, attributeKvEntry); - edqsService.onUpdate(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attributeKvEntry, version)); - return version; - }, cacheExecutor); + private ListenableFuture doSave(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributes) { + List> futures = new ArrayList<>(attributes.size()); + for (var attribute : attributes) { + ListenableFuture future = Futures.transform(attributesDao.save(tenantId, entityId, scope, attribute), version -> { + BaseAttributeKvEntry attributeKvEntry = new BaseAttributeKvEntry(((BaseAttributeKvEntry) attribute).getKv(), attribute.getLastUpdateTs(), version); + put(entityId, scope, attributeKvEntry); + edqsService.onUpdate(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attributeKvEntry, version)); + return version; + }, cacheExecutor); + futures.add(future); + } + return Futures.transform(Futures.allAsList(futures), AttributesSaveResult::of, MoreExecutors.directExecutor()); } private void put(EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) { @@ -270,7 +266,7 @@ public class CachedAttributesService implements AttributesService { edqsService.onDelete(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, key, version)); } return key; - }, cacheExecutor)).collect(Collectors.toList())); + }, cacheExecutor)).toList()); } @Override