From a9f39e4917d0da31bdc89a45a8c5df344eec9d1f Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Tue, 24 Dec 2024 12:50:27 +0200 Subject: [PATCH] added cache class --- .../service/cf/CalculatedFieldCache.java | 43 +++ .../cf/DefaultCalculatedFieldCache.java | 212 +++++++++++++ ...efaultCalculatedFieldExecutionService.java | 285 ++++++++++-------- .../thingsboard/server/common/msg/TbMsg.java | 15 +- common/proto/src/main/proto/queue.proto | 29 +- .../engine/telemetry/TbMsgAttributesNode.java | 1 + 6 files changed, 457 insertions(+), 128 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldCache.java create mode 100644 application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldCache.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldCache.java new file mode 100644 index 0000000000..f953e57cc3 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldCache.java @@ -0,0 +1,43 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.cf; + +import org.thingsboard.script.api.tbel.TbelInvokeService; +import org.thingsboard.server.common.data.cf.CalculatedField; +import org.thingsboard.server.common.data.cf.CalculatedFieldLink; +import org.thingsboard.server.common.data.id.CalculatedFieldId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; + +import java.util.List; +import java.util.Set; + +public interface CalculatedFieldCache { + + CalculatedField getCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId); + + List getCalculatedFieldLinks(TenantId tenantId, CalculatedFieldId calculatedFieldId); + + List getCalculatedFieldLinksByEntityId(TenantId tenantId, EntityId entityId); + + CalculatedFieldCtx getCalculatedFieldCtx(TenantId tenantId, CalculatedFieldId calculatedFieldId, TbelInvokeService tbelInvokeService); + + Set getEntitiesByProfile(TenantId tenantId, EntityId entityId); + + void evict(CalculatedFieldId calculatedFieldId); + +} diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java new file mode 100644 index 0000000000..1fc2a90e07 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java @@ -0,0 +1,212 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.cf; + +import jakarta.annotation.PostConstruct; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.thingsboard.script.api.tbel.TbelInvokeService; +import org.thingsboard.server.common.data.cf.CalculatedField; +import org.thingsboard.server.common.data.cf.CalculatedFieldLink; +import org.thingsboard.server.common.data.id.AssetProfileId; +import org.thingsboard.server.common.data.id.CalculatedFieldId; +import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageDataIterable; +import org.thingsboard.server.dao.asset.AssetService; +import org.thingsboard.server.dao.cf.CalculatedFieldService; +import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +@Service +@Slf4j +@RequiredArgsConstructor +public class DefaultCalculatedFieldCache implements CalculatedFieldCache { + + private final Lock calculatedFieldFetchLock = new ReentrantLock(); + + private final CalculatedFieldService calculatedFieldService; + private final AssetService assetService; + private final DeviceService deviceService; + + private final ConcurrentMap calculatedFields = new ConcurrentHashMap<>(); + private final ConcurrentMap> calculatedFieldLinks = new ConcurrentHashMap<>(); + private final ConcurrentMap> entityIdCalculatedFieldLinks = new ConcurrentHashMap<>(); + private final ConcurrentMap calculatedFieldsCtx = new ConcurrentHashMap<>(); + private final ConcurrentMap> profileEntities = new ConcurrentHashMap<>(); + + @Value("${calculatedField.initFetchPackSize:50000}") + @Getter + private int initFetchPackSize; + + + @PostConstruct + public void init() { + // to discuss: fetch on start or fetch on demand + PageDataIterable cfs = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFields, initFetchPackSize); + cfs.forEach(cf -> calculatedFields.putIfAbsent(cf.getId(), cf)); + PageDataIterable cfls = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFieldLinks, initFetchPackSize); + cfls.forEach(link -> calculatedFieldLinks.computeIfAbsent(link.getCalculatedFieldId(), id -> new ArrayList<>()).add(link)); + } + + @Override + public CalculatedField getCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId) { + CalculatedField calculatedField = calculatedFields.get(calculatedFieldId); + if (calculatedField == null) { + calculatedFieldFetchLock.lock(); + try { + calculatedField = calculatedFields.get(calculatedFieldId); + if (calculatedField == null) { + calculatedField = calculatedFieldService.findById(tenantId, calculatedFieldId); + if (calculatedField != null) { + calculatedFields.put(calculatedFieldId, calculatedField); + log.debug("[{}] Fetch calculated field into cache: {}", calculatedFieldId, calculatedField); + } + } + } finally { + calculatedFieldFetchLock.unlock(); + } + } + log.trace("[{}] Found calculated field in cache: {}", calculatedFieldId, calculatedField); + return calculatedField; + } + + @Override + public List getCalculatedFieldLinks(TenantId tenantId, CalculatedFieldId calculatedFieldId) { + List cfLinks = calculatedFieldLinks.get(calculatedFieldId); + if (cfLinks == null) { + calculatedFieldFetchLock.lock(); + try { + cfLinks = calculatedFieldLinks.get(calculatedFieldId); + if (cfLinks == null) { + cfLinks = calculatedFieldService.findAllCalculatedFieldLinksById(tenantId, calculatedFieldId); + if (cfLinks != null) { + calculatedFieldLinks.put(calculatedFieldId, cfLinks); + log.debug("[{}] Fetch calculated field links into cache: {}", calculatedFieldId, cfLinks); + } + } + } finally { + calculatedFieldFetchLock.unlock(); + } + } + log.trace("[{}] Found calculated field links in cache: {}", calculatedFieldId, cfLinks); + return cfLinks; + } + + @Override + public List getCalculatedFieldLinksByEntityId(TenantId tenantId, EntityId entityId) { + List cfLinks = entityIdCalculatedFieldLinks.get(entityId); + if (cfLinks == null) { + calculatedFieldFetchLock.lock(); + try { + cfLinks = entityIdCalculatedFieldLinks.get(entityId); + if (cfLinks == null) { + cfLinks = calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, entityId); + if (cfLinks != null) { + entityIdCalculatedFieldLinks.put(entityId, cfLinks); + log.debug("[{}] Fetch calculated field links by entity id into cache: {}", entityId, cfLinks); + } + } + } finally { + calculatedFieldFetchLock.unlock(); + } + } + log.trace("[{}] Found calculated field links by entity id in cache: {}", entityId, cfLinks); + return cfLinks; + } + + @Override + public CalculatedFieldCtx getCalculatedFieldCtx(TenantId tenantId, CalculatedFieldId calculatedFieldId, TbelInvokeService tbelInvokeService) { + CalculatedFieldCtx ctx = calculatedFieldsCtx.get(calculatedFieldId); + if (ctx == null) { + calculatedFieldFetchLock.lock(); + try { + ctx = calculatedFieldsCtx.get(calculatedFieldId); + if (ctx == null) { + CalculatedField calculatedField = getCalculatedField(tenantId, calculatedFieldId); + if (calculatedField != null) { + ctx = new CalculatedFieldCtx(calculatedField, tbelInvokeService); + calculatedFieldsCtx.put(calculatedFieldId, ctx); + log.debug("[{}] Put calculated field ctx into cache: {}", calculatedFieldId, ctx); + } + } + } finally { + calculatedFieldFetchLock.unlock(); + } + } + log.trace("[{}] Found calculated field ctx in cache: {}", calculatedFieldId, ctx); + return ctx; + } + + @Override + public Set getEntitiesByProfile(TenantId tenantId, EntityId entityProfileId) { + Set entities = profileEntities.get(entityProfileId); + if (entities == null) { + calculatedFieldFetchLock.lock(); + try { + entities = profileEntities.get(entityProfileId); + if (entities == null) { + entities = switch (entityProfileId.getEntityType()) { + case ASSET_PROFILE -> profileEntities.computeIfAbsent(entityProfileId, profileId -> { + Set assetIds = new HashSet<>(); + (new PageDataIterable<>(pageLink -> + assetService.findAssetIdsByTenantIdAndAssetProfileId(tenantId, (AssetProfileId) profileId, pageLink), initFetchPackSize)).forEach(assetIds::add); + return assetIds; + }); + case DEVICE_PROFILE -> profileEntities.computeIfAbsent(entityProfileId, profileId -> { + Set deviceIds = new HashSet<>(); + (new PageDataIterable<>(pageLink -> + deviceService.findDeviceIdsByTenantIdAndDeviceProfileId(tenantId, (DeviceProfileId) entityProfileId, pageLink), initFetchPackSize)).forEach(deviceIds::add); + return deviceIds; + }); + default -> + throw new IllegalArgumentException("Entity type should be ASSET_PROFILE or DEVICE_PROFILE."); + }; + } + } finally { + calculatedFieldFetchLock.unlock(); + } + } + log.trace("[{}] Found entities by profile in cache: {}", entityProfileId, entities); + return entities; + } + + @Override + public void evict(CalculatedFieldId calculatedFieldId) { + CalculatedField oldCalculatedField = calculatedFields.remove(calculatedFieldId); + log.debug("[{}] evict calculated field from cache: {}", calculatedFieldId, oldCalculatedField); + calculatedFieldLinks.remove(calculatedFieldId); + log.debug("[{}] evict calculated field links from cache: {}", calculatedFieldId, oldCalculatedField); + calculatedFieldsCtx.remove(calculatedFieldId); + log.debug("[{}] evict calculated field ctx from cache: {}", calculatedFieldId, oldCalculatedField); + entityIdCalculatedFieldLinks.forEach((entityId, calculatedFieldLinks) -> calculatedFieldLinks.removeIf(link -> link.getCalculatedFieldId().equals(calculatedFieldId))); + log.debug("[{}] evict calculated field links from cached links by entity id: {}", calculatedFieldId, oldCalculatedField); + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index e5c119bba6..ff78d2925a 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -18,7 +18,11 @@ package org.thingsboard.server.service.cf; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.*; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.Getter; @@ -37,8 +41,23 @@ import org.thingsboard.server.common.data.cf.CalculatedFieldLink; import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; -import org.thingsboard.server.common.data.id.*; -import org.thingsboard.server.common.data.kv.*; +import org.thingsboard.server.common.data.id.AssetId; +import org.thingsboard.server.common.data.id.CalculatedFieldId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityIdFactory; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.Aggregation; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.BooleanDataEntry; +import org.thingsboard.server.common.data.kv.DoubleDataEntry; +import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.kv.ReadTsKvQuery; +import org.thingsboard.server.common.data.kv.StringDataEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.common.msg.TbMsg; @@ -46,10 +65,8 @@ import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; -import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.cf.CalculatedFieldService; -import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbCoreComponent; @@ -67,8 +84,8 @@ import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -77,7 +94,6 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.function.Consumer; -import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -90,10 +106,9 @@ import static org.thingsboard.server.common.data.DataConstants.SCOPE; public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBasedService implements CalculatedFieldExecutionService { private final CalculatedFieldService calculatedFieldService; - private final AssetService assetService; - private final DeviceService deviceService; private final TbAssetProfileCache assetProfileCache; private final TbDeviceProfileCache deviceProfileCache; + private final CalculatedFieldCache calculatedFieldCache; private final AttributesService attributesService; private final TimeseriesService timeseriesService; private final RocksDBService rocksDBService; @@ -103,13 +118,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas private ListeningExecutorService calculatedFieldExecutor; private ListeningExecutorService calculatedFieldCallbackExecutor; - private final ConcurrentMap calculatedFields = new ConcurrentHashMap<>(); - private final ConcurrentMap> calculatedFieldLinks = new ConcurrentHashMap<>(); - private final ConcurrentMap calculatedFieldsCtx = new ConcurrentHashMap<>(); private final ConcurrentMap states = new ConcurrentHashMap<>(); - private final ConcurrentMap> profileEntities = new ConcurrentHashMap<>(); - private static final int MAX_LAST_RECORDS_VALUE = 1024; @Value("${calculatedField.initFetchPackSize:50000}") @@ -123,20 +133,11 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas Math.max(4, Runtime.getRuntime().availableProcessors()), "calculated-field")); calculatedFieldCallbackExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool( Math.max(4, Runtime.getRuntime().availableProcessors()), "calculated-field-callback")); - scheduledExecutor.submit(this::fetchCalculatedFields); - } - - private void fetchCalculatedFields() { - PageDataIterable cfs = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFields, initFetchPackSize); - cfs.forEach(cf -> calculatedFields.putIfAbsent(cf.getId(), cf)); - PageDataIterable cfls = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFieldLinks, initFetchPackSize); - cfls.forEach(link -> calculatedFieldLinks.computeIfAbsent(link.getCalculatedFieldId(), id -> new ArrayList<>()).add(link)); - rocksDBService.getAll().forEach((ctxId, ctx) -> states.put(JacksonUtil.fromString(ctxId, CalculatedFieldEntityCtxId.class), JacksonUtil.fromString(ctx, CalculatedFieldEntityCtx.class))); - states.keySet().removeIf(ctxId -> calculatedFields.keySet().stream().noneMatch(id -> ctxId.cfId().equals(id.getId()))); } @PreDestroy public void stop() { + super.stop(); if (calculatedFieldExecutor != null) { calculatedFieldExecutor.shutdownNow(); } @@ -183,7 +184,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas for (CalculatedField cf : partition) { EntityId cfEntityId = cf.getEntityId(); if (isProfileEntity(cfEntityId)) { - getOrFetchFromDBProfileEntities(cf.getTenantId(), cfEntityId) + calculatedFieldCache.getEntitiesByProfile(cf.getTenantId(), cfEntityId) .forEach(entityId -> restoreState(cf, entityId)); } else { restoreState(cf, cfEntityId); @@ -206,7 +207,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas if (storedState != null) { CalculatedFieldEntityCtx restoredCtx = JacksonUtil.fromString(storedState, CalculatedFieldEntityCtx.class); - calculatedFieldsCtx.putIfAbsent(cf.getId(), new CalculatedFieldCtx(cf, tbelInvokeService)); states.put(ctxId, restoredCtx); log.info("Restored state for CalculatedField [{}]", cf.getId()); } else { @@ -220,7 +220,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } private void cleanupEntity(CalculatedFieldId calculatedFieldId) { - calculatedFields.remove(calculatedFieldId); states.keySet().removeIf(ctxId -> ctxId.cfId().equals(calculatedFieldId.getId())); } @@ -235,7 +234,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas onCalculatedFieldDelete(tenantId, calculatedFieldId, callback); callback.onSuccess(); } - CalculatedField cf = getOrFetchFromDb(tenantId, calculatedFieldId); + CalculatedField cf = calculatedFieldCache.getCalculatedField(tenantId, calculatedFieldId); if (proto.getUpdated()) { log.info("Executing onCalculatedFieldUpdate, calculatedFieldId=[{}]", calculatedFieldId); boolean shouldReinit = onCalculatedFieldUpdate(cf, callback); @@ -245,8 +244,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } if (cf != null) { EntityId entityId = cf.getEntityId(); - CalculatedFieldCtx calculatedFieldCtx = new CalculatedFieldCtx(cf, tbelInvokeService); - calculatedFieldsCtx.put(calculatedFieldId, calculatedFieldCtx); + CalculatedFieldCtx calculatedFieldCtx = calculatedFieldCache.getCalculatedFieldCtx(tenantId, calculatedFieldId, tbelInvokeService); switch (entityId.getEntityType()) { case ASSET, DEVICE -> { log.info("Initializing state for entity: tenantId=[{}], entityId=[{}]", tenantId, entityId); @@ -258,7 +256,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas .filter(entry -> !isProfileEntity(entry.getValue().getEntityId())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); fetchArguments(tenantId, entityId, commonArguments, commonArgs -> { - getOrFetchFromDBProfileEntities(tenantId, entityId).forEach(targetEntityId -> { + calculatedFieldCache.getEntitiesByProfile(tenantId, entityId).forEach(targetEntityId -> { initializeStateForEntity(calculatedFieldCtx, targetEntityId, commonArgs, callback); }); }); @@ -290,8 +288,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } else if (EntityType.DEVICE.equals(entityType)) { profileId = deviceProfileCache.get(tenantId, (DeviceId) entityId).getId(); } - List cfLinks = new ArrayList<>(calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, entityId)); - Optional.ofNullable(profileId).ifPresent(id -> cfLinks.addAll(calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, id))); + List cfLinks = calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, entityId); + Optional.ofNullable(profileId).ifPresent(id -> calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, id)); cfLinks.forEach(link -> { CalculatedFieldId calculatedFieldId = link.getCalculatedFieldId(); Map attributes = link.getConfiguration().getAttributes(); @@ -316,8 +314,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas private void executeTelemetryUpdate(TenantId tenantId, EntityId entityId, CalculatedFieldId calculatedFieldId, List calculatedFieldIds, Map updatedTelemetry) { log.info("Received telemetry update msg: tenantId=[{}], entityId=[{}], calculatedFieldId=[{}]", tenantId, entityId, calculatedFieldId); - CalculatedField calculatedField = getOrFetchFromDb(tenantId, calculatedFieldId); - CalculatedFieldCtx calculatedFieldCtx = calculatedFieldsCtx.computeIfAbsent(calculatedFieldId, id -> new CalculatedFieldCtx(calculatedField, tbelInvokeService)); + CalculatedField calculatedField = calculatedFieldCache.getCalculatedField(tenantId, calculatedFieldId); + CalculatedFieldCtx calculatedFieldCtx = calculatedFieldCache.getCalculatedFieldCtx(tenantId, calculatedFieldId, tbelInvokeService); Map argumentValues = updatedTelemetry.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, entry -> ArgumentEntry.createSingleValueArgument(entry.getValue()))); @@ -326,7 +324,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas case ASSET_PROFILE, DEVICE_PROFILE -> { boolean isCommonEntity = calculatedField.getConfiguration().getReferencedEntities().contains(entityId); if (isCommonEntity) { - getOrFetchFromDBProfileEntities(tenantId, cfEntityId).forEach(id -> updateOrInitializeState(calculatedFieldCtx, id, argumentValues, calculatedFieldIds)); + calculatedFieldCache.getEntitiesByProfile(tenantId, cfEntityId).forEach(id -> updateOrInitializeState(calculatedFieldCtx, id, argumentValues, calculatedFieldIds)); } else { updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues, calculatedFieldIds); } @@ -353,28 +351,61 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas return entry.getKey(); } + private Object deserializeObjectProto(TransportProtos.ObjectProto objectProto) { + try { + String type = objectProto.getType(); + String value = objectProto.getValue(); + return switch (type) { + case "java.lang.String" -> value; + case "java.lang.Integer" -> Integer.parseInt(value); + case "java.lang.Long" -> Long.parseLong(value); + case "java.lang.Double" -> Double.parseDouble(value); + case "java.lang.Boolean" -> Boolean.parseBoolean(value); + default -> throw new IllegalArgumentException("Unsupported object type: " + type); + }; + } catch (Exception e) { + log.error("Failed to deserialize ObjectProto: [{}]", objectProto, e); + return null; + } + } + @Override public void onCalculatedFieldStateMsg(TransportProtos.CalculatedFieldStateMsgProto proto, TbCallback callback) { try { TenantId tenantId = TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())); CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(proto.getCalculatedFieldIdMSB(), proto.getCalculatedFieldIdLSB())); EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); - String state = proto.getState(); - CalculatedFieldEntityCtx calculatedFieldEntityCtx = state.isEmpty() ? JacksonUtil.fromString(state, CalculatedFieldEntityCtx.class) : null; - - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, calculatedFieldId); - if (tpi.isMyPartition()) { - CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(calculatedFieldId.getId(), entityId.getId()); - if (calculatedFieldEntityCtx != null) { - states.put(ctxId, calculatedFieldEntityCtx); - rocksDBService.put(JacksonUtil.writeValueAsString(ctxId), state); - } else { - states.remove(ctxId); - rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId)); - } - } else { - log.debug("[{}] Calculated Field belongs to external partition {}", calculatedFieldId, tpi.getFullTopicName()); + List calculatedFieldIds = new ArrayList<>(); + for (TransportProtos.CalculatedFieldIdProto cfIdProto : proto.getCalculatedFieldsList()) { + CalculatedFieldId cfId = new CalculatedFieldId(new UUID( + cfIdProto.getCalculatedFieldIdMSB(), + cfIdProto.getCalculatedFieldIdLSB() + )); + calculatedFieldIds.add(cfId); } + + Map argumentsMap = new HashMap<>(); + proto.getArgumentsMap().forEach((key, entryProto) -> { + ArgumentEntry argumentEntry; + if (entryProto.hasTsRecords()) { + TsRollingArgumentEntry tsRollingArgumentEntry = new TsRollingArgumentEntry(); + entryProto.getTsRecords().getTsRecordsMap().forEach((ts, objectProto) -> { + Object value = deserializeObjectProto(objectProto); + tsRollingArgumentEntry.getTsRecords().put(ts, value); + }); + argumentEntry = tsRollingArgumentEntry; + } else if (entryProto.hasSingleValue()) { + TransportProtos.SingleValueProto singleRecordProto = entryProto.getSingleValue(); + Object value = deserializeObjectProto(singleRecordProto.getValue()); + argumentEntry = new SingleValueArgumentEntry(singleRecordProto.getTs(), value); + } else { + throw new IllegalArgumentException("Unsupported ArgumentEntryProto type"); + } + argumentsMap.put(key, argumentEntry); + }); + + CalculatedFieldCtx calculatedFieldCtx = calculatedFieldCache.getCalculatedFieldCtx(tenantId, calculatedFieldId, tbelInvokeService); + updateOrInitializeState(calculatedFieldCtx, entityId, argumentsMap, calculatedFieldIds); } catch (Exception e) { log.trace("Failed to process calculated field update state msg: [{}]", proto, e); } @@ -389,8 +420,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas EntityId newProfileId = EntityIdFactory.getByTypeAndUuid(proto.getEntityProfileType(), new UUID(proto.getNewProfileIdMSB(), proto.getNewProfileIdLSB())); log.info("Received EntityProfileUpdateMsgProto for processing: tenantId=[{}], entityId=[{}]", tenantId, entityId); - profileEntities.get(oldProfileId).remove(entityId); - profileEntities.computeIfAbsent(newProfileId, id -> new HashSet<>()).add(entityId); + calculatedFieldCache.getEntitiesByProfile(tenantId, oldProfileId).remove(entityId); + calculatedFieldCache.getEntitiesByProfile(tenantId, newProfileId).add(entityId); calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, oldProfileId) .forEach(cfId -> { @@ -400,7 +431,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas states.remove(ctxId); rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId)); } else { - sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, null); + sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, Collections.emptyList(), null); } }); @@ -419,12 +450,10 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas log.info("Received ProfileEntityMsgProto for processing: tenantId=[{}], entityId=[{}]", tenantId, entityId); if (proto.getDeleted()) { log.info("Executing profile entity deleted msg, tenantId=[{}], entityId=[{}]", tenantId, entityId); - profileEntities.get(profileId).remove(entityId); + calculatedFieldCache.getEntitiesByProfile(tenantId, profileId).remove(entityId); List calculatedFieldIds = Stream.concat( - calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, entityId).stream() - .map(CalculatedFieldLink::getCalculatedFieldId), - calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, profileId).stream() - .map(CalculatedFieldLink::getCalculatedFieldId) + calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, entityId).stream().map(CalculatedFieldLink::getCalculatedFieldId), + calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, profileId).stream().map(CalculatedFieldLink::getCalculatedFieldId) ).toList(); calculatedFieldIds.forEach(cfId -> { TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, cfId); @@ -433,12 +462,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas states.remove(ctxId); rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId)); } else { - sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, null); + sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, Collections.emptyList(), null); } }); } else { log.info("Executing profile entity added msg, tenantId=[{}], entityId=[{}]", tenantId, entityId); - profileEntities.computeIfAbsent(profileId, id -> new HashSet<>()).add(entityId); + calculatedFieldCache.getEntitiesByProfile(tenantId, profileId).add(entityId); initializeStateForEntityByProfile(tenantId, entityId, profileId, callback); } } catch (Exception e) { @@ -446,7 +475,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } } - private void sendUpdateCalculatedFieldStateMsg(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, CalculatedFieldState calculatedFieldState) { + private void sendUpdateCalculatedFieldStateMsg(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, List calculatedFieldIds, Map argumentValues) { TransportProtos.CalculatedFieldStateMsgProto.Builder msgBuilder = TransportProtos.CalculatedFieldStateMsgProto.newBuilder() .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) @@ -455,20 +484,45 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas .setEntityType(entityId.getEntityType().name()) .setEntityIdMSB(entityId.getId().getMostSignificantBits()) .setEntityIdLSB(entityId.getId().getLeastSignificantBits()); - if (calculatedFieldState != null) { - msgBuilder.setState(JacksonUtil.writeValueAsString(calculatedFieldState)); + + if (argumentValues != null) { + argumentValues.forEach((key, argumentEntry) -> { + TransportProtos.ArgumentEntryProto.Builder argumentEntryProtoBuilder = TransportProtos.ArgumentEntryProto.newBuilder(); + + if (argumentEntry instanceof TsRollingArgumentEntry tsRollingArgumentEntry) { + TransportProtos.TsRollingProto.Builder tsRollingProtoBuilder = TransportProtos.TsRollingProto.newBuilder(); + + tsRollingArgumentEntry.getTsRecords().forEach((ts, value) -> { + TransportProtos.ObjectProto.Builder objectProtoBuilder = TransportProtos.ObjectProto.newBuilder() + .setType(value.getClass().getName()) + .setValue(value.toString()); + tsRollingProtoBuilder.putTsRecords(ts, objectProtoBuilder.build()); + }); + + argumentEntryProtoBuilder.setTsRecords(tsRollingProtoBuilder.build()); + } else if (argumentEntry instanceof SingleValueArgumentEntry singleValueArgumentEntry) { + TransportProtos.SingleValueProto.Builder singleRecordProtoBuilder = TransportProtos.SingleValueProto.newBuilder() + .setTs(singleValueArgumentEntry.getTs()) + .setValue(TransportProtos.ObjectProto.newBuilder() + .setType(singleValueArgumentEntry.getValue().getClass().getName()) + .setValue(singleValueArgumentEntry.getValue().toString()) + .build()); + argumentEntryProtoBuilder.setSingleValue(singleRecordProtoBuilder.build()); + } + + msgBuilder.putArguments(key, argumentEntryProtoBuilder.build()); + }); } + clusterService.pushMsgToCore(tenantId, calculatedFieldId, TransportProtos.ToCoreMsg.newBuilder().setCalculatedFieldStateMsg(msgBuilder).build(), null); } private boolean onCalculatedFieldUpdate(CalculatedField updatedCalculatedField, TbCallback callback) { - CalculatedField oldCalculatedField = getOrFetchFromDb(updatedCalculatedField.getTenantId(), updatedCalculatedField.getId()); + CalculatedField oldCalculatedField = calculatedFieldCache.getCalculatedField(updatedCalculatedField.getTenantId(), updatedCalculatedField.getId()); boolean shouldReinit = true; if (hasSignificantChanges(oldCalculatedField, updatedCalculatedField)) { onCalculatedFieldDelete(updatedCalculatedField.getTenantId(), updatedCalculatedField.getId(), callback); } else { - calculatedFields.put(updatedCalculatedField.getId(), updatedCalculatedField); - calculatedFieldsCtx.put(updatedCalculatedField.getId(), new CalculatedFieldCtx(updatedCalculatedField, tbelInvokeService)); callback.onSuccess(); shouldReinit = false; } @@ -483,8 +537,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas if (calculatedFieldIds != null) { calculatedFieldIds.remove(calculatedFieldId); } - calculatedFields.remove(calculatedFieldId); - calculatedFieldsCtx.remove(calculatedFieldId); + calculatedFieldCache.evict(calculatedFieldId); states.keySet().removeIf(ctxId -> ctxId.cfId().equals(calculatedFieldId.getId())); List statesToRemove = states.keySet().stream() .filter(ctxId -> ctxId.cfId().equals(calculatedFieldId.getId())) @@ -497,28 +550,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } } - private CalculatedField getOrFetchFromDb(TenantId tenantId, CalculatedFieldId calculatedFieldId) { - return calculatedFields.computeIfAbsent(calculatedFieldId, cfId -> calculatedFieldService.findById(tenantId, calculatedFieldId)); - } - - private Set getOrFetchFromDBProfileEntities(TenantId tenantId, EntityId entityProfileId) { - return switch (entityProfileId.getEntityType()) { - case ASSET_PROFILE -> profileEntities.computeIfAbsent(entityProfileId, profileId -> { - Set assetIds = new HashSet<>(); - (new PageDataIterable<>(pageLink -> - assetService.findAssetIdsByTenantIdAndAssetProfileId(tenantId, (AssetProfileId) profileId, pageLink), initFetchPackSize)).forEach(assetIds::add); - return assetIds; - }); - case DEVICE_PROFILE -> profileEntities.computeIfAbsent(entityProfileId, profileId -> { - Set deviceIds = new HashSet<>(); - (new PageDataIterable<>(pageLink -> - deviceService.findDeviceIdsByTenantIdAndDeviceProfileId(tenantId, (DeviceProfileId) entityProfileId, pageLink), initFetchPackSize)).forEach(deviceIds::add); - return deviceIds; - }); - default -> throw new IllegalArgumentException("Entity type should be ASSET_PROFILE or DEVICE_PROFILE."); - }; - } - private boolean hasSignificantChanges(CalculatedField oldCalculatedField, CalculatedField newCalculatedField) { if (oldCalculatedField == null) { return true; @@ -537,7 +568,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas private void initializeStateForEntityByProfile(TenantId tenantId, EntityId entityId, EntityId profileId, TbCallback callback) { calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, profileId) .stream() - .map(cfId -> calculatedFieldsCtx.computeIfAbsent(cfId, id -> new CalculatedFieldCtx(calculatedFieldService.findById(tenantId, id), tbelInvokeService))) + .map(cfId -> calculatedFieldCache.getCalculatedFieldCtx(tenantId, cfId, tbelInvokeService)) .forEach(cfCtx -> initializeStateForEntity(cfCtx, entityId, callback)); } @@ -562,7 +593,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() { @Override public void onSuccess(List results) { - updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues, Collections.emptyList()); + updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues, new ArrayList<>()); callback.onSuccess(); } @@ -651,46 +682,47 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } private void updateOrInitializeState(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, Map argumentValues, List calculatedFieldIds) { + TenantId tenantId = calculatedFieldCtx.getTenantId(); CalculatedFieldId cfId = calculatedFieldCtx.getCfId(); - CalculatedFieldEntityCtxId entityCtxId = new CalculatedFieldEntityCtxId(cfId.getId(), entityId.getId()); - CalculatedFieldEntityCtx calculatedFieldEntityCtx = states.computeIfAbsent(entityCtxId, ctxId -> fetchCalculatedFieldEntityState(ctxId, calculatedFieldCtx.getCfType())); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, cfId); + if (tpi.isMyPartition()) { + CalculatedFieldEntityCtxId entityCtxId = new CalculatedFieldEntityCtxId(cfId.getId(), entityId.getId()); + CalculatedFieldEntityCtx calculatedFieldEntityCtx = states.computeIfAbsent(entityCtxId, ctxId -> fetchCalculatedFieldEntityState(ctxId, calculatedFieldCtx.getCfType())); - Predicate> allArgsPresent = (args) -> - args.keySet().containsAll(calculatedFieldCtx.getArguments().keySet()) && - !args.containsValue(SingleValueArgumentEntry.EMPTY) && !args.containsValue(TsRollingArgumentEntry.EMPTY); - - Consumer performUpdateState = (state) -> { - if (state.updateState(argumentValues)) { - calculatedFieldEntityCtx.setState(state); - TenantId tenantId = calculatedFieldCtx.getTenantId(); - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, cfId); - if (tpi.isMyPartition()) { + Consumer performUpdateState = (state) -> { + if (state.updateState(argumentValues)) { + calculatedFieldEntityCtx.setState(state); states.put(entityCtxId, calculatedFieldEntityCtx); rocksDBService.put(JacksonUtil.writeValueAsString(entityCtxId), JacksonUtil.writeValueAsString(calculatedFieldEntityCtx)); - } else { - sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, state); + Map arguments = state.getArguments(); + boolean allArgsPresent = arguments.keySet().containsAll(calculatedFieldCtx.getArguments().keySet()) && + !arguments.containsValue(SingleValueArgumentEntry.EMPTY) && !arguments.containsValue(TsRollingArgumentEntry.EMPTY); + if (allArgsPresent) { + performCalculation(calculatedFieldCtx, state, entityId, calculatedFieldIds); + } } + }; - if (allArgsPresent.test(state.getArguments())) { - performCalculation(calculatedFieldCtx, state, entityId, calculatedFieldIds); - } + CalculatedFieldState state = calculatedFieldEntityCtx.getState(); + + boolean allKeysPresent = argumentValues.keySet().containsAll(calculatedFieldCtx.getArguments().keySet()); + if (!allKeysPresent) { + + Map missingArguments = calculatedFieldCtx.getArguments().entrySet().stream() + .filter(entry -> !argumentValues.containsKey(entry.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + fetchArguments(calculatedFieldCtx.getTenantId(), entityId, missingArguments, argumentValues::putAll) + .addListener(() -> performUpdateState.accept(state), + calculatedFieldCallbackExecutor); + return; } - }; - - CalculatedFieldState state = calculatedFieldEntityCtx.getState(); - boolean allKeysPresent = argumentValues.keySet().containsAll(calculatedFieldCtx.getArguments().keySet()); - if (!allKeysPresent) { - - Map missingArguments = calculatedFieldCtx.getArguments().entrySet().stream() - .filter(entry -> !argumentValues.containsKey(entry.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - - fetchArguments(calculatedFieldCtx.getTenantId(), entityId, missingArguments, argumentValues::putAll) - .addListener(() -> performUpdateState.accept(state), - calculatedFieldCallbackExecutor); - return; + performUpdateState.accept(state); + states.put(entityCtxId, calculatedFieldEntityCtx); + rocksDBService.put(JacksonUtil.writeValueAsString(entityCtxId), JacksonUtil.writeValueAsString(calculatedFieldEntityCtx)); + } else { + sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, calculatedFieldIds, argumentValues); } - performUpdateState.accept(state); } private void performCalculation(CalculatedFieldCtx calculatedFieldCtx, CalculatedFieldState state, EntityId entityId, List calculatedFieldIds) { @@ -724,6 +756,9 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas TbMsgType msgType = "ATTRIBUTES".equals(type) ? TbMsgType.POST_ATTRIBUTES_REQUEST : TbMsgType.POST_TELEMETRY_REQUEST; TbMsgMetaData md = "ATTRIBUTES".equals(type) ? new TbMsgMetaData(Map.of(SCOPE, calculatedFieldResult.getScope().name())) : TbMsgMetaData.EMPTY; ObjectNode payload = createJsonPayload(calculatedFieldResult); + if (calculatedFieldIds == null) { + calculatedFieldIds = new ArrayList<>(); + } if (calculatedFieldIds.contains(calculatedFieldId)) { throw new IllegalArgumentException("Calculated field [" + calculatedFieldId.getId() + "] refers to itself, causing an infinite loop."); } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index 993fa24ff3..0805175f77 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -24,6 +24,7 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; @@ -191,6 +192,16 @@ public final class TbMsg implements Serializable { builder.setPartition(msg.getPartition()); } + if (msg.getCalculatedFieldIds() != null) { + for (CalculatedFieldId calculatedFieldId : msg.getCalculatedFieldIds()) { + MsgProtos.CalculatedFieldIdProto calculatedFieldIdProto = MsgProtos.CalculatedFieldIdProto.newBuilder() + .setCalculatedFieldIdMSB(calculatedFieldId.getId().getMostSignificantBits()) + .setCalculatedFieldIdLSB(calculatedFieldId.getId().getLeastSignificantBits()) + .build(); + builder.addCalculatedFields(calculatedFieldIdProto); + } + } + builder.setCtx(msg.ctx.toProto()); return builder.build().toByteArray(); } @@ -495,8 +506,8 @@ public final class TbMsg implements Serializable { ", type=" + this.type + ", internalType=" + this.internalType + ", originator=" + this.originator + ", customerId=" + this.customerId + ", metaData=" + this.metaData + ", dataType=" + this.dataType + ", data=" + this.data + ", ruleChainId=" + this.ruleChainId + ", ruleNodeId=" + this.ruleNodeId + - ", correlationId=" + this.correlationId + ", partition=" + this.partition + ", ctx=" + this.ctx + - ", callback=" + this.callback + ")"; + ", correlationId=" + this.correlationId + ", partition=" + this.partition + ", calculatedFields=" + this.calculatedFieldIds + + ", ctx=" + this.ctx + ", callback=" + this.callback + ")"; } } diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 3df19e313e..f6f7afd5d8 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -817,7 +817,34 @@ message CalculatedFieldStateMsgProto { string entityType = 5; int64 entityIdMSB = 6; int64 entityIdLSB = 7; - string state = 8; + repeated CalculatedFieldIdProto calculatedFields = 8; + map arguments = 9; +} + +message CalculatedFieldIdProto { + int64 calculatedFieldIdMSB = 1; + int64 calculatedFieldIdLSB = 2; +} + +message ArgumentEntryProto { + oneof entry_type { + TsRollingProto tsRecords = 1; + SingleValueProto singleValue = 2; + } +} + +message TsRollingProto { + map tsRecords = 1; +} + +message SingleValueProto { + int64 ts = 1; + ObjectProto value = 2; +} + +message ObjectProto { + string type = 1; + string value = 2; } //Used to report session state to tb-Service and persist this state in the cache on the tb-Service level. diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java index e83a125265..20d0dda42f 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java @@ -125,6 +125,7 @@ public class TbMsgAttributesNode implements TbNode { .scope(scope) .entries(attributes) .notifyDevice(config.isNotifyDevice() || checkNotifyDeviceMdValue(msg.getMetaData().getValue(NOTIFY_DEVICE_METADATA_KEY))) + .calculatedFieldIds(msg.getCalculatedFieldIds()) .callback(callback) .build()); }