diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldCache.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldCache.java index aa25565a34..7394c95f08 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldCache.java @@ -28,20 +28,24 @@ import java.util.Set; public interface CalculatedFieldCache { - CalculatedField getCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId); + CalculatedField getCalculatedField(CalculatedFieldId calculatedFieldId); - List getCalculatedFieldsByEntityId(TenantId tenantId, EntityId entityId); + List getCalculatedFieldsByEntityId(EntityId entityId); - List getCalculatedFieldLinks(TenantId tenantId, CalculatedFieldId calculatedFieldId); + List getCalculatedFieldLinks(CalculatedFieldId calculatedFieldId); - List getCalculatedFieldLinksByEntityId(TenantId tenantId, EntityId entityId); + List getCalculatedFieldLinksByEntityId(EntityId entityId); - void updateCalculatedFieldLinks(TenantId tenantId, CalculatedFieldId calculatedFieldId); + void updateCalculatedFieldLinks(CalculatedFieldId calculatedFieldId); - CalculatedFieldCtx getCalculatedFieldCtx(TenantId tenantId, CalculatedFieldId calculatedFieldId, TbelInvokeService tbelInvokeService); + CalculatedFieldCtx getCalculatedFieldCtx(CalculatedFieldId calculatedFieldId, TbelInvokeService tbelInvokeService); Set getEntitiesByProfile(TenantId tenantId, EntityId entityId); + void addCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId); + + void updateCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId); + void evict(CalculatedFieldId calculatedFieldId); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java index 40d569b040..71fc3eff67 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java @@ -24,6 +24,7 @@ import org.springframework.stereotype.Service; import org.thingsboard.script.api.tbel.TbelInvokeService; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedFieldLink; +import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; import org.thingsboard.server.common.data.id.AssetProfileId; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.DeviceProfileId; @@ -70,98 +71,44 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { public void init() { PageDataIterable cfs = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFields, initFetchPackSize); cfs.forEach(cf -> calculatedFields.putIfAbsent(cf.getId(), cf)); + calculatedFields.values().forEach(cf -> + entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new ArrayList<>()).add(cf) + ); PageDataIterable cfls = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFieldLinks, initFetchPackSize); cfls.forEach(link -> calculatedFieldLinks.computeIfAbsent(link.getCalculatedFieldId(), id -> new ArrayList<>()).add(link)); + calculatedFieldLinks.values().stream() + .flatMap(List::stream) + .forEach(link -> + entityIdCalculatedFieldLinks.computeIfAbsent(link.getEntityId(), id -> new ArrayList<>()).add(link) + ); } @Override - public CalculatedField getCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId) { - CalculatedField calculatedField = calculatedFields.get(calculatedFieldId); - if (calculatedField == null) { - calculatedFieldFetchLock.lock(); - try { - calculatedField = calculatedFields.get(calculatedFieldId); - if (calculatedField == null) { - calculatedField = calculatedFieldService.findById(tenantId, calculatedFieldId); - if (calculatedField != null) { - calculatedFields.put(calculatedFieldId, calculatedField); - log.debug("[{}] Fetch calculated field into cache: {}", calculatedFieldId, calculatedField); - } - } - } finally { - calculatedFieldFetchLock.unlock(); - } - } - log.trace("[{}] Found calculated field in cache: {}", calculatedFieldId, calculatedField); - return calculatedField; + public CalculatedField getCalculatedField(CalculatedFieldId calculatedFieldId) { + return calculatedFields.get(calculatedFieldId); } @Override - public List getCalculatedFieldsByEntityId(TenantId tenantId, EntityId entityId) { - List cfs = entityIdCalculatedFields.get(entityId); - if (cfs == null) { - calculatedFieldFetchLock.lock(); - try { - cfs = entityIdCalculatedFields.get(entityId); - if (cfs == null) { - cfs = calculatedFieldService.findCalculatedFieldsByEntityId(tenantId, entityId); - entityIdCalculatedFields.put(entityId, cfs); - log.debug("[{}] Fetch calculated fields by entity into cache: {}", entityId, cfs); - } - } finally { - calculatedFieldFetchLock.unlock(); - } - } - log.trace("[{}] Found calculated fields by entity in cache: {}", entityId, cfs); - return cfs; + public List getCalculatedFieldsByEntityId(EntityId entityId) { + return entityIdCalculatedFields.getOrDefault(entityId, new ArrayList<>()); } @Override - public List getCalculatedFieldLinks(TenantId tenantId, CalculatedFieldId calculatedFieldId) { - List cfLinks = calculatedFieldLinks.get(calculatedFieldId); - if (cfLinks == null) { - calculatedFieldFetchLock.lock(); - try { - cfLinks = calculatedFieldLinks.get(calculatedFieldId); - if (cfLinks == null) { - cfLinks = calculatedFieldService.findAllCalculatedFieldLinksById(tenantId, calculatedFieldId); - calculatedFieldLinks.put(calculatedFieldId, cfLinks); - log.debug("[{}] Fetch calculated field links into cache: {}", calculatedFieldId, cfLinks); - } - } finally { - calculatedFieldFetchLock.unlock(); - } - } - log.trace("[{}] Found calculated field links in cache: {}", calculatedFieldId, cfLinks); - return cfLinks; + public List getCalculatedFieldLinks(CalculatedFieldId calculatedFieldId) { + return calculatedFieldLinks.getOrDefault(calculatedFieldId, new ArrayList<>()); } @Override - public List getCalculatedFieldLinksByEntityId(TenantId tenantId, EntityId entityId) { - List cfLinks = entityIdCalculatedFieldLinks.get(entityId); - if (cfLinks == null) { - calculatedFieldFetchLock.lock(); - try { - cfLinks = entityIdCalculatedFieldLinks.get(entityId); - if (cfLinks == null) { - cfLinks = calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, entityId); - entityIdCalculatedFieldLinks.put(entityId, cfLinks); - log.debug("[{}] Fetch calculated field links by entity id into cache: {}", entityId, cfLinks); - } - } finally { - calculatedFieldFetchLock.unlock(); - } - } - log.trace("[{}] Found calculated field links by entity id in cache: {}", entityId, cfLinks); - return cfLinks; + public List getCalculatedFieldLinksByEntityId(EntityId entityId) { + return entityIdCalculatedFieldLinks.getOrDefault(entityId, new ArrayList<>()); } @Override - public void updateCalculatedFieldLinks(TenantId tenantId, CalculatedFieldId calculatedFieldId) { + public void updateCalculatedFieldLinks(CalculatedFieldId calculatedFieldId) { log.debug("Update calculated field links per entity for calculated field: [{}]", calculatedFieldId); calculatedFieldFetchLock.lock(); try { - List cfLinks = getCalculatedFieldLinks(tenantId, calculatedFieldId); + List cfLinks = getCalculatedFieldLinks(calculatedFieldId); if (cfLinks != null && !cfLinks.isEmpty()) { cfLinks.forEach(link -> { entityIdCalculatedFieldLinks.compute(link.getEntityId(), (id, existingList) -> { @@ -181,14 +128,14 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { } @Override - public CalculatedFieldCtx getCalculatedFieldCtx(TenantId tenantId, CalculatedFieldId calculatedFieldId, TbelInvokeService tbelInvokeService) { + public CalculatedFieldCtx getCalculatedFieldCtx(CalculatedFieldId calculatedFieldId, TbelInvokeService tbelInvokeService) { CalculatedFieldCtx ctx = calculatedFieldsCtx.get(calculatedFieldId); if (ctx == null) { calculatedFieldFetchLock.lock(); try { ctx = calculatedFieldsCtx.get(calculatedFieldId); if (ctx == null) { - CalculatedField calculatedField = getCalculatedField(tenantId, calculatedFieldId); + CalculatedField calculatedField = getCalculatedField(calculatedFieldId); if (calculatedField != null) { ctx = new CalculatedFieldCtx(calculatedField, tbelInvokeService); calculatedFieldsCtx.put(calculatedFieldId, ctx); @@ -236,6 +183,42 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { return entities; } + @Override + public void addCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId) { + calculatedFieldFetchLock.lock(); + try { + CalculatedField calculatedField = calculatedFieldService.findById(tenantId, calculatedFieldId); + EntityId cfEntityId = calculatedField.getEntityId(); + + calculatedFields.put(calculatedFieldId, calculatedField); + + entityIdCalculatedFields.computeIfAbsent(cfEntityId, entityId -> new ArrayList<>()).add(calculatedField); + + CalculatedFieldConfiguration configuration = calculatedField.getConfiguration(); + calculatedFieldLinks.put(calculatedFieldId, configuration.buildCalculatedFieldLinks(tenantId, cfEntityId, calculatedFieldId)); + + configuration.getReferencedEntities().stream() + .filter(referencedEntityId -> !referencedEntityId.equals(cfEntityId)) + .forEach(referencedEntityId -> { + entityIdCalculatedFieldLinks.computeIfAbsent(referencedEntityId, entityId -> new ArrayList<>()) + .add(configuration.buildCalculatedFieldLink(tenantId, referencedEntityId, calculatedFieldId)); + }); + } finally { + calculatedFieldFetchLock.unlock(); + } + } + + @Override + public void updateCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId) { + calculatedFieldFetchLock.lock(); + try { + evict(calculatedFieldId); + addCalculatedField(tenantId, calculatedFieldId); + } finally { + calculatedFieldFetchLock.unlock(); + } + } + @Override public void evict(CalculatedFieldId calculatedFieldId) { CalculatedField oldCalculatedField = calculatedFields.remove(calculatedFieldId); @@ -243,7 +226,6 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { calculatedFieldLinks.remove(calculatedFieldId); log.debug("[{}] evict calculated field from cached calculated fields by entity id: {}", calculatedFieldId, oldCalculatedField); entityIdCalculatedFields.forEach((entityId, calculatedFields) -> calculatedFields.removeIf(cf -> cf.getId().equals(calculatedFieldId))); - entityIdCalculatedFields.remove(oldCalculatedField.getEntityId()); log.debug("[{}] evict calculated field links from cache: {}", calculatedFieldId, oldCalculatedField); calculatedFieldsCtx.remove(calculatedFieldId); log.debug("[{}] evict calculated field ctx from cache: {}", calculatedFieldId, oldCalculatedField); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index 0de3136439..dd11c799c2 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -92,6 +92,7 @@ import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; import java.util.ArrayList; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.List; @@ -253,7 +254,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas onCalculatedFieldDelete(calculatedFieldId, callback); callback.onSuccess(); } - CalculatedField cf = calculatedFieldCache.getCalculatedField(tenantId, calculatedFieldId); + CalculatedField cf = calculatedFieldCache.getCalculatedField(calculatedFieldId); if (proto.getUpdated()) { log.info("Executing onCalculatedFieldUpdate, calculatedFieldId=[{}]", calculatedFieldId); boolean shouldReinit = onCalculatedFieldUpdate(cf, callback); @@ -263,7 +264,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } if (cf != null) { EntityId entityId = cf.getEntityId(); - CalculatedFieldCtx calculatedFieldCtx = calculatedFieldCache.getCalculatedFieldCtx(tenantId, calculatedFieldId, tbelInvokeService); + CalculatedFieldCtx calculatedFieldCtx = calculatedFieldCache.getCalculatedFieldCtx(calculatedFieldId, tbelInvokeService); switch (entityId.getEntityType()) { case ASSET, DEVICE -> { log.info("Initializing state for entity: tenantId=[{}], entityId=[{}]", tenantId, entityId); @@ -297,7 +298,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } private boolean onCalculatedFieldUpdate(CalculatedField updatedCalculatedField, TbCallback callback) { - CalculatedField oldCalculatedField = calculatedFieldCache.getCalculatedField(updatedCalculatedField.getTenantId(), updatedCalculatedField.getId()); + CalculatedField oldCalculatedField = calculatedFieldCache.getCalculatedField(updatedCalculatedField.getId()); boolean shouldReinit = true; if (hasSignificantChanges(oldCalculatedField, updatedCalculatedField)) { onCalculatedFieldDelete(updatedCalculatedField.getId(), callback); @@ -345,17 +346,27 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas if (supportedReferencedEntities.contains(entityId.getEntityType())) { TenantId tenantId = calculatedFieldTelemetryUpdateRequest.getTenantId(); - Map> tpiStatesToUpdate = new HashMap<>(); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId); - updateTelemetryForEntity(calculatedFieldTelemetryUpdateRequest, tpiStatesToUpdate); - updateTelemetryForProfile(calculatedFieldTelemetryUpdateRequest, getProfileId(tenantId, entityId), tpiStatesToUpdate); - updateTelemetryForLinkedEntities(calculatedFieldTelemetryUpdateRequest, tpiStatesToUpdate); + if (tpi.isMyPartition()) { - if (!tpiStatesToUpdate.isEmpty()) { - tpiStatesToUpdate.forEach((topicPartitionInfo, ctxIds) -> { - TransportProtos.TelemetryUpdateMsgProto telemetryUpdateMsgProto = buildTelemetryUpdateMsgProto(calculatedFieldTelemetryUpdateRequest, ctxIds); - clusterService.pushMsgToRuleEngine(topicPartitionInfo, UUID.randomUUID(), TransportProtos.ToRuleEngineMsg.newBuilder().setCfTelemetryUpdateMsg(telemetryUpdateMsgProto).build(), null); - }); + processCalculatedFields(calculatedFieldTelemetryUpdateRequest, entityId); + processCalculatedFields(calculatedFieldTelemetryUpdateRequest, getProfileId(tenantId, entityId)); + + Map> tpiStatesToUpdate = new HashMap<>(); + processCalculatedFieldLinks(calculatedFieldTelemetryUpdateRequest, tpiStatesToUpdate); + if (!tpiStatesToUpdate.isEmpty()) { + tpiStatesToUpdate.forEach((topicPartitionInfo, ctxIds) -> { + TransportProtos.TelemetryUpdateMsgProto telemetryUpdateMsgProto = buildTelemetryUpdateMsgProto(calculatedFieldTelemetryUpdateRequest, ctxIds); + clusterService.pushMsgToRuleEngine(topicPartitionInfo, UUID.randomUUID(), TransportProtos.ToRuleEngineMsg.newBuilder() + .setCfTelemetryUpdateMsg(telemetryUpdateMsgProto).build(), null); + }); + } + } else { + TransportProtos.TelemetryUpdateMsgProto telemetryUpdateMsgProto = buildTelemetryUpdateMsgProto(calculatedFieldTelemetryUpdateRequest); + clusterService.pushMsgToRuleEngine(tpi, UUID.randomUUID(), TransportProtos.ToRuleEngineMsg.newBuilder() + .setCfTelemetryUpdateMsg(telemetryUpdateMsgProto).build(), null); + // Forward this request to a correct server based on entity id. } } } catch (Exception e) { @@ -363,30 +374,14 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } } - private void updateTelemetryForEntity(CalculatedFieldTelemetryUpdateRequest request, Map> tpiStates) { - updateTelemetryForEntity(request, request.getEntityId(), tpiStates); - } - - private void updateTelemetryForProfile(CalculatedFieldTelemetryUpdateRequest request, EntityId profileId, Map> tpiStates) { - updateTelemetryForEntity(request, profileId, tpiStates); - } - - private void updateTelemetryForEntity(CalculatedFieldTelemetryUpdateRequest request, EntityId targetEntity, Map> tpiStates) { + private void processCalculatedFields(CalculatedFieldTelemetryUpdateRequest request, EntityId cfTargetEntityId) { TenantId tenantId = request.getTenantId(); EntityId entityId = request.getEntityId(); - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId); - if (tpi.isMyPartition()) { - if (targetEntity != null) { - calculatedFieldCache.getCalculatedFieldsByEntityId(tenantId, targetEntity).forEach(cf -> { - CalculatedFieldLinkConfiguration linkConfiguration = cf.getConfiguration().getReferencedEntityConfig(targetEntity); - mapAndProcessUpdatedTelemetry(tenantId, entityId, cf.getId(), request, linkConfiguration); - }); - } - } else { - List ctxIds = tpiStates.computeIfAbsent(tpi, k -> new ArrayList<>()); - calculatedFieldCache.getCalculatedFieldsByEntityId(tenantId, targetEntity).forEach(cf -> { - ctxIds.add(new CalculatedFieldEntityCtxId(cf.getId(), entityId)); + if (cfTargetEntityId != null) { + calculatedFieldCache.getCalculatedFieldsByEntityId(cfTargetEntityId).forEach(cf -> { + CalculatedFieldLinkConfiguration linkConfiguration = cf.getConfiguration().getReferencedEntityConfig(cfTargetEntityId); + mapAndProcessUpdatedTelemetry(tenantId, entityId, cf.getId(), request, linkConfiguration); }); } } @@ -405,14 +400,14 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } } - private void updateTelemetryForLinkedEntities(CalculatedFieldTelemetryUpdateRequest request, Map> tpiStates) { + private void processCalculatedFieldLinks(CalculatedFieldTelemetryUpdateRequest request, Map> tpiStates) { TenantId tenantId = request.getTenantId(); EntityId entityId = request.getEntityId(); - calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, entityId) + calculatedFieldCache.getCalculatedFieldLinksByEntityId(entityId) .forEach(link -> { CalculatedFieldId calculatedFieldId = link.getCalculatedFieldId(); - EntityId targetEntityId = calculatedFieldCache.getCalculatedField(tenantId, calculatedFieldId).getEntityId(); + EntityId targetEntityId = calculatedFieldCache.getCalculatedField(calculatedFieldId).getEntityId(); if (isProfileEntity(targetEntityId)) { calculatedFieldCache.getEntitiesByProfile(tenantId, targetEntityId).forEach(entityByProfile -> { @@ -451,33 +446,22 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas @Override public void onTelemetryUpdateMsg(TransportProtos.TelemetryUpdateMsgProto proto) { try { - TenantId tenantId = TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())); + CalculatedFieldTelemetryUpdateRequest request = fromProto(proto); + + if (proto.getLinksList().isEmpty()) { + onTelemetryUpdate(request); + return; + } proto.getLinksList().forEach(ctxIdProto -> { - EntityId entityId = EntityIdFactory.getByTypeAndUuid( - ctxIdProto.getEntityType(), new UUID(ctxIdProto.getEntityIdMSB(), ctxIdProto.getEntityIdLSB())); + TenantId tenantId = request.getTenantId(); + EntityId entityId = request.getEntityId(); + CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(ctxIdProto.getCalculatedFieldIdMSB(), ctxIdProto.getCalculatedFieldIdLSB())); - List updatedTelemetry = proto.getUpdatedTelemetryList().stream() - .map(ProtoUtils::fromTelemetryProto) - .toList(); + CalculatedFieldLinkConfiguration linkConfiguration + = calculatedFieldCache.getCalculatedField(calculatedFieldId).getConfiguration().getReferencedEntityConfig(entityId); - boolean attributesUpdated = StringUtils.isEmpty(proto.getScope()); - - CalculatedFieldTelemetryUpdateRequest request = attributesUpdated - ? new CalculatedFieldAttributeUpdateRequest( - tenantId, entityId, AttributeScope.valueOf(proto.getScope()), updatedTelemetry, - proto.getPreviousCalculatedFieldsList().stream() - .map(cfIdProto -> new CalculatedFieldId( - new UUID(cfIdProto.getCalculatedFieldIdMSB(), cfIdProto.getCalculatedFieldIdLSB()))) - .toList()) - : new CalculatedFieldTimeSeriesUpdateRequest( - tenantId, entityId, updatedTelemetry, - proto.getPreviousCalculatedFieldsList().stream() - .map(cfIdProto -> new CalculatedFieldId( - new UUID(cfIdProto.getCalculatedFieldIdMSB(), cfIdProto.getCalculatedFieldIdLSB()))) - .toList()); - - onTelemetryUpdate(request); + mapAndProcessUpdatedTelemetry(tenantId, entityId, calculatedFieldId, request, linkConfiguration); }); } catch (Exception e) { log.trace("Failed to process telemetry update msg: [{}]", proto, e); @@ -486,8 +470,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas private void executeTelemetryUpdate(TenantId tenantId, EntityId entityId, CalculatedFieldId calculatedFieldId, List previousCalculatedFieldIds, Map updatedTelemetry) { log.info("Received telemetry update msg: tenantId=[{}], entityId=[{}], calculatedFieldId=[{}]", tenantId, entityId, calculatedFieldId); - CalculatedField calculatedField = calculatedFieldCache.getCalculatedField(tenantId, calculatedFieldId); - CalculatedFieldCtx calculatedFieldCtx = calculatedFieldCache.getCalculatedFieldCtx(tenantId, calculatedFieldId, tbelInvokeService); + CalculatedField calculatedField = calculatedFieldCache.getCalculatedField(calculatedFieldId); + CalculatedFieldCtx calculatedFieldCtx = calculatedFieldCache.getCalculatedFieldCtx(calculatedFieldId, tbelInvokeService); Map argumentValues = updatedTelemetry.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, entry -> ArgumentEntry.createSingleValueArgument(entry.getValue()))); @@ -524,7 +508,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas Map argumentsMap = proto.getArgumentsMap().entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, entry -> fromArgumentEntryProto(entry.getValue()))); - CalculatedFieldCtx calculatedFieldCtx = calculatedFieldCache.getCalculatedFieldCtx(tenantId, calculatedFieldId, tbelInvokeService); + CalculatedFieldCtx calculatedFieldCtx = calculatedFieldCache.getCalculatedFieldCtx(calculatedFieldId, tbelInvokeService); updateOrInitializeState(calculatedFieldCtx, entityId, argumentsMap, previousCalculatedFieldIds); } catch (Exception e) { log.trace("Failed to process calculated field update state msg: [{}]", proto, e); @@ -559,7 +543,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas if (proto.getDeleted()) { log.info("Executing profile entity deleted msg, tenantId=[{}], entityId=[{}]", tenantId, entityId); - getCalculatedFieldLinks(tenantId, entityId, profileId) + getCalculatedFieldLinks(entityId, profileId) .forEach(link -> clearState(tenantId, link.getCalculatedFieldId(), entityId)); } else { log.info("Executing profile entity added msg, tenantId=[{}], entityId=[{}]", tenantId, entityId); @@ -585,7 +569,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas private void initializeStateForEntityByProfile(TenantId tenantId, EntityId entityId, EntityId profileId, TbCallback callback) { calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, profileId) .stream() - .map(cfId -> calculatedFieldCache.getCalculatedFieldCtx(tenantId, cfId, tbelInvokeService)) + .map(cfId -> calculatedFieldCache.getCalculatedFieldCtx(cfId, tbelInvokeService)) .forEach(cfCtx -> initializeStateForEntity(cfCtx, entityId, callback)); } @@ -722,10 +706,10 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } } - private List getCalculatedFieldLinks(TenantId tenantId, EntityId entityId, EntityId profileId) { - List links = new ArrayList<>(calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, entityId)); + private List getCalculatedFieldLinks(EntityId entityId, EntityId profileId) { + List links = new ArrayList<>(calculatedFieldCache.getCalculatedFieldLinksByEntityId(entityId)); if (profileId != null) { - links.addAll(calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, profileId)); + links.addAll(calculatedFieldCache.getCalculatedFieldLinksByEntityId(profileId)); } return links; } @@ -870,13 +854,22 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } } + private TransportProtos.TelemetryUpdateMsgProto buildTelemetryUpdateMsgProto(CalculatedFieldTelemetryUpdateRequest request) { + return buildTelemetryUpdateMsgProto(request, Collections.emptyList()); + } + + ; + private TransportProtos.TelemetryUpdateMsgProto buildTelemetryUpdateMsgProto( CalculatedFieldTelemetryUpdateRequest request, List links ) { TransportProtos.TelemetryUpdateMsgProto.Builder builder = TransportProtos.TelemetryUpdateMsgProto.newBuilder(); builder.setTenantIdMSB(request.getTenantId().getId().getMostSignificantBits()) - .setTenantIdLSB(request.getTenantId().getId().getLeastSignificantBits()); + .setTenantIdLSB(request.getTenantId().getId().getLeastSignificantBits()) + .setEntityType(request.getEntityId().getEntityType().name()) + .setEntityIdMSB(request.getEntityId().getId().getMostSignificantBits()) + .setEntityIdLSB(request.getEntityId().getId().getLeastSignificantBits()); for (CalculatedFieldEntityCtxId link : links) { builder.addLinks(toProto(link)); @@ -904,6 +897,31 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas return builder.build(); } + private CalculatedFieldTelemetryUpdateRequest fromProto(TransportProtos.TelemetryUpdateMsgProto proto) { + TenantId tenantId = TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())); + EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); + + List updatedTelemetry = proto.getUpdatedTelemetryList().stream() + .map(ProtoUtils::fromTelemetryProto) + .toList(); + + boolean attributesUpdated = StringUtils.isEmpty(proto.getScope()); + + return attributesUpdated + ? new CalculatedFieldAttributeUpdateRequest( + tenantId, entityId, AttributeScope.valueOf(proto.getScope()), updatedTelemetry, + proto.getPreviousCalculatedFieldsList().stream() + .map(cfIdProto -> new CalculatedFieldId( + new UUID(cfIdProto.getCalculatedFieldIdMSB(), cfIdProto.getCalculatedFieldIdLSB()))) + .toList()) + : new CalculatedFieldTimeSeriesUpdateRequest( + tenantId, entityId, updatedTelemetry, + proto.getPreviousCalculatedFieldsList().stream() + .map(cfIdProto -> new CalculatedFieldId( + new UUID(cfIdProto.getCalculatedFieldIdMSB(), cfIdProto.getCalculatedFieldIdLSB()))) + .toList()); + } + private TransportProtos.CalculatedFieldEntityCtxIdProto toProto(CalculatedFieldEntityCtxId ctxId) { return TransportProtos.CalculatedFieldEntityCtxIdProto.newBuilder() .setCalculatedFieldIdMSB(ctxId.cfId().getId().getMostSignificantBits()) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java index 2aaed13ec1..dac35bfc5c 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java @@ -194,7 +194,9 @@ public abstract class AbstractConsumerService buildCalculatedFieldLinks(TenantId tenantId, EntityId cfEntityId, CalculatedFieldId calculatedFieldId) { + return getReferencedEntities().stream() + .filter(referencedEntity -> !referencedEntity.equals(cfEntityId)) + .map(referencedEntityId -> buildCalculatedFieldLink(tenantId, referencedEntityId, calculatedFieldId)) + .collect(Collectors.toList()); + } + + @Override + public CalculatedFieldLink buildCalculatedFieldLink(TenantId tenantId, EntityId referencedEntityId, CalculatedFieldId calculatedFieldId) { + CalculatedFieldLink link = new CalculatedFieldLink(); + link.setTenantId(tenantId); + link.setEntityId(referencedEntityId); + link.setCalculatedFieldId(calculatedFieldId); + link.setConfiguration(getReferencedEntityConfig(referencedEntityId)); + return link; + } + @Override public JsonNode calculatedFieldConfigToJson(EntityType entityType, UUID entityId) { ObjectNode configNode = mapper.createObjectNode(); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CalculatedFieldConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CalculatedFieldConfiguration.java index 5c428bd628..ac94ade134 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CalculatedFieldConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CalculatedFieldConfiguration.java @@ -20,9 +20,12 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.databind.JsonNode; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.cf.CalculatedFieldLink; import org.thingsboard.server.common.data.cf.CalculatedFieldLinkConfiguration; import org.thingsboard.server.common.data.cf.CalculatedFieldType; +import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; import java.util.List; import java.util.Map; @@ -57,4 +60,8 @@ public interface CalculatedFieldConfiguration { @JsonIgnore JsonNode calculatedFieldConfigToJson(EntityType entityType, UUID entityId); + List buildCalculatedFieldLinks(TenantId tenantId, EntityId cfEntityId, CalculatedFieldId calculatedFieldId); + + CalculatedFieldLink buildCalculatedFieldLink(TenantId tenantId, EntityId referencedEntityId, CalculatedFieldId calculatedFieldId); + } diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 685ca47719..8a7c2d8c03 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -824,10 +824,13 @@ message ProfileEntityMsgProto { message TelemetryUpdateMsgProto { int64 tenantIdMSB = 1; int64 tenantIdLSB = 2; - repeated CalculatedFieldEntityCtxIdProto links = 3; - repeated CalculatedFieldIdProto previousCalculatedFields = 4; - string scope = 5; - repeated TelemetryProto updatedTelemetry = 6; + string entityType = 3; + int64 entityIdMSB = 4; + int64 entityIdLSB = 5; + repeated CalculatedFieldEntityCtxIdProto links = 6; + repeated CalculatedFieldIdProto previousCalculatedFields = 7; + string scope = 8; + repeated TelemetryProto updatedTelemetry = 9; } message CalculatedFieldEntityCtxIdProto { diff --git a/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java b/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java index 36bc3d038a..9c81d91f64 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java @@ -38,7 +38,6 @@ import org.thingsboard.server.dao.service.DataValidator; import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; import static org.thingsboard.server.dao.service.Validator.validateId; import static org.thingsboard.server.dao.service.Validator.validatePageLink; @@ -240,23 +239,8 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements } private void createOrUpdateCalculatedFieldLink(TenantId tenantId, CalculatedField calculatedField) { - List links = buildCalculatedFieldLinks(tenantId, calculatedField); + List links = calculatedField.getConfiguration().buildCalculatedFieldLinks(tenantId, calculatedField.getEntityId(), calculatedField.getId()); links.forEach(link -> saveCalculatedFieldLink(tenantId, link)); } - private List buildCalculatedFieldLinks(TenantId tenantId, CalculatedField calculatedField) { - CalculatedFieldConfiguration cfConfig = calculatedField.getConfiguration(); - return cfConfig.getReferencedEntities().stream() - .filter(referencedEntity -> !referencedEntity.equals(calculatedField.getEntityId())) - .map(referencedEntityId -> { - CalculatedFieldLink link = new CalculatedFieldLink(); - link.setTenantId(tenantId); - link.setEntityId(referencedEntityId); - link.setCalculatedFieldId(calculatedField.getId()); - link.setConfiguration(cfConfig.getReferencedEntityConfig(referencedEntityId)); - return link; - }) - .collect(Collectors.toList()); - } - } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/cf/CalculatedFieldRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/cf/CalculatedFieldRepository.java index 816fa1546c..bed6f2d3a2 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/cf/CalculatedFieldRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/cf/CalculatedFieldRepository.java @@ -16,7 +16,6 @@ package org.thingsboard.server.dao.sql.cf; import org.springframework.data.jpa.repository.JpaRepository; -import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.dao.model.sql.CalculatedFieldEntity; @@ -29,7 +28,7 @@ public interface CalculatedFieldRepository extends JpaRepository findCalculatedFieldIdsByTenantIdAndEntityId(UUID tenantId, UUID entityId); - List findAllByTenantIdAndEntityId(UUID tenantId, UUID entityId); + List findAllByTenantIdAndEntityId(UUID tenantId, UUID entityId); List findAllByTenantId(UUID tenantId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/cf/JpaCalculatedFieldDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/cf/JpaCalculatedFieldDao.java index 20081299e8..cdcffdd440 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/cf/JpaCalculatedFieldDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/cf/JpaCalculatedFieldDao.java @@ -57,7 +57,7 @@ public class JpaCalculatedFieldDao extends JpaAbstractDao findCalculatedFieldsByEntityId(TenantId tenantId, EntityId entityId) { - return calculatedFieldRepository.findAllByTenantIdAndEntityId(tenantId.getId(), entityId.getId()); + return DaoUtil.convertDataList(calculatedFieldRepository.findAllByTenantIdAndEntityId(tenantId.getId(), entityId.getId())); } @Override