From 9a82f6b8817cd7017748056a606a73160bc2e42f Mon Sep 17 00:00:00 2001 From: Dmytro Skarzhynets Date: Mon, 9 Jun 2025 15:47:52 +0300 Subject: [PATCH] Return `AttributesSaveResult` instead of just `List` when saving attributes --- .../cf/CalculatedFieldQueueService.java | 3 +- .../DefaultCalculatedFieldQueueService.java | 12 +++--- .../device/DeviceProvisionServiceImpl.java | 12 +++--- .../service/edge/rpc/EdgeGrpcSession.java | 16 +++---- .../DefaultSystemDataLoaderService.java | 9 ++-- .../DefaultTelemetrySubscriptionService.java | 8 ++-- .../telemetry/InternalTelemetryService.java | 5 +-- .../service/entitiy/EntityServiceTest.java | 25 +++++------ .../state/DefaultDeviceStateServiceTest.java | 5 ++- ...faultTelemetrySubscriptionServiceTest.java | 34 ++++++++++----- .../dao/attributes/AttributesService.java | 5 ++- .../common/data/kv/AttributesSaveResult.java | 32 ++++++++++++++ .../dao/attributes/BaseAttributesService.java | 27 ++++++------ .../attributes/CachedAttributesService.java | 42 +++++++++---------- 14 files changed, 142 insertions(+), 93 deletions(-) create mode 100644 common/data/src/main/java/org/thingsboard/server/common/data/kv/AttributesSaveResult.java diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java index eb86220361..f9ec8087ed 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java @@ -21,6 +21,7 @@ import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.RuleEngineCalculatedFieldQueueService; import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; +import org.thingsboard.server.common.data.kv.AttributesSaveResult; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import java.util.List; @@ -35,7 +36,7 @@ public interface CalculatedFieldQueueService extends RuleEngineCalculatedFieldQu */ void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback callback); - void pushRequestToQueue(AttributesSaveRequest request, List result, FutureCallback callback); + void pushRequestToQueue(AttributesSaveRequest request, AttributesSaveResult result, FutureCallback callback); void pushRequestToQueue(AttributesDeleteRequest request, List result, FutureCallback callback); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java index 8289e4db42..c3185738f9 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java @@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.AttributesSaveResult; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.msg.TbMsgType; @@ -96,7 +97,7 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS } @Override - public void pushRequestToQueue(AttributesSaveRequest request, List result, FutureCallback callback) { + public void pushRequestToQueue(AttributesSaveRequest request, AttributesSaveResult result, FutureCallback callback) { var tenantId = request.getTenantId(); var entityId = request.getEntityId(); checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries(), request.getScope()), cf -> cf.linkMatches(entityId, request.getEntries(), request.getScope()), @@ -186,17 +187,18 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS return msg.build(); } - private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, List versions) { + private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, AttributesSaveResult result) { ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder(); CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds(), request.getTbMsgId(), request.getTbMsgType()); telemetryMsg.setScope(AttributeScopeProto.valueOf(request.getScope().name())); + List entries = request.getEntries(); + List versions = result.versions(); + for (int i = 0; i < entries.size(); i++) { AttributeValueProto.Builder attrProtoBuilder = ProtoUtils.toProto(entries.get(i)).toBuilder(); - if (versions != null) { - attrProtoBuilder.setVersion(versions.get(i)); - } + attrProtoBuilder.setVersion(versions.get(i)); telemetryMsg.addAttrData(attrProtoBuilder.build()); } msg.setTelemetryMsg(telemetryMsg.build()); diff --git a/application/src/main/java/org/thingsboard/server/service/device/DeviceProvisionServiceImpl.java b/application/src/main/java/org/thingsboard/server/service/device/DeviceProvisionServiceImpl.java index 173ba742af..0778d61ee7 100644 --- a/application/src/main/java/org/thingsboard/server/service/device/DeviceProvisionServiceImpl.java +++ b/application/src/main/java/org/thingsboard/server/service/device/DeviceProvisionServiceImpl.java @@ -33,6 +33,7 @@ import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.AttributesSaveResult; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.msg.TbMsgType; @@ -62,8 +63,6 @@ import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.util.TbCoreComponent; -import java.util.Collections; -import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.regex.Matcher; @@ -240,10 +239,11 @@ public class DeviceProvisionServiceImpl implements DeviceProvisionService { return deviceCredentialsService.updateDeviceCredentials(tenantId, deviceCredentials); } - private ListenableFuture> saveProvisionStateAttribute(Device device) { - return attributesService.save(device.getTenantId(), device.getId(), AttributeScope.SERVER_SCOPE, - Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry(DEVICE_PROVISION_STATE, PROVISIONED_STATE), - System.currentTimeMillis()))); + private ListenableFuture saveProvisionStateAttribute(Device device) { + return attributesService.save( + device.getTenantId(), device.getId(), AttributeScope.SERVER_SCOPE, + new BaseAttributeKvEntry(new StringDataEntry(DEVICE_PROVISION_STATE, PROVISIONED_STATE), System.currentTimeMillis()) + ); } private DeviceCredentials getDeviceCredentials(Device device) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 4a9b68fc6d..8922fa6008 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.AttributesSaveResult; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; @@ -42,7 +43,6 @@ import org.thingsboard.server.common.data.limit.LimitedApi; import org.thingsboard.server.common.data.notification.rule.trigger.EdgeCommunicationFailureTrigger; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; -import org.thingsboard.server.common.data.page.SortOrder; import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; @@ -582,10 +582,10 @@ public abstract class EdgeGrpcSession implements Closeable { @Override public void onSuccess(@Nullable Pair newStartTsAndSeqId) { if (newStartTsAndSeqId != null) { - ListenableFuture> updateFuture = updateQueueStartTsAndSeqId(newStartTsAndSeqId); + ListenableFuture updateFuture = updateQueueStartTsAndSeqId(newStartTsAndSeqId); Futures.addCallback(updateFuture, new FutureCallback<>() { @Override - public void onSuccess(@Nullable List list) { + public void onSuccess(@Nullable AttributesSaveResult saveResult) { log.debug("[{}][{}] queue offset was updated [{}]", tenantId, sessionId, newStartTsAndSeqId); boolean newEventsAvailable; if (fetcher.isSeqIdNewCycleStarted()) { @@ -646,8 +646,7 @@ public abstract class EdgeGrpcSession implements Closeable { log.trace("[{}][{}] entity message processed [{}]", tenantId, sessionId, downlinkMsg); } } - case ATTRIBUTES_UPDATED, POST_ATTRIBUTES, ATTRIBUTES_DELETED, TIMESERIES_UPDATED -> - downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edge, edgeEvent); + case ATTRIBUTES_UPDATED, POST_ATTRIBUTES, ATTRIBUTES_DELETED, TIMESERIES_UPDATED -> downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edge, edgeEvent); default -> log.warn("[{}][{}] Unsupported action type [{}]", tenantId, sessionId, edgeEvent.getAction()); } } catch (Exception e) { @@ -723,13 +722,14 @@ public abstract class EdgeGrpcSession implements Closeable { return startSeqId; } - private ListenableFuture> updateQueueStartTsAndSeqId(Pair pair) { + private ListenableFuture updateQueueStartTsAndSeqId(Pair pair) { newStartTs = pair.getFirst(); newStartSeqId = pair.getSecond(); log.trace("[{}] updateQueueStartTsAndSeqId [{}][{}][{}]", sessionId, edge.getId(), newStartTs, newStartSeqId); - List attributes = Arrays.asList( + List attributes = List.of( new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_TS_ATTR_KEY, newStartTs), System.currentTimeMillis()), - new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_SEQ_ID_ATTR_KEY, newStartSeqId), System.currentTimeMillis())); + new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_SEQ_ID_ATTR_KEY, newStartSeqId), System.currentTimeMillis()) + ); return ctx.getAttributesService().save(edge.getTenantId(), edge.getId(), AttributeScope.SERVER_SCOPE, attributes); } diff --git a/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java b/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java index e9ef8c5ace..d580175aa0 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java @@ -64,6 +64,7 @@ import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.AttributesSaveResult; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BooleanDataEntry; @@ -581,9 +582,9 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService { Collections.singletonList(new BasicTsKvEntry(System.currentTimeMillis(), new BooleanDataEntry(key, value))), 0L); addTsCallback(saveFuture, new TelemetrySaveCallback<>(deviceId, key, value)); } else { - ListenableFuture> saveFuture = attributesService.save(TenantId.SYS_TENANT_ID, deviceId, AttributeScope.SERVER_SCOPE, - Collections.singletonList(new BaseAttributeKvEntry(new BooleanDataEntry(key, value) - , System.currentTimeMillis()))); + ListenableFuture saveFuture = attributesService.save( + TenantId.SYS_TENANT_ID, deviceId, AttributeScope.SERVER_SCOPE, new BaseAttributeKvEntry(new BooleanDataEntry(key, value), System.currentTimeMillis()) + ); addTsCallback(saveFuture, new TelemetrySaveCallback<>(deviceId, key, value)); } } @@ -611,7 +612,7 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService { } private void addTsCallback(ListenableFuture saveFuture, final FutureCallback callback) { - Futures.addCallback(saveFuture, new FutureCallback() { + Futures.addCallback(saveFuture, new FutureCallback<>() { @Override public void onSuccess(@Nullable S result) { callback.onSuccess(result); diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 4640b9339f..0ff2d42f15 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -45,6 +45,7 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.AttributesSaveResult; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.data.kv.TsKvEntry; @@ -62,7 +63,6 @@ import org.thingsboard.server.service.state.DefaultDeviceStateService; import org.thingsboard.server.service.subscription.TbSubscriptionUtils; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -190,16 +190,16 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } @Override - public ListenableFuture> saveAttributesInternal(AttributesSaveRequest request) { + public ListenableFuture saveAttributesInternal(AttributesSaveRequest request) { TenantId tenantId = request.getTenantId(); EntityId entityId = request.getEntityId(); AttributesSaveRequest.Strategy strategy = request.getStrategy(); - ListenableFuture> resultFuture; + ListenableFuture resultFuture; if (strategy.saveAttributes()) { resultFuture = attrService.save(tenantId, entityId, request.getScope(), request.getEntries()); } else { - resultFuture = Futures.immediateFuture(Collections.emptyList()); + resultFuture = Futures.immediateFuture(AttributesSaveResult.EMPTY); } addMainCallback(resultFuture, result -> { diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java index 79f0beab41..2d3be5a0ba 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java @@ -21,10 +21,9 @@ import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; +import org.thingsboard.server.common.data.kv.AttributesSaveResult; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; -import java.util.List; - /** * Created by ashvayka on 27.03.18. */ @@ -32,7 +31,7 @@ public interface InternalTelemetryService extends RuleEngineTelemetryService { ListenableFuture saveTimeseriesInternal(TimeseriesSaveRequest request); - ListenableFuture> saveAttributesInternal(AttributesSaveRequest request); + ListenableFuture saveAttributesInternal(AttributesSaveRequest request); void deleteTimeseriesInternal(TimeseriesDeleteRequest request); diff --git a/application/src/test/java/org/thingsboard/server/service/entitiy/EntityServiceTest.java b/application/src/test/java/org/thingsboard/server/service/entitiy/EntityServiceTest.java index 264ac70443..d8bcd42bc8 100644 --- a/application/src/test/java/org/thingsboard/server/service/entitiy/EntityServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/entitiy/EntityServiceTest.java @@ -44,6 +44,7 @@ import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.IdBased; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.AttributesSaveResult; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.DoubleDataEntry; @@ -395,7 +396,7 @@ public class EntityServiceTest extends AbstractControllerTest { List highTemperatures = new ArrayList<>(); createTestHierarchy(tenantId, assets, devices, new ArrayList<>(), new ArrayList<>(), temperatures, highTemperatures); - List>> attributeFutures = new ArrayList<>(); + List> attributeFutures = new ArrayList<>(); for (int i = 0; i < devices.size(); i++) { Device device = devices.get(i); attributeFutures.add(saveLongAttribute(device.getId(), "temperature", temperatures.get(i), AttributeScope.CLIENT_SCOPE)); @@ -545,7 +546,7 @@ public class EntityServiceTest extends AbstractControllerTest { List highTemperatures = new ArrayList<>(); createTestHierarchy(tenantId, assets, devices, new ArrayList<>(), new ArrayList<>(), temperatures, highTemperatures); - List>> attributeFutures = new ArrayList<>(); + List> attributeFutures = new ArrayList<>(); for (int i = 0; i < devices.size(); i++) { Device device = devices.get(i); attributeFutures.add(saveLongAttribute(device.getId(), "temperature", temperatures.get(i), AttributeScope.CLIENT_SCOPE)); @@ -599,7 +600,7 @@ public class EntityServiceTest extends AbstractControllerTest { List highConsumptions = new ArrayList<>(); createTestHierarchy(tenantId, assets, devices, consumptions, highConsumptions, new ArrayList<>(), new ArrayList<>()); - List>> attributeFutures = new ArrayList<>(); + List> attributeFutures = new ArrayList<>(); for (int i = 0; i < assets.size(); i++) { Asset asset = assets.get(i); attributeFutures.add(saveLongAttribute(asset.getId(), "consumption", consumptions.get(i), AttributeScope.SERVER_SCOPE)); @@ -1506,7 +1507,7 @@ public class EntityServiceTest extends AbstractControllerTest { } } - List>> attributeFutures = new ArrayList<>(); + List> attributeFutures = new ArrayList<>(); for (int i = 0; i < devices.size(); i++) { Device device = devices.get(i); for (AttributeScope currentScope : AttributeScope.values()) { @@ -1578,7 +1579,7 @@ public class EntityServiceTest extends AbstractControllerTest { } } - List>> attributeFutures = new ArrayList<>(); + List> attributeFutures = new ArrayList<>(); for (int i = 0; i < devices.size(); i++) { Device device = devices.get(i); attributeFutures.add(saveLongAttribute(device.getId(), "temperature", temperatures.get(i), AttributeScope.CLIENT_SCOPE)); @@ -1808,7 +1809,7 @@ public class EntityServiceTest extends AbstractControllerTest { } } - List>> attributeFutures = new ArrayList<>(); + List> attributeFutures = new ArrayList<>(); for (int i = 0; i < devices.size(); i++) { Device device = devices.get(i); attributeFutures.add(saveStringAttribute(device.getId(), "attributeString", attributeStrings.get(i), AttributeScope.CLIENT_SCOPE)); @@ -2269,16 +2270,16 @@ public class EntityServiceTest extends AbstractControllerTest { return filter; } - private ListenableFuture> saveLongAttribute(EntityId entityId, String key, long value, AttributeScope scope) { + private ListenableFuture saveLongAttribute(EntityId entityId, String key, long value, AttributeScope scope) { KvEntry attrValue = new LongDataEntry(key, value); AttributeKvEntry attr = new BaseAttributeKvEntry(attrValue, 42L); - return attributesService.save(tenantId, entityId, scope, Collections.singletonList(attr)); + return attributesService.save(tenantId, entityId, scope, List.of(attr)); } - private ListenableFuture> saveStringAttribute(EntityId entityId, String key, String value, AttributeScope scope) { + private ListenableFuture saveStringAttribute(EntityId entityId, String key, String value, AttributeScope scope) { KvEntry attrValue = new StringDataEntry(key, value); AttributeKvEntry attr = new BaseAttributeKvEntry(attrValue, 42L); - return attributesService.save(tenantId, entityId, scope, Collections.singletonList(attr)); + return attributesService.save(tenantId, entityId, scope, List.of(attr)); } private ListenableFuture saveTimeseries(EntityId entityId, String key, Double value) { @@ -2294,8 +2295,8 @@ public class EntityServiceTest extends AbstractControllerTest { } protected void createMultiRootHierarchy(List buildings, List apartments, - Map> entityNameByTypeMap, - Map childParentRelationMap) throws InterruptedException { + Map> entityNameByTypeMap, + Map childParentRelationMap) throws InterruptedException { for (int k = 0; k < 3; k++) { Asset building = new Asset(); building.setTenantId(tenantId); diff --git a/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java index 0fe29eef57..5bf87137ff 100644 --- a/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java @@ -38,6 +38,7 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.AttributesSaveResult; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.notification.rule.trigger.DeviceActivityTrigger; import org.thingsboard.server.common.msg.TbMsg; @@ -911,7 +912,7 @@ class DefaultDeviceStateServiceTest { // 10 millis pass... and new activity message it received // this time DB save is successful - when(telemetrySubscriptionService.saveAttributesInternal(any())).thenReturn(Futures.immediateFuture(generateRandomVersions(1))); + when(telemetrySubscriptionService.saveAttributesInternal(any())).thenReturn(Futures.immediateFuture(AttributesSaveResult.of(generateRandomVersions(1)))); doReturn(210L).when(service).getCurrentTimeMillis(); service.onDeviceActivity(tenantId, deviceId, 190L); assertThat(deviceState.isActive()).isTrue(); @@ -947,7 +948,7 @@ class DefaultDeviceStateServiceTest { // waiting 100 millis... periodic activity states check is triggered again // this time DB save is successful - when(telemetrySubscriptionService.saveAttributesInternal(any())).thenReturn(Futures.immediateFuture(generateRandomVersions(1))); + when(telemetrySubscriptionService.saveAttributesInternal(any())).thenReturn(Futures.immediateFuture(AttributesSaveResult.of(generateRandomVersions(1)))); doReturn(300L).when(service).getCurrentTimeMillis(); service.checkStates(); assertThat(deviceState.isActive()).isFalse(); diff --git a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java index 2b4d9f38e5..153228a865 100644 --- a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java @@ -48,6 +48,7 @@ import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.AttributesSaveResult; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.DoubleDataEntry; @@ -472,7 +473,8 @@ class DefaultTelemetrySubscriptionServiceTest { .strategy(new AttributesSaveRequest.Strategy(saveAttributes, sendWsUpdate, processCalculatedFields)) .build(); - lenient().when(attrService.save(tenantId, entityId, request.getScope(), request.getEntries())).thenReturn(immediateFuture(listOfNNumbers(request.getEntries().size()))); + lenient().when(attrService.save(tenantId, entityId, request.getScope(), request.getEntries())) + .thenReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size())))); // WHEN telemetryService.saveAttributes(request); @@ -547,7 +549,8 @@ class DefaultTelemetrySubscriptionServiceTest { .strategy(new AttributesSaveRequest.Strategy(true, false, false)) .build(); - given(attrService.save(tenantId, deviceId, request.getScope(), entries)).willReturn(immediateFuture(listOfNNumbers(entries.size()))); + given(attrService.save(tenantId, deviceId, request.getScope(), entries)) + .willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(entries.size())))); // WHEN telemetryService.saveAttributes(request); @@ -581,7 +584,8 @@ class DefaultTelemetrySubscriptionServiceTest { .strategy(new AttributesSaveRequest.Strategy(true, false, false)) .build(); - given(attrService.save(tenantId, nonDeviceId, request.getScope(), entries)).willReturn(immediateFuture(listOfNNumbers(entries.size()))); + given(attrService.save(tenantId, nonDeviceId, request.getScope(), entries)) + .willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(entries.size())))); // WHEN telemetryService.saveAttributes(request); @@ -613,7 +617,8 @@ class DefaultTelemetrySubscriptionServiceTest { .strategy(new AttributesSaveRequest.Strategy(true, false, false)) .build(); - given(attrService.save(tenantId, deviceId, request.getScope(), entries)).willReturn(immediateFuture(listOfNNumbers(entries.size()))); + given(attrService.save(tenantId, deviceId, request.getScope(), entries)) + .willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(entries.size())))); // WHEN telemetryService.saveAttributes(request); @@ -640,7 +645,8 @@ class DefaultTelemetrySubscriptionServiceTest { .strategy(new AttributesSaveRequest.Strategy(true, false, false)) .build(); - given(attrService.save(tenantId, deviceId, request.getScope(), entries)).willReturn(immediateFuture(listOfNNumbers(entries.size()))); + given(attrService.save(tenantId, deviceId, request.getScope(), entries)) + .willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(entries.size())))); // WHEN telemetryService.saveAttributes(request); @@ -715,7 +721,8 @@ class DefaultTelemetrySubscriptionServiceTest { .strategy(new AttributesSaveRequest.Strategy(true, false, false)) .build(); - given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())).willReturn(immediateFuture(listOfNNumbers(request.getEntries().size()))); + given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())) + .willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size())))); // WHEN telemetryService.saveAttributes(request); @@ -764,7 +771,8 @@ class DefaultTelemetrySubscriptionServiceTest { .strategy(new AttributesSaveRequest.Strategy(true, false, false)) .build(); - given(attrService.save(tenantId, nonDeviceId, request.getScope(), request.getEntries())).willReturn(immediateFuture(listOfNNumbers(request.getEntries().size()))); + given(attrService.save(tenantId, nonDeviceId, request.getScope(), request.getEntries())) + .willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size())))); // WHEN telemetryService.saveAttributes(request); @@ -792,7 +800,8 @@ class DefaultTelemetrySubscriptionServiceTest { .strategy(new AttributesSaveRequest.Strategy(true, false, false)) .build(); - given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())).willReturn(immediateFuture(listOfNNumbers(request.getEntries().size()))); + given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())) + .willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size())))); // WHEN telemetryService.saveAttributes(request); @@ -815,7 +824,8 @@ class DefaultTelemetrySubscriptionServiceTest { .strategy(new AttributesSaveRequest.Strategy(true, false, false)) .build(); - given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())).willReturn(immediateFuture(listOfNNumbers(request.getEntries().size()))); + given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())) + .willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size())))); // WHEN telemetryService.saveAttributes(request); @@ -843,7 +853,8 @@ class DefaultTelemetrySubscriptionServiceTest { .strategy(new AttributesSaveRequest.Strategy(true, false, false)) .build(); - given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())).willReturn(immediateFuture(listOfNNumbers(request.getEntries().size()))); + given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())) + .willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size())))); // WHEN telemetryService.saveAttributes(request); @@ -870,7 +881,8 @@ class DefaultTelemetrySubscriptionServiceTest { .strategy(new AttributesSaveRequest.Strategy(true, false, false)) .build(); - given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())).willReturn(immediateFuture(listOfNNumbers(request.getEntries().size()))); + given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())) + .willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size())))); // WHEN telemetryService.saveAttributes(request); diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java index 718c574c3c..0d5d3dcd13 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java @@ -21,6 +21,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.AttributesSaveResult; import java.util.Collection; import java.util.List; @@ -37,9 +38,9 @@ public interface AttributesService { ListenableFuture> findAll(TenantId tenantId, EntityId entityId, AttributeScope scope); - ListenableFuture> save(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributes); + ListenableFuture save(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributes); - ListenableFuture save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute); + ListenableFuture save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute); ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributeKeys); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/AttributesSaveResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/AttributesSaveResult.java new file mode 100644 index 0000000000..711a3e06a4 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/AttributesSaveResult.java @@ -0,0 +1,32 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.kv; + +import java.util.Collections; +import java.util.List; + +public record AttributesSaveResult(List versions) { + + public static final AttributesSaveResult EMPTY = new AttributesSaveResult(Collections.emptyList()); + + public static AttributesSaveResult of(List versions) { + if (versions == null) { + return EMPTY; + } + return new AttributesSaveResult(versions); + } + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java index 777a77d054..9803670d4b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java @@ -33,6 +33,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.AttributesSaveResult; import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.edqs.EdqsService; import org.thingsboard.server.dao.service.Validator; @@ -41,7 +42,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; import static org.thingsboard.server.dao.attributes.AttributeUtils.validate; @@ -101,26 +101,29 @@ public class BaseAttributesService implements AttributesService { } @Override - public ListenableFuture save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) { + public ListenableFuture save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) { validate(entityId, scope); AttributeUtils.validate(attribute, valueNoXssValidation); - return doSave(tenantId, entityId, scope, attribute); + return doSave(tenantId, entityId, scope, List.of(attribute)); } @Override - public ListenableFuture> save(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributes) { + public ListenableFuture save(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributes) { validate(entityId, scope); AttributeUtils.validate(attributes, valueNoXssValidation); - List> saveFutures = attributes.stream().map(attribute -> doSave(tenantId, entityId, scope, attribute)).collect(Collectors.toList()); - return Futures.allAsList(saveFutures); + return doSave(tenantId, entityId, scope, attributes); } - private ListenableFuture doSave(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) { - ListenableFuture future = attributesDao.save(tenantId, entityId, scope, attribute); - return Futures.transform(future, version -> { - edqsService.onUpdate(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attribute, version)); - return version; - }, MoreExecutors.directExecutor()); + private ListenableFuture doSave(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributes) { + List> futures = new ArrayList<>(attributes.size()); + for (AttributeKvEntry attribute : attributes) { + ListenableFuture future = Futures.transform(attributesDao.save(tenantId, entityId, scope, attribute), version -> { + edqsService.onUpdate(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attribute, version)); + return version; + }, MoreExecutors.directExecutor()); + futures.add(future); + } + return Futures.transform(Futures.allAsList(futures), AttributesSaveResult::of, MoreExecutors.directExecutor()); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java index 559828911f..d99413f13a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java @@ -37,6 +37,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.AttributesSaveResult; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.edqs.EdqsService; @@ -56,7 +57,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; import static org.thingsboard.server.dao.attributes.AttributeUtils.validate; @@ -150,7 +150,7 @@ public class CachedAttributesService implements AttributesService { List cachedAttributes = wrappedCachedAttributes.values().stream() .map(TbCacheValueWrapper::get) .filter(Objects::nonNull) - .collect(Collectors.toList()); + .toList(); if (wrappedCachedAttributes.size() == attributeKeys.size()) { log.trace("[{}][{}] Found all attributes from cache: {}", entityId, scope, attributeKeys); return Futures.immediateFuture(cachedAttributes); @@ -159,8 +159,6 @@ public class CachedAttributesService implements AttributesService { Set notFoundAttributeKeys = new HashSet<>(attributeKeys); notFoundAttributeKeys.removeAll(wrappedCachedAttributes.keySet()); - List notFoundKeys = notFoundAttributeKeys.stream().map(k -> new AttributeCacheKey(scope, entityId, k)).collect(Collectors.toList()); - // DB call should run in DB executor, not in cache-related executor return jpaExecutorService.submit(() -> { log.trace("[{}][{}] Lookup attributes from db: {}", entityId, scope, notFoundAttributeKeys); @@ -222,33 +220,31 @@ public class CachedAttributesService implements AttributesService { } @Override - public ListenableFuture save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) { + public ListenableFuture save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) { validate(entityId, scope); AttributeUtils.validate(attribute, valueNoXssValidation); - return doSave(tenantId, entityId, scope, attribute); + return doSave(tenantId, entityId, scope, List.of(attribute)); } @Override - public ListenableFuture> save(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributes) { + public ListenableFuture save(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributes) { validate(entityId, scope); AttributeUtils.validate(attributes, valueNoXssValidation); - - List> futures = new ArrayList<>(attributes.size()); - for (var attribute : attributes) { - futures.add(doSave(tenantId, entityId, scope, attribute)); - } - - return Futures.allAsList(futures); + return doSave(tenantId, entityId, scope, attributes); } - private ListenableFuture doSave(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) { - ListenableFuture future = attributesDao.save(tenantId, entityId, scope, attribute); - return Futures.transform(future, version -> { - BaseAttributeKvEntry attributeKvEntry = new BaseAttributeKvEntry(((BaseAttributeKvEntry) attribute).getKv(), attribute.getLastUpdateTs(), version); - put(entityId, scope, attributeKvEntry); - edqsService.onUpdate(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attributeKvEntry, version)); - return version; - }, cacheExecutor); + private ListenableFuture doSave(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributes) { + List> futures = new ArrayList<>(attributes.size()); + for (var attribute : attributes) { + ListenableFuture future = Futures.transform(attributesDao.save(tenantId, entityId, scope, attribute), version -> { + BaseAttributeKvEntry attributeKvEntry = new BaseAttributeKvEntry(((BaseAttributeKvEntry) attribute).getKv(), attribute.getLastUpdateTs(), version); + put(entityId, scope, attributeKvEntry); + edqsService.onUpdate(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attributeKvEntry, version)); + return version; + }, cacheExecutor); + futures.add(future); + } + return Futures.transform(Futures.allAsList(futures), AttributesSaveResult::of, MoreExecutors.directExecutor()); } private void put(EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) { @@ -270,7 +266,7 @@ public class CachedAttributesService implements AttributesService { edqsService.onDelete(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, key, version)); } return key; - }, cacheExecutor)).collect(Collectors.toList())); + }, cacheExecutor)).toList()); } @Override