diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java index 302e4b2511..966b5d7277 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java @@ -29,7 +29,9 @@ public interface CalculatedFieldExecutionService { void onTelemetryUpdate(TenantId tenantId, EntityId entityId, List telemetry); - void onEntityProfileChanged(TransportProtos.EntityProfileUpdateMsgProto proto, TbCallback callback); + void onCalculatedFieldStateMsg(TransportProtos.CalculatedFieldStateMsgProto proto, TbCallback callback); + + void onEntityProfileChangedMsg(TransportProtos.EntityProfileUpdateMsgProto proto, TbCallback callback); void onProfileEntityMsg(TransportProtos.ProfileEntityMsgProto proto, TbCallback callback); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index 7e5ea5b5ec..f2a09d0faf 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -98,6 +98,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.function.Consumer; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.thingsboard.server.common.data.DataConstants.SCOPE; @@ -184,7 +185,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas try { tpi = partitionService.resolve(ServiceType.TB_CORE, cf.getTenantId(), cf.getId()); } catch (Exception e) { - log.warn("Failed to resolve partition for CalculatedField [{}], tenant [{}]. Reason: {}", + log.warn("Failed to resolve partition for CalculatedField [{}], tenant id [{}]. Reason: {}", cf.getId(), cf.getTenantId(), e.getMessage()); continue; } @@ -233,8 +234,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas @Override protected void cleanupEntityOnPartitionRemoval(CalculatedFieldId entityId) { - calculatedFields.remove(entityId); - states.keySet().removeIf(ctxId -> ctxId.cfId().equals(entityId.getId())); + cleanupEntity(entityId); + } + + private void cleanupEntity(CalculatedFieldId calculatedFieldId) { + calculatedFields.remove(calculatedFieldId); + states.keySet().removeIf(ctxId -> ctxId.cfId().equals(calculatedFieldId.getId())); } @Override @@ -245,7 +250,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas log.info("Received CalculatedFieldMsgProto for processing: tenantId=[{}], calculatedFieldId=[{}]", tenantId, calculatedFieldId); if (proto.getDeleted()) { log.warn("Executing onCalculatedFieldDelete, calculatedFieldId=[{}]", calculatedFieldId); - onCalculatedFieldDelete(calculatedFieldId, callback); + onCalculatedFieldDelete(tenantId, calculatedFieldId, callback); callback.onSuccess(); } CalculatedField cf = getOrFetchFromDb(tenantId, calculatedFieldId); @@ -364,7 +369,34 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } @Override - public void onEntityProfileChanged(TransportProtos.EntityProfileUpdateMsgProto proto, TbCallback callback) { + public void onCalculatedFieldStateMsg(TransportProtos.CalculatedFieldStateMsgProto proto, TbCallback callback) { + try { + TenantId tenantId = TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())); + CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(proto.getCalculatedFieldIdMSB(), proto.getCalculatedFieldIdLSB())); + EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); + String state = proto.getState(); + CalculatedFieldEntityCtx calculatedFieldEntityCtx = state.isEmpty() ? JacksonUtil.fromString(state, CalculatedFieldEntityCtx.class) : null; + + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, calculatedFieldId); + if (tpi.isMyPartition()) { + CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(calculatedFieldId.getId(), entityId.getId()); + if (calculatedFieldEntityCtx != null) { + states.put(ctxId, calculatedFieldEntityCtx); + rocksDBService.put(JacksonUtil.writeValueAsString(ctxId), state); + } else { + states.remove(ctxId); + rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId)); + } + } else { + log.debug("[{}] Calculated Field belongs to external partition {}", calculatedFieldId, tpi.getFullTopicName()); + } + } catch (Exception e) { + log.trace("Failed to process calculated field update state msg: [{}]", proto, e); + } + } + + @Override + public void onEntityProfileChangedMsg(TransportProtos.EntityProfileUpdateMsgProto proto, TbCallback callback) { try { TenantId tenantId = TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())); EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); @@ -377,9 +409,14 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, oldProfileId) .forEach(cfId -> { - CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(cfId.getId(), entityId.getId()); - states.remove(ctxId); - rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId)); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, cfId); + if (tpi.isMyPartition()) { + CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(cfId.getId(), entityId.getId()); + states.remove(ctxId); + rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId)); + } else { + sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, null); + } }); initializeStateForEntityByProfile(tenantId, entityId, newProfileId, callback); @@ -398,12 +435,22 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas if (proto.getDeleted()) { log.info("Executing profile entity deleted msg, tenantId=[{}], entityId=[{}]", tenantId, entityId); profileEntities.get(profileId).remove(entityId); - List statesToRemove = states.keySet().stream() - .filter(ctxEntityId -> ctxEntityId.entityId().equals(entityId.getId())) - .map(JacksonUtil::writeValueAsString) - .toList(); - states.keySet().removeIf(ctxEntityId -> ctxEntityId.entityId().equals(entityId.getId())); - rocksDBService.deleteAll(statesToRemove); + List calculatedFieldIds = Stream.concat( + calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, entityId).stream() + .map(CalculatedFieldLink::getCalculatedFieldId), + calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, profileId).stream() + .map(CalculatedFieldLink::getCalculatedFieldId) + ).toList(); + calculatedFieldIds.forEach(cfId -> { + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, cfId); + if (tpi.isMyPartition()) { + CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(cfId.getId(), entityId.getId()); + states.remove(ctxId); + rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId)); + } else { + sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, null); + } + }); } else { log.info("Executing profile entity added msg, tenantId=[{}], entityId=[{}]", tenantId, entityId); profileEntities.computeIfAbsent(profileId, id -> new HashSet<>()).add(entityId); @@ -414,11 +461,26 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } } + private void sendUpdateCalculatedFieldStateMsg(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, CalculatedFieldState calculatedFieldState) { + TransportProtos.CalculatedFieldStateMsgProto.Builder msgBuilder = TransportProtos.CalculatedFieldStateMsgProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setCalculatedFieldIdMSB(calculatedFieldId.getId().getMostSignificantBits()) + .setCalculatedFieldIdLSB(calculatedFieldId.getId().getLeastSignificantBits()) + .setEntityType(entityId.getEntityType().name()) + .setEntityIdMSB(entityId.getId().getMostSignificantBits()) + .setEntityIdLSB(entityId.getId().getLeastSignificantBits()); + if (calculatedFieldState != null) { + msgBuilder.setState(JacksonUtil.writeValueAsString(calculatedFieldState)); + } + clusterService.pushMsgToCore(tenantId, entityId, TransportProtos.ToCoreMsg.newBuilder().setCalculatedFieldStateMsg(msgBuilder).build(), null); + } + private boolean onCalculatedFieldUpdate(CalculatedField updatedCalculatedField, TbCallback callback) { CalculatedField oldCalculatedField = getOrFetchFromDb(updatedCalculatedField.getTenantId(), updatedCalculatedField.getId()); boolean shouldReinit = true; if (hasSignificantChanges(oldCalculatedField, updatedCalculatedField)) { - onCalculatedFieldDelete(updatedCalculatedField.getId(), callback); + onCalculatedFieldDelete(updatedCalculatedField.getTenantId(), updatedCalculatedField.getId(), callback); } else { calculatedFields.put(updatedCalculatedField.getId(), updatedCalculatedField); calculatedFieldsCtx.put(updatedCalculatedField.getId(), new CalculatedFieldCtx(updatedCalculatedField, tbelInvokeService)); @@ -428,8 +490,14 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas return shouldReinit; } - private void onCalculatedFieldDelete(CalculatedFieldId calculatedFieldId, TbCallback callback) { + private void onCalculatedFieldDelete(TenantId tenantId, CalculatedFieldId calculatedFieldId, TbCallback callback) { try { + cleanupEntity(calculatedFieldId); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, calculatedFieldId); + Set calculatedFieldIds = partitionedEntities.get(tpi); + if (calculatedFieldIds != null) { + calculatedFieldIds.remove(calculatedFieldId); + } calculatedFields.remove(calculatedFieldId); calculatedFieldsCtx.remove(calculatedFieldId); states.keySet().removeIf(ctxId -> ctxId.cfId().equals(calculatedFieldId.getId())); @@ -606,7 +674,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } private void updateOrInitializeState(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, Map argumentValues) { - CalculatedFieldEntityCtxId entityCtxId = new CalculatedFieldEntityCtxId(calculatedFieldCtx.getCfId().getId(), entityId.getId()); + CalculatedFieldId cfId = calculatedFieldCtx.getCfId(); + CalculatedFieldEntityCtxId entityCtxId = new CalculatedFieldEntityCtxId(cfId.getId(), entityId.getId()); CalculatedFieldEntityCtx calculatedFieldEntityCtx = states.computeIfAbsent(entityCtxId, this::fetchCalculatedFieldEntityState); CalculatedFieldState state = calculatedFieldEntityCtx.getState(); @@ -616,8 +685,14 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } if (state.updateState(argumentValues)) { calculatedFieldEntityCtx.setState(state); - states.put(entityCtxId, calculatedFieldEntityCtx); - rocksDBService.put(JacksonUtil.writeValueAsString(entityCtxId), JacksonUtil.writeValueAsString(calculatedFieldEntityCtx)); + TenantId tenantId = calculatedFieldCtx.getTenantId(); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, calculatedFieldCtx.getCfId()); + if (tpi.isMyPartition()) { + states.put(entityCtxId, calculatedFieldEntityCtx); + rocksDBService.put(JacksonUtil.writeValueAsString(entityCtxId), JacksonUtil.writeValueAsString(calculatedFieldEntityCtx)); + } else { + sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, state); + } boolean allArgsPresent = calculatedFieldCtx.getArguments().keySet().containsAll(state.getArguments().keySet()); if (allArgsPresent) { @@ -626,7 +701,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas @Override public void onSuccess(CalculatedFieldResult result) { if (result != null) { - pushMsgToRuleEngine(calculatedFieldCtx.getTenantId(), entityId, result); + pushMsgToRuleEngine(tenantId, entityId, result); } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index 6cf42cb893..a686184b8c 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -322,6 +322,8 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onEntityProfileChanged(profileUpdateMsg, callback)); + ListenableFuture future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onEntityProfileChangedMsg(profileUpdateMsg, callback)); DonAsynchron.withCallback(future, __ -> callback.onSuccess(), t -> { @@ -708,6 +710,18 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onCalculatedFieldStateMsg(calculatedFieldStateMsgProto, callback)); + DonAsynchron.withCallback(future, + __ -> callback.onSuccess(), + t -> { + log.warn("[{}] Failed to process calculated field state message for entityId [{}]", tenantId.getId(), calculatedFieldId.getId(), t); + callback.onFailure(t); + }); + } + private void forwardToNotificationSchedulerService(TransportProtos.NotificationSchedulerServiceMsg msg, TbCallback callback) { TenantId tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB()); NotificationRequestId notificationRequestId = new NotificationRequestId(new UUID(msg.getRequestIdMSB(), msg.getRequestIdLSB())); diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 76da257b32..153394525d 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -807,6 +807,17 @@ message ProfileEntityMsgProto { bool deleted = 10; } +message CalculatedFieldStateMsgProto { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + int64 calculatedFieldIdMSB = 3; + int64 calculatedFieldIdLSB = 4; + string entityType = 5; + int64 entityIdMSB = 6; + int64 entityIdLSB = 7; + string state = 8; +} + //Used to report session state to tb-Service and persist this state in the cache on the tb-Service level. message SubscriptionInfoProto { int64 lastActivityTime = 1; @@ -1541,6 +1552,7 @@ message ToCoreMsg { CalculatedFieldMsgProto calculatedFieldMsg = 53; EntityProfileUpdateMsgProto entityProfileUpdateMsg = 54; ProfileEntityMsgProto profileEntityMsg = 55; + CalculatedFieldStateMsgProto calculatedFieldStateMsg = 56; } /* High priority messages with low latency are handled by ThingsBoard Core Service separately */