Return AttributesSaveResult instead of just List<Long> when saving attributes

This commit is contained in:
Dmytro Skarzhynets 2025-06-09 15:47:52 +03:00
parent baaa9f7235
commit 9a82f6b881
No known key found for this signature in database
GPG Key ID: 2B51652F224037DF
14 changed files with 142 additions and 93 deletions

View File

@ -21,6 +21,7 @@ import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.RuleEngineCalculatedFieldQueueService; import org.thingsboard.rule.engine.api.RuleEngineCalculatedFieldQueueService;
import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
import java.util.List; import java.util.List;
@ -35,7 +36,7 @@ public interface CalculatedFieldQueueService extends RuleEngineCalculatedFieldQu
*/ */
void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback<Void> callback); void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback<Void> callback);
void pushRequestToQueue(AttributesSaveRequest request, List<Long> result, FutureCallback<Void> callback); void pushRequestToQueue(AttributesSaveRequest request, AttributesSaveResult result, FutureCallback<Void> callback);
void pushRequestToQueue(AttributesDeleteRequest request, List<String> result, FutureCallback<Void> callback); void pushRequestToQueue(AttributesDeleteRequest request, List<String> result, FutureCallback<Void> callback);

View File

@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.msg.TbMsgType;
@ -96,7 +97,7 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
} }
@Override @Override
public void pushRequestToQueue(AttributesSaveRequest request, List<Long> result, FutureCallback<Void> callback) { public void pushRequestToQueue(AttributesSaveRequest request, AttributesSaveResult result, FutureCallback<Void> callback) {
var tenantId = request.getTenantId(); var tenantId = request.getTenantId();
var entityId = request.getEntityId(); var entityId = request.getEntityId();
checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries(), request.getScope()), cf -> cf.linkMatches(entityId, request.getEntries(), request.getScope()), checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries(), request.getScope()), cf -> cf.linkMatches(entityId, request.getEntries(), request.getScope()),
@ -186,17 +187,18 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
return msg.build(); return msg.build();
} }
private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, List<Long> versions) { private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, AttributesSaveResult result) {
ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder(); ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder();
CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds(), request.getTbMsgId(), request.getTbMsgType()); CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds(), request.getTbMsgId(), request.getTbMsgType());
telemetryMsg.setScope(AttributeScopeProto.valueOf(request.getScope().name())); telemetryMsg.setScope(AttributeScopeProto.valueOf(request.getScope().name()));
List<AttributeKvEntry> entries = request.getEntries(); List<AttributeKvEntry> entries = request.getEntries();
List<Long> versions = result.versions();
for (int i = 0; i < entries.size(); i++) { for (int i = 0; i < entries.size(); i++) {
AttributeValueProto.Builder attrProtoBuilder = ProtoUtils.toProto(entries.get(i)).toBuilder(); AttributeValueProto.Builder attrProtoBuilder = ProtoUtils.toProto(entries.get(i)).toBuilder();
if (versions != null) { attrProtoBuilder.setVersion(versions.get(i));
attrProtoBuilder.setVersion(versions.get(i));
}
telemetryMsg.addAttrData(attrProtoBuilder.build()); telemetryMsg.addAttrData(attrProtoBuilder.build());
} }
msg.setTelemetryMsg(telemetryMsg.build()); msg.setTelemetryMsg(telemetryMsg.build());

View File

@ -33,6 +33,7 @@ import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.msg.TbMsgType;
@ -62,8 +63,6 @@ import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.Collections;
import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher; import java.util.regex.Matcher;
@ -240,10 +239,11 @@ public class DeviceProvisionServiceImpl implements DeviceProvisionService {
return deviceCredentialsService.updateDeviceCredentials(tenantId, deviceCredentials); return deviceCredentialsService.updateDeviceCredentials(tenantId, deviceCredentials);
} }
private ListenableFuture<List<Long>> saveProvisionStateAttribute(Device device) { private ListenableFuture<AttributesSaveResult> saveProvisionStateAttribute(Device device) {
return attributesService.save(device.getTenantId(), device.getId(), AttributeScope.SERVER_SCOPE, return attributesService.save(
Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry(DEVICE_PROVISION_STATE, PROVISIONED_STATE), device.getTenantId(), device.getId(), AttributeScope.SERVER_SCOPE,
System.currentTimeMillis()))); new BaseAttributeKvEntry(new StringDataEntry(DEVICE_PROVISION_STATE, PROVISIONED_STATE), System.currentTimeMillis())
);
} }
private DeviceCredentials getDeviceCredentials(Device device) { private DeviceCredentials getDeviceCredentials(Device device) {

View File

@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry;
@ -42,7 +43,6 @@ import org.thingsboard.server.common.data.limit.LimitedApi;
import org.thingsboard.server.common.data.notification.rule.trigger.EdgeCommunicationFailureTrigger; import org.thingsboard.server.common.data.notification.rule.trigger.EdgeCommunicationFailureTrigger;
import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.SortOrder;
import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg; import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg;
@ -582,10 +582,10 @@ public abstract class EdgeGrpcSession implements Closeable {
@Override @Override
public void onSuccess(@Nullable Pair<Long, Long> newStartTsAndSeqId) { public void onSuccess(@Nullable Pair<Long, Long> newStartTsAndSeqId) {
if (newStartTsAndSeqId != null) { if (newStartTsAndSeqId != null) {
ListenableFuture<List<Long>> updateFuture = updateQueueStartTsAndSeqId(newStartTsAndSeqId); ListenableFuture<AttributesSaveResult> updateFuture = updateQueueStartTsAndSeqId(newStartTsAndSeqId);
Futures.addCallback(updateFuture, new FutureCallback<>() { Futures.addCallback(updateFuture, new FutureCallback<>() {
@Override @Override
public void onSuccess(@Nullable List<Long> list) { public void onSuccess(@Nullable AttributesSaveResult saveResult) {
log.debug("[{}][{}] queue offset was updated [{}]", tenantId, sessionId, newStartTsAndSeqId); log.debug("[{}][{}] queue offset was updated [{}]", tenantId, sessionId, newStartTsAndSeqId);
boolean newEventsAvailable; boolean newEventsAvailable;
if (fetcher.isSeqIdNewCycleStarted()) { if (fetcher.isSeqIdNewCycleStarted()) {
@ -646,8 +646,7 @@ public abstract class EdgeGrpcSession implements Closeable {
log.trace("[{}][{}] entity message processed [{}]", tenantId, sessionId, downlinkMsg); log.trace("[{}][{}] entity message processed [{}]", tenantId, sessionId, downlinkMsg);
} }
} }
case ATTRIBUTES_UPDATED, POST_ATTRIBUTES, ATTRIBUTES_DELETED, TIMESERIES_UPDATED -> case ATTRIBUTES_UPDATED, POST_ATTRIBUTES, ATTRIBUTES_DELETED, TIMESERIES_UPDATED -> downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edge, edgeEvent);
downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edge, edgeEvent);
default -> log.warn("[{}][{}] Unsupported action type [{}]", tenantId, sessionId, edgeEvent.getAction()); default -> log.warn("[{}][{}] Unsupported action type [{}]", tenantId, sessionId, edgeEvent.getAction());
} }
} catch (Exception e) { } catch (Exception e) {
@ -723,13 +722,14 @@ public abstract class EdgeGrpcSession implements Closeable {
return startSeqId; return startSeqId;
} }
private ListenableFuture<List<Long>> updateQueueStartTsAndSeqId(Pair<Long, Long> pair) { private ListenableFuture<AttributesSaveResult> updateQueueStartTsAndSeqId(Pair<Long, Long> pair) {
newStartTs = pair.getFirst(); newStartTs = pair.getFirst();
newStartSeqId = pair.getSecond(); newStartSeqId = pair.getSecond();
log.trace("[{}] updateQueueStartTsAndSeqId [{}][{}][{}]", sessionId, edge.getId(), newStartTs, newStartSeqId); log.trace("[{}] updateQueueStartTsAndSeqId [{}][{}][{}]", sessionId, edge.getId(), newStartTs, newStartSeqId);
List<AttributeKvEntry> attributes = Arrays.asList( List<AttributeKvEntry> attributes = List.of(
new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_TS_ATTR_KEY, newStartTs), System.currentTimeMillis()), new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_TS_ATTR_KEY, newStartTs), System.currentTimeMillis()),
new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_SEQ_ID_ATTR_KEY, newStartSeqId), System.currentTimeMillis())); new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_SEQ_ID_ATTR_KEY, newStartSeqId), System.currentTimeMillis())
);
return ctx.getAttributesService().save(edge.getTenantId(), edge.getId(), AttributeScope.SERVER_SCOPE, attributes); return ctx.getAttributesService().save(edge.getTenantId(), edge.getId(), AttributeScope.SERVER_SCOPE, attributes);
} }

View File

@ -64,6 +64,7 @@ import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry; import org.thingsboard.server.common.data.kv.BooleanDataEntry;
@ -581,9 +582,9 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
Collections.singletonList(new BasicTsKvEntry(System.currentTimeMillis(), new BooleanDataEntry(key, value))), 0L); Collections.singletonList(new BasicTsKvEntry(System.currentTimeMillis(), new BooleanDataEntry(key, value))), 0L);
addTsCallback(saveFuture, new TelemetrySaveCallback<>(deviceId, key, value)); addTsCallback(saveFuture, new TelemetrySaveCallback<>(deviceId, key, value));
} else { } else {
ListenableFuture<List<Long>> saveFuture = attributesService.save(TenantId.SYS_TENANT_ID, deviceId, AttributeScope.SERVER_SCOPE, ListenableFuture<AttributesSaveResult> saveFuture = attributesService.save(
Collections.singletonList(new BaseAttributeKvEntry(new BooleanDataEntry(key, value) TenantId.SYS_TENANT_ID, deviceId, AttributeScope.SERVER_SCOPE, new BaseAttributeKvEntry(new BooleanDataEntry(key, value), System.currentTimeMillis())
, System.currentTimeMillis()))); );
addTsCallback(saveFuture, new TelemetrySaveCallback<>(deviceId, key, value)); addTsCallback(saveFuture, new TelemetrySaveCallback<>(deviceId, key, value));
} }
} }
@ -611,7 +612,7 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
} }
private <S> void addTsCallback(ListenableFuture<S> saveFuture, final FutureCallback<S> callback) { private <S> void addTsCallback(ListenableFuture<S> saveFuture, final FutureCallback<S> callback) {
Futures.addCallback(saveFuture, new FutureCallback<S>() { Futures.addCallback(saveFuture, new FutureCallback<>() {
@Override @Override
public void onSuccess(@Nullable S result) { public void onSuccess(@Nullable S result) {
callback.onSuccess(result); callback.onSuccess(result);

View File

@ -45,6 +45,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry;
@ -62,7 +63,6 @@ import org.thingsboard.server.service.state.DefaultDeviceStateService;
import org.thingsboard.server.service.subscription.TbSubscriptionUtils; import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -190,16 +190,16 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
} }
@Override @Override
public ListenableFuture<List<Long>> saveAttributesInternal(AttributesSaveRequest request) { public ListenableFuture<AttributesSaveResult> saveAttributesInternal(AttributesSaveRequest request) {
TenantId tenantId = request.getTenantId(); TenantId tenantId = request.getTenantId();
EntityId entityId = request.getEntityId(); EntityId entityId = request.getEntityId();
AttributesSaveRequest.Strategy strategy = request.getStrategy(); AttributesSaveRequest.Strategy strategy = request.getStrategy();
ListenableFuture<List<Long>> resultFuture; ListenableFuture<AttributesSaveResult> resultFuture;
if (strategy.saveAttributes()) { if (strategy.saveAttributes()) {
resultFuture = attrService.save(tenantId, entityId, request.getScope(), request.getEntries()); resultFuture = attrService.save(tenantId, entityId, request.getScope(), request.getEntries());
} else { } else {
resultFuture = Futures.immediateFuture(Collections.emptyList()); resultFuture = Futures.immediateFuture(AttributesSaveResult.EMPTY);
} }
addMainCallback(resultFuture, result -> { addMainCallback(resultFuture, result -> {

View File

@ -21,10 +21,9 @@ import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
import java.util.List;
/** /**
* Created by ashvayka on 27.03.18. * Created by ashvayka on 27.03.18.
*/ */
@ -32,7 +31,7 @@ public interface InternalTelemetryService extends RuleEngineTelemetryService {
ListenableFuture<TimeseriesSaveResult> saveTimeseriesInternal(TimeseriesSaveRequest request); ListenableFuture<TimeseriesSaveResult> saveTimeseriesInternal(TimeseriesSaveRequest request);
ListenableFuture<List<Long>> saveAttributesInternal(AttributesSaveRequest request); ListenableFuture<AttributesSaveResult> saveAttributesInternal(AttributesSaveRequest request);
void deleteTimeseriesInternal(TimeseriesDeleteRequest request); void deleteTimeseriesInternal(TimeseriesDeleteRequest request);

View File

@ -44,6 +44,7 @@ import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.IdBased; import org.thingsboard.server.common.data.id.IdBased;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.DoubleDataEntry;
@ -395,7 +396,7 @@ public class EntityServiceTest extends AbstractControllerTest {
List<Long> highTemperatures = new ArrayList<>(); List<Long> highTemperatures = new ArrayList<>();
createTestHierarchy(tenantId, assets, devices, new ArrayList<>(), new ArrayList<>(), temperatures, highTemperatures); createTestHierarchy(tenantId, assets, devices, new ArrayList<>(), new ArrayList<>(), temperatures, highTemperatures);
List<ListenableFuture<List<Long>>> attributeFutures = new ArrayList<>(); List<ListenableFuture<AttributesSaveResult>> attributeFutures = new ArrayList<>();
for (int i = 0; i < devices.size(); i++) { for (int i = 0; i < devices.size(); i++) {
Device device = devices.get(i); Device device = devices.get(i);
attributeFutures.add(saveLongAttribute(device.getId(), "temperature", temperatures.get(i), AttributeScope.CLIENT_SCOPE)); attributeFutures.add(saveLongAttribute(device.getId(), "temperature", temperatures.get(i), AttributeScope.CLIENT_SCOPE));
@ -545,7 +546,7 @@ public class EntityServiceTest extends AbstractControllerTest {
List<Long> highTemperatures = new ArrayList<>(); List<Long> highTemperatures = new ArrayList<>();
createTestHierarchy(tenantId, assets, devices, new ArrayList<>(), new ArrayList<>(), temperatures, highTemperatures); createTestHierarchy(tenantId, assets, devices, new ArrayList<>(), new ArrayList<>(), temperatures, highTemperatures);
List<ListenableFuture<List<Long>>> attributeFutures = new ArrayList<>(); List<ListenableFuture<AttributesSaveResult>> attributeFutures = new ArrayList<>();
for (int i = 0; i < devices.size(); i++) { for (int i = 0; i < devices.size(); i++) {
Device device = devices.get(i); Device device = devices.get(i);
attributeFutures.add(saveLongAttribute(device.getId(), "temperature", temperatures.get(i), AttributeScope.CLIENT_SCOPE)); attributeFutures.add(saveLongAttribute(device.getId(), "temperature", temperatures.get(i), AttributeScope.CLIENT_SCOPE));
@ -599,7 +600,7 @@ public class EntityServiceTest extends AbstractControllerTest {
List<Long> highConsumptions = new ArrayList<>(); List<Long> highConsumptions = new ArrayList<>();
createTestHierarchy(tenantId, assets, devices, consumptions, highConsumptions, new ArrayList<>(), new ArrayList<>()); createTestHierarchy(tenantId, assets, devices, consumptions, highConsumptions, new ArrayList<>(), new ArrayList<>());
List<ListenableFuture<List<Long>>> attributeFutures = new ArrayList<>(); List<ListenableFuture<AttributesSaveResult>> attributeFutures = new ArrayList<>();
for (int i = 0; i < assets.size(); i++) { for (int i = 0; i < assets.size(); i++) {
Asset asset = assets.get(i); Asset asset = assets.get(i);
attributeFutures.add(saveLongAttribute(asset.getId(), "consumption", consumptions.get(i), AttributeScope.SERVER_SCOPE)); attributeFutures.add(saveLongAttribute(asset.getId(), "consumption", consumptions.get(i), AttributeScope.SERVER_SCOPE));
@ -1506,7 +1507,7 @@ public class EntityServiceTest extends AbstractControllerTest {
} }
} }
List<ListenableFuture<List<Long>>> attributeFutures = new ArrayList<>(); List<ListenableFuture<AttributesSaveResult>> attributeFutures = new ArrayList<>();
for (int i = 0; i < devices.size(); i++) { for (int i = 0; i < devices.size(); i++) {
Device device = devices.get(i); Device device = devices.get(i);
for (AttributeScope currentScope : AttributeScope.values()) { for (AttributeScope currentScope : AttributeScope.values()) {
@ -1578,7 +1579,7 @@ public class EntityServiceTest extends AbstractControllerTest {
} }
} }
List<ListenableFuture<List<Long>>> attributeFutures = new ArrayList<>(); List<ListenableFuture<AttributesSaveResult>> attributeFutures = new ArrayList<>();
for (int i = 0; i < devices.size(); i++) { for (int i = 0; i < devices.size(); i++) {
Device device = devices.get(i); Device device = devices.get(i);
attributeFutures.add(saveLongAttribute(device.getId(), "temperature", temperatures.get(i), AttributeScope.CLIENT_SCOPE)); attributeFutures.add(saveLongAttribute(device.getId(), "temperature", temperatures.get(i), AttributeScope.CLIENT_SCOPE));
@ -1808,7 +1809,7 @@ public class EntityServiceTest extends AbstractControllerTest {
} }
} }
List<ListenableFuture<List<Long>>> attributeFutures = new ArrayList<>(); List<ListenableFuture<AttributesSaveResult>> attributeFutures = new ArrayList<>();
for (int i = 0; i < devices.size(); i++) { for (int i = 0; i < devices.size(); i++) {
Device device = devices.get(i); Device device = devices.get(i);
attributeFutures.add(saveStringAttribute(device.getId(), "attributeString", attributeStrings.get(i), AttributeScope.CLIENT_SCOPE)); attributeFutures.add(saveStringAttribute(device.getId(), "attributeString", attributeStrings.get(i), AttributeScope.CLIENT_SCOPE));
@ -2269,16 +2270,16 @@ public class EntityServiceTest extends AbstractControllerTest {
return filter; return filter;
} }
private ListenableFuture<List<Long>> saveLongAttribute(EntityId entityId, String key, long value, AttributeScope scope) { private ListenableFuture<AttributesSaveResult> saveLongAttribute(EntityId entityId, String key, long value, AttributeScope scope) {
KvEntry attrValue = new LongDataEntry(key, value); KvEntry attrValue = new LongDataEntry(key, value);
AttributeKvEntry attr = new BaseAttributeKvEntry(attrValue, 42L); AttributeKvEntry attr = new BaseAttributeKvEntry(attrValue, 42L);
return attributesService.save(tenantId, entityId, scope, Collections.singletonList(attr)); return attributesService.save(tenantId, entityId, scope, List.of(attr));
} }
private ListenableFuture<List<Long>> saveStringAttribute(EntityId entityId, String key, String value, AttributeScope scope) { private ListenableFuture<AttributesSaveResult> saveStringAttribute(EntityId entityId, String key, String value, AttributeScope scope) {
KvEntry attrValue = new StringDataEntry(key, value); KvEntry attrValue = new StringDataEntry(key, value);
AttributeKvEntry attr = new BaseAttributeKvEntry(attrValue, 42L); AttributeKvEntry attr = new BaseAttributeKvEntry(attrValue, 42L);
return attributesService.save(tenantId, entityId, scope, Collections.singletonList(attr)); return attributesService.save(tenantId, entityId, scope, List.of(attr));
} }
private ListenableFuture<TimeseriesSaveResult> saveTimeseries(EntityId entityId, String key, Double value) { private ListenableFuture<TimeseriesSaveResult> saveTimeseries(EntityId entityId, String key, Double value) {
@ -2294,8 +2295,8 @@ public class EntityServiceTest extends AbstractControllerTest {
} }
protected void createMultiRootHierarchy(List<Asset> buildings, List<Asset> apartments, protected void createMultiRootHierarchy(List<Asset> buildings, List<Asset> apartments,
Map<String, Map<UUID, String>> entityNameByTypeMap, Map<String, Map<UUID, String>> entityNameByTypeMap,
Map<UUID, UUID> childParentRelationMap) throws InterruptedException { Map<UUID, UUID> childParentRelationMap) throws InterruptedException {
for (int k = 0; k < 3; k++) { for (int k = 0; k < 3; k++) {
Asset building = new Asset(); Asset building = new Asset();
building.setTenantId(tenantId); building.setTenantId(tenantId);

View File

@ -38,6 +38,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.notification.rule.trigger.DeviceActivityTrigger; import org.thingsboard.server.common.data.notification.rule.trigger.DeviceActivityTrigger;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
@ -911,7 +912,7 @@ class DefaultDeviceStateServiceTest {
// 10 millis pass... and new activity message it received // 10 millis pass... and new activity message it received
// this time DB save is successful // this time DB save is successful
when(telemetrySubscriptionService.saveAttributesInternal(any())).thenReturn(Futures.immediateFuture(generateRandomVersions(1))); when(telemetrySubscriptionService.saveAttributesInternal(any())).thenReturn(Futures.immediateFuture(AttributesSaveResult.of(generateRandomVersions(1))));
doReturn(210L).when(service).getCurrentTimeMillis(); doReturn(210L).when(service).getCurrentTimeMillis();
service.onDeviceActivity(tenantId, deviceId, 190L); service.onDeviceActivity(tenantId, deviceId, 190L);
assertThat(deviceState.isActive()).isTrue(); assertThat(deviceState.isActive()).isTrue();
@ -947,7 +948,7 @@ class DefaultDeviceStateServiceTest {
// waiting 100 millis... periodic activity states check is triggered again // waiting 100 millis... periodic activity states check is triggered again
// this time DB save is successful // this time DB save is successful
when(telemetrySubscriptionService.saveAttributesInternal(any())).thenReturn(Futures.immediateFuture(generateRandomVersions(1))); when(telemetrySubscriptionService.saveAttributesInternal(any())).thenReturn(Futures.immediateFuture(AttributesSaveResult.of(generateRandomVersions(1))));
doReturn(300L).when(service).getCurrentTimeMillis(); doReturn(300L).when(service).getCurrentTimeMillis();
service.checkStates(); service.checkStates();
assertThat(deviceState.isActive()).isFalse(); assertThat(deviceState.isActive()).isFalse();

View File

@ -48,6 +48,7 @@ import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.DoubleDataEntry;
@ -472,7 +473,8 @@ class DefaultTelemetrySubscriptionServiceTest {
.strategy(new AttributesSaveRequest.Strategy(saveAttributes, sendWsUpdate, processCalculatedFields)) .strategy(new AttributesSaveRequest.Strategy(saveAttributes, sendWsUpdate, processCalculatedFields))
.build(); .build();
lenient().when(attrService.save(tenantId, entityId, request.getScope(), request.getEntries())).thenReturn(immediateFuture(listOfNNumbers(request.getEntries().size()))); lenient().when(attrService.save(tenantId, entityId, request.getScope(), request.getEntries()))
.thenReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size()))));
// WHEN // WHEN
telemetryService.saveAttributes(request); telemetryService.saveAttributes(request);
@ -547,7 +549,8 @@ class DefaultTelemetrySubscriptionServiceTest {
.strategy(new AttributesSaveRequest.Strategy(true, false, false)) .strategy(new AttributesSaveRequest.Strategy(true, false, false))
.build(); .build();
given(attrService.save(tenantId, deviceId, request.getScope(), entries)).willReturn(immediateFuture(listOfNNumbers(entries.size()))); given(attrService.save(tenantId, deviceId, request.getScope(), entries))
.willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(entries.size()))));
// WHEN // WHEN
telemetryService.saveAttributes(request); telemetryService.saveAttributes(request);
@ -581,7 +584,8 @@ class DefaultTelemetrySubscriptionServiceTest {
.strategy(new AttributesSaveRequest.Strategy(true, false, false)) .strategy(new AttributesSaveRequest.Strategy(true, false, false))
.build(); .build();
given(attrService.save(tenantId, nonDeviceId, request.getScope(), entries)).willReturn(immediateFuture(listOfNNumbers(entries.size()))); given(attrService.save(tenantId, nonDeviceId, request.getScope(), entries))
.willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(entries.size()))));
// WHEN // WHEN
telemetryService.saveAttributes(request); telemetryService.saveAttributes(request);
@ -613,7 +617,8 @@ class DefaultTelemetrySubscriptionServiceTest {
.strategy(new AttributesSaveRequest.Strategy(true, false, false)) .strategy(new AttributesSaveRequest.Strategy(true, false, false))
.build(); .build();
given(attrService.save(tenantId, deviceId, request.getScope(), entries)).willReturn(immediateFuture(listOfNNumbers(entries.size()))); given(attrService.save(tenantId, deviceId, request.getScope(), entries))
.willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(entries.size()))));
// WHEN // WHEN
telemetryService.saveAttributes(request); telemetryService.saveAttributes(request);
@ -640,7 +645,8 @@ class DefaultTelemetrySubscriptionServiceTest {
.strategy(new AttributesSaveRequest.Strategy(true, false, false)) .strategy(new AttributesSaveRequest.Strategy(true, false, false))
.build(); .build();
given(attrService.save(tenantId, deviceId, request.getScope(), entries)).willReturn(immediateFuture(listOfNNumbers(entries.size()))); given(attrService.save(tenantId, deviceId, request.getScope(), entries))
.willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(entries.size()))));
// WHEN // WHEN
telemetryService.saveAttributes(request); telemetryService.saveAttributes(request);
@ -715,7 +721,8 @@ class DefaultTelemetrySubscriptionServiceTest {
.strategy(new AttributesSaveRequest.Strategy(true, false, false)) .strategy(new AttributesSaveRequest.Strategy(true, false, false))
.build(); .build();
given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())).willReturn(immediateFuture(listOfNNumbers(request.getEntries().size()))); given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries()))
.willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size()))));
// WHEN // WHEN
telemetryService.saveAttributes(request); telemetryService.saveAttributes(request);
@ -764,7 +771,8 @@ class DefaultTelemetrySubscriptionServiceTest {
.strategy(new AttributesSaveRequest.Strategy(true, false, false)) .strategy(new AttributesSaveRequest.Strategy(true, false, false))
.build(); .build();
given(attrService.save(tenantId, nonDeviceId, request.getScope(), request.getEntries())).willReturn(immediateFuture(listOfNNumbers(request.getEntries().size()))); given(attrService.save(tenantId, nonDeviceId, request.getScope(), request.getEntries()))
.willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size()))));
// WHEN // WHEN
telemetryService.saveAttributes(request); telemetryService.saveAttributes(request);
@ -792,7 +800,8 @@ class DefaultTelemetrySubscriptionServiceTest {
.strategy(new AttributesSaveRequest.Strategy(true, false, false)) .strategy(new AttributesSaveRequest.Strategy(true, false, false))
.build(); .build();
given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())).willReturn(immediateFuture(listOfNNumbers(request.getEntries().size()))); given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries()))
.willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size()))));
// WHEN // WHEN
telemetryService.saveAttributes(request); telemetryService.saveAttributes(request);
@ -815,7 +824,8 @@ class DefaultTelemetrySubscriptionServiceTest {
.strategy(new AttributesSaveRequest.Strategy(true, false, false)) .strategy(new AttributesSaveRequest.Strategy(true, false, false))
.build(); .build();
given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())).willReturn(immediateFuture(listOfNNumbers(request.getEntries().size()))); given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries()))
.willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size()))));
// WHEN // WHEN
telemetryService.saveAttributes(request); telemetryService.saveAttributes(request);
@ -843,7 +853,8 @@ class DefaultTelemetrySubscriptionServiceTest {
.strategy(new AttributesSaveRequest.Strategy(true, false, false)) .strategy(new AttributesSaveRequest.Strategy(true, false, false))
.build(); .build();
given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())).willReturn(immediateFuture(listOfNNumbers(request.getEntries().size()))); given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries()))
.willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size()))));
// WHEN // WHEN
telemetryService.saveAttributes(request); telemetryService.saveAttributes(request);
@ -870,7 +881,8 @@ class DefaultTelemetrySubscriptionServiceTest {
.strategy(new AttributesSaveRequest.Strategy(true, false, false)) .strategy(new AttributesSaveRequest.Strategy(true, false, false))
.build(); .build();
given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())).willReturn(immediateFuture(listOfNNumbers(request.getEntries().size()))); given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries()))
.willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size()))));
// WHEN // WHEN
telemetryService.saveAttributes(request); telemetryService.saveAttributes(request);

View File

@ -21,6 +21,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
@ -37,9 +38,9 @@ public interface AttributesService {
ListenableFuture<List<AttributeKvEntry>> findAll(TenantId tenantId, EntityId entityId, AttributeScope scope); ListenableFuture<List<AttributeKvEntry>> findAll(TenantId tenantId, EntityId entityId, AttributeScope scope);
ListenableFuture<List<Long>> save(TenantId tenantId, EntityId entityId, AttributeScope scope, List<AttributeKvEntry> attributes); ListenableFuture<AttributesSaveResult> save(TenantId tenantId, EntityId entityId, AttributeScope scope, List<AttributeKvEntry> attributes);
ListenableFuture<Long> save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute); ListenableFuture<AttributesSaveResult> save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute);
ListenableFuture<List<String>> removeAll(TenantId tenantId, EntityId entityId, AttributeScope scope, List<String> attributeKeys); ListenableFuture<List<String>> removeAll(TenantId tenantId, EntityId entityId, AttributeScope scope, List<String> attributeKeys);

View File

@ -0,0 +1,32 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.kv;
import java.util.Collections;
import java.util.List;
public record AttributesSaveResult(List<Long> versions) {
public static final AttributesSaveResult EMPTY = new AttributesSaveResult(Collections.emptyList());
public static AttributesSaveResult of(List<Long> versions) {
if (versions == null) {
return EMPTY;
}
return new AttributesSaveResult(versions);
}
}

View File

@ -33,6 +33,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.edqs.EdqsService; import org.thingsboard.server.common.msg.edqs.EdqsService;
import org.thingsboard.server.dao.service.Validator; import org.thingsboard.server.dao.service.Validator;
@ -41,7 +42,6 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors;
import static org.thingsboard.server.dao.attributes.AttributeUtils.validate; import static org.thingsboard.server.dao.attributes.AttributeUtils.validate;
@ -101,26 +101,29 @@ public class BaseAttributesService implements AttributesService {
} }
@Override @Override
public ListenableFuture<Long> save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) { public ListenableFuture<AttributesSaveResult> save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) {
validate(entityId, scope); validate(entityId, scope);
AttributeUtils.validate(attribute, valueNoXssValidation); AttributeUtils.validate(attribute, valueNoXssValidation);
return doSave(tenantId, entityId, scope, attribute); return doSave(tenantId, entityId, scope, List.of(attribute));
} }
@Override @Override
public ListenableFuture<List<Long>> save(TenantId tenantId, EntityId entityId, AttributeScope scope, List<AttributeKvEntry> attributes) { public ListenableFuture<AttributesSaveResult> save(TenantId tenantId, EntityId entityId, AttributeScope scope, List<AttributeKvEntry> attributes) {
validate(entityId, scope); validate(entityId, scope);
AttributeUtils.validate(attributes, valueNoXssValidation); AttributeUtils.validate(attributes, valueNoXssValidation);
List<ListenableFuture<Long>> saveFutures = attributes.stream().map(attribute -> doSave(tenantId, entityId, scope, attribute)).collect(Collectors.toList()); return doSave(tenantId, entityId, scope, attributes);
return Futures.allAsList(saveFutures);
} }
private ListenableFuture<Long> doSave(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) { private ListenableFuture<AttributesSaveResult> doSave(TenantId tenantId, EntityId entityId, AttributeScope scope, List<AttributeKvEntry> attributes) {
ListenableFuture<Long> future = attributesDao.save(tenantId, entityId, scope, attribute); List<ListenableFuture<Long>> futures = new ArrayList<>(attributes.size());
return Futures.transform(future, version -> { for (AttributeKvEntry attribute : attributes) {
edqsService.onUpdate(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attribute, version)); ListenableFuture<Long> future = Futures.transform(attributesDao.save(tenantId, entityId, scope, attribute), version -> {
return version; edqsService.onUpdate(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attribute, version));
}, MoreExecutors.directExecutor()); return version;
}, MoreExecutors.directExecutor());
futures.add(future);
}
return Futures.transform(Futures.allAsList(futures), AttributesSaveResult::of, MoreExecutors.directExecutor());
} }
@Override @Override

View File

@ -37,6 +37,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.edqs.EdqsService; import org.thingsboard.server.common.msg.edqs.EdqsService;
@ -56,7 +57,6 @@ import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
import static org.thingsboard.server.dao.attributes.AttributeUtils.validate; import static org.thingsboard.server.dao.attributes.AttributeUtils.validate;
@ -150,7 +150,7 @@ public class CachedAttributesService implements AttributesService {
List<AttributeKvEntry> cachedAttributes = wrappedCachedAttributes.values().stream() List<AttributeKvEntry> cachedAttributes = wrappedCachedAttributes.values().stream()
.map(TbCacheValueWrapper::get) .map(TbCacheValueWrapper::get)
.filter(Objects::nonNull) .filter(Objects::nonNull)
.collect(Collectors.toList()); .toList();
if (wrappedCachedAttributes.size() == attributeKeys.size()) { if (wrappedCachedAttributes.size() == attributeKeys.size()) {
log.trace("[{}][{}] Found all attributes from cache: {}", entityId, scope, attributeKeys); log.trace("[{}][{}] Found all attributes from cache: {}", entityId, scope, attributeKeys);
return Futures.immediateFuture(cachedAttributes); return Futures.immediateFuture(cachedAttributes);
@ -159,8 +159,6 @@ public class CachedAttributesService implements AttributesService {
Set<String> notFoundAttributeKeys = new HashSet<>(attributeKeys); Set<String> notFoundAttributeKeys = new HashSet<>(attributeKeys);
notFoundAttributeKeys.removeAll(wrappedCachedAttributes.keySet()); notFoundAttributeKeys.removeAll(wrappedCachedAttributes.keySet());
List<AttributeCacheKey> notFoundKeys = notFoundAttributeKeys.stream().map(k -> new AttributeCacheKey(scope, entityId, k)).collect(Collectors.toList());
// DB call should run in DB executor, not in cache-related executor // DB call should run in DB executor, not in cache-related executor
return jpaExecutorService.submit(() -> { return jpaExecutorService.submit(() -> {
log.trace("[{}][{}] Lookup attributes from db: {}", entityId, scope, notFoundAttributeKeys); log.trace("[{}][{}] Lookup attributes from db: {}", entityId, scope, notFoundAttributeKeys);
@ -222,33 +220,31 @@ public class CachedAttributesService implements AttributesService {
} }
@Override @Override
public ListenableFuture<Long> save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) { public ListenableFuture<AttributesSaveResult> save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) {
validate(entityId, scope); validate(entityId, scope);
AttributeUtils.validate(attribute, valueNoXssValidation); AttributeUtils.validate(attribute, valueNoXssValidation);
return doSave(tenantId, entityId, scope, attribute); return doSave(tenantId, entityId, scope, List.of(attribute));
} }
@Override @Override
public ListenableFuture<List<Long>> save(TenantId tenantId, EntityId entityId, AttributeScope scope, List<AttributeKvEntry> attributes) { public ListenableFuture<AttributesSaveResult> save(TenantId tenantId, EntityId entityId, AttributeScope scope, List<AttributeKvEntry> attributes) {
validate(entityId, scope); validate(entityId, scope);
AttributeUtils.validate(attributes, valueNoXssValidation); AttributeUtils.validate(attributes, valueNoXssValidation);
return doSave(tenantId, entityId, scope, attributes);
List<ListenableFuture<Long>> futures = new ArrayList<>(attributes.size());
for (var attribute : attributes) {
futures.add(doSave(tenantId, entityId, scope, attribute));
}
return Futures.allAsList(futures);
} }
private ListenableFuture<Long> doSave(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) { private ListenableFuture<AttributesSaveResult> doSave(TenantId tenantId, EntityId entityId, AttributeScope scope, List<AttributeKvEntry> attributes) {
ListenableFuture<Long> future = attributesDao.save(tenantId, entityId, scope, attribute); List<ListenableFuture<Long>> futures = new ArrayList<>(attributes.size());
return Futures.transform(future, version -> { for (var attribute : attributes) {
BaseAttributeKvEntry attributeKvEntry = new BaseAttributeKvEntry(((BaseAttributeKvEntry) attribute).getKv(), attribute.getLastUpdateTs(), version); ListenableFuture<Long> future = Futures.transform(attributesDao.save(tenantId, entityId, scope, attribute), version -> {
put(entityId, scope, attributeKvEntry); BaseAttributeKvEntry attributeKvEntry = new BaseAttributeKvEntry(((BaseAttributeKvEntry) attribute).getKv(), attribute.getLastUpdateTs(), version);
edqsService.onUpdate(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attributeKvEntry, version)); put(entityId, scope, attributeKvEntry);
return version; edqsService.onUpdate(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attributeKvEntry, version));
}, cacheExecutor); return version;
}, cacheExecutor);
futures.add(future);
}
return Futures.transform(Futures.allAsList(futures), AttributesSaveResult::of, MoreExecutors.directExecutor());
} }
private void put(EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) { private void put(EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) {
@ -270,7 +266,7 @@ public class CachedAttributesService implements AttributesService {
edqsService.onDelete(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, key, version)); edqsService.onDelete(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, key, version));
} }
return key; return key;
}, cacheExecutor)).collect(Collectors.toList())); }, cacheExecutor)).toList());
} }
@Override @Override