diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 978b2499a4..8f15a6766b 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -314,6 +314,7 @@ public class ActorSystemContext { @Getter private TbEntityViewService tbEntityViewService; + @Lazy @Autowired @Getter private TelemetrySubscriptionService tsSubService; diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActorCreator.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActorCreator.java index c5f8ecf046..1dfe92c4bc 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActorCreator.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActorCreator.java @@ -18,6 +18,7 @@ package org.thingsboard.server.actors.calculatedField; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActor; import org.thingsboard.server.actors.TbActorId; +import org.thingsboard.server.actors.TbCalculatedFieldEntityActorId; import org.thingsboard.server.actors.TbEntityActorId; import org.thingsboard.server.actors.device.DeviceActor; import org.thingsboard.server.actors.service.ContextBasedCreator; @@ -38,7 +39,7 @@ public class CalculatedFieldEntityActorCreator extends ContextBasedCreator { @Override public TbActorId createActorId() { - return new TbEntityActorId(entityId); + return new TbCalculatedFieldEntityActorId(entityId); } @Override diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index 34608337a5..c219544a91 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -24,6 +24,7 @@ import org.thingsboard.server.actors.TbCalculatedFieldEntityActorId; import org.thingsboard.server.actors.service.DefaultActorService; import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedFieldLink; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.CalculatedFieldId; @@ -50,7 +51,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -175,7 +175,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware private void onEntityCreated(ComponentLifecycleMsg msg, TbCallback callback) { EntityId entityId = msg.getEntityId(); EntityId profileId = getProfileId(tenantId, entityId); - cfEntityCache.add(tenantId, entityId, profileId); + cfEntityCache.add(tenantId, profileId, entityId); var entityIdFields = getCalculatedFieldsByEntityId(entityId); var profileIdFields = getCalculatedFieldsByEntityId(profileId); var fieldsCount = entityIdFields.size() + profileIdFields.size(); @@ -233,6 +233,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware // We use copy on write lists to safely pass the reference to another actor for the iteration. // Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead) entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cfCtx); + addLinks(cf); initCf(cfCtx, callback, false); } } @@ -251,7 +252,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware } else { var newCfCtx = new CalculatedFieldCtx(newCf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService()); calculatedFields.put(newCf.getId(), newCfCtx); - List oldCfList = entityIdCalculatedFields.get(newCf.getId()); + List oldCfList = entityIdCalculatedFields.get(newCf.getEntityId()); List newCfList = new ArrayList<>(oldCfList.size()); boolean found = false; for (CalculatedFieldCtx oldCtx : oldCfList) { @@ -265,10 +266,15 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware if (!found) { newCfList.add(newCfCtx); } - entityIdCalculatedFields.put(newCf.getId(), newCfList); + entityIdCalculatedFields.put(newCf.getEntityId(), newCfList); + + deleteLinks(oldCfCtx); + addLinks(newCf); + // We use copy on write lists to safely pass the reference to another actor for the iteration. // Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead) - if (newCfCtx.hasSignificantChanges(oldCfCtx)) { + var stateChanges = newCfCtx.hasStateChanges(oldCfCtx); + if (stateChanges || newCfCtx.hasOtherSignificantChanges(oldCfCtx)) { try { newCfCtx.init(); } catch (Exception e) { @@ -276,11 +282,12 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware systemContext.persistCalculatedFieldDebugEvent(newCf.getTenantId(), newCf.getId(), newCf.getEntityId(), null, null, null, null, e); } } - initCf(newCfCtx, callback, true); + initCf(newCfCtx, callback, stateChanges); + } else { + callback.onSuccess(); } } } - } private void onCfDeleted(ComponentLifecycleMsg msg, TbCallback callback) { @@ -290,6 +297,8 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware log.warn("[{}] CF was already deleted [{}]", tenantId, cfId); callback.onSuccess(); } else { + deleteLinks(cfCtx); + EntityId entityId = cfCtx.getEntityId(); EntityType entityType = cfCtx.getEntityId().getEntityType(); if (isProfileEntity(entityType)) { @@ -440,4 +449,16 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware () -> true); } + private void addLinks(CalculatedField newCf) { + var newLinks = newCf.getConfiguration().buildCalculatedFieldLinks(tenantId, newCf.getEntityId(), newCf.getId()); + newLinks.forEach(link -> entityIdCalculatedFieldLinks.computeIfAbsent(link.getEntityId(), id -> new ArrayList<>()).add(link)); + } + + private void deleteLinks(CalculatedFieldCtx cfCtx) { + var oldCf = cfCtx.getCalculatedField(); + var oldLinks = oldCf.getConfiguration().buildCalculatedFieldLinks(tenantId, oldCf.getEntityId(), oldCf.getId()); + oldLinks.forEach(link -> entityIdCalculatedFieldLinks.computeIfAbsent(link.getEntityId(), id -> new ArrayList<>()).remove(link)); + } + + } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java index c8f3d98f40..9688f35fef 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java @@ -70,20 +70,25 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { @AfterStartUp(order = AfterStartUp.CF_READ_CF_SERVICE) public void init() { + //TODO: move to separate place to avoid circular references with the ActorSystemContext (@Lazy for tsSubService) PageDataIterable cfs = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFields, initFetchPackSize); - cfs.forEach(cf -> calculatedFields.putIfAbsent(cf.getId(), cf)); - calculatedFields.values().forEach(cf -> - entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cf) - ); - cfs.forEach(cf -> actorSystemContext.tell(new CalculatedFieldInitMsg(cf.getTenantId(), cf))); + cfs.forEach(cf -> { + calculatedFields.putIfAbsent(cf.getId(), cf); + actorSystemContext.tell(new CalculatedFieldInitMsg(cf.getTenantId(), cf)); + }); + calculatedFields.values().forEach(cf -> { + entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cf); + }); PageDataIterable cfls = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFieldLinks, initFetchPackSize); - cfls.forEach(link -> calculatedFieldLinks.computeIfAbsent(link.getCalculatedFieldId(), id -> new CopyOnWriteArrayList<>()).add(link)); + cfls.forEach(link -> { + calculatedFieldLinks.computeIfAbsent(link.getCalculatedFieldId(), id -> new CopyOnWriteArrayList<>()).add(link); + actorSystemContext.tell(new CalculatedFieldLinkInitMsg(link.getTenantId(), link)); + }); calculatedFieldLinks.values().stream() .flatMap(List::stream) .forEach(link -> entityIdCalculatedFieldLinks.computeIfAbsent(link.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(link) ); - cfls.forEach(link -> actorSystemContext.tell(new CalculatedFieldLinkInitMsg(link.getTenantId(), link))); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index dd8c9b7edd..103997f0c7 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -136,7 +136,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } }; - private final CalculatedFieldService calculatedFieldService; private final TbAssetProfileCache assetProfileCache; private final TbDeviceProfileCache deviceProfileCache; private final CalculatedFieldCache calculatedFieldCache; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/cache/TenantEntityProfileCache.java b/application/src/main/java/org/thingsboard/server/service/cf/cache/TenantEntityProfileCache.java index b3499f3cc4..6aa5c3d4de 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/cache/TenantEntityProfileCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/cache/TenantEntityProfileCache.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.cf.cache; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.EntityId; import java.util.ArrayList; @@ -88,6 +89,9 @@ public class TenantEntityProfileCache { public void add(EntityId profileId, EntityId entityId, Integer partition, boolean mine) { lock.writeLock().lock(); try { + if(EntityType.DEVICE.equals(profileId.getEntityType())){ + throw new RuntimeException("WTF?"); + } if (mine) { myEntities.computeIfAbsent(profileId, k -> new HashSet<>()).add(entityId); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index c483c6ab5d..c3105d6b57 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -212,12 +212,16 @@ public class CalculatedFieldCtx { return new CalculatedFieldEntityCtxId(tenantId, cfId, entityId); } - public boolean hasSignificantChanges(CalculatedFieldCtx other) { - boolean entityIdChanged = !entityId.equals(other.entityId); + public boolean hasOtherSignificantChanges(CalculatedFieldCtx other) { + boolean expressionChanged = !expression.equals(other.expression); + boolean outputChanged = !output.equals(other.output); + return expressionChanged || outputChanged; + } + + public boolean hasStateChanges(CalculatedFieldCtx other) { boolean typeChanged = !cfType.equals(other.cfType); boolean argumentsChanged = !arguments.equals(other.arguments); - boolean expressionChanged = !expression.equals(other.expression); - return entityIdChanged || typeChanged || argumentsChanged || expressionChanged; + return typeChanged || argumentsChanged; } } diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java index d40bf7b130..7265533aab 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java @@ -178,11 +178,11 @@ public class EntityStateSourcingListener { } case TENANT_PROFILE -> { TenantProfile tenantProfile = (TenantProfile) event.getEntity(); - tbClusterService.onTenantProfileDelete(tenantProfile, null); + tbClusterService.onTenantProfileDelete(tenantProfile, TbQueueCallback.EMPTY); } case DEVICE -> { Device device = (Device) event.getEntity(); - tbClusterService.onDeviceDeleted(tenantId, device, null); + tbClusterService.onDeviceDeleted(tenantId, device, TbQueueCallback.EMPTY); } case DEVICE_PROFILE -> { DeviceProfile deviceProfile = (DeviceProfile) event.getEntity(); @@ -190,11 +190,11 @@ public class EntityStateSourcingListener { } case TB_RESOURCE -> { TbResourceInfo tbResource = (TbResourceInfo) event.getEntity(); - tbClusterService.onResourceDeleted(tbResource, null); + tbClusterService.onResourceDeleted(tbResource, TbQueueCallback.EMPTY); } case CALCULATED_FIELD -> { CalculatedField calculatedField = (CalculatedField) event.getEntity(); - tbClusterService.onCalculatedFieldDeleted(calculatedField, null); + tbClusterService.onCalculatedFieldDeleted(calculatedField, TbQueueCallback.EMPTY); } default -> { } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 944e24480e..fa02435ca0 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -727,14 +727,19 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void onCalculatedFieldUpdated(CalculatedField calculatedField, CalculatedField oldCalculatedField, TbQueueCallback callback) { - var msg = new ComponentLifecycleMsg(calculatedField.getTenantId(), calculatedField.getId(), oldCalculatedField == null ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); - broadcastToCalculatedFields(ToCalculatedFieldNotificationMsg.newBuilder().setComponentLifecycleMsg(toProto(msg)).build(), callback); + var msg = toProto(new ComponentLifecycleMsg(calculatedField.getTenantId(), calculatedField.getId(), oldCalculatedField == null ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED)); + onCalculatedFieldLifecycleMsg(msg, callback); } @Override public void onCalculatedFieldDeleted(CalculatedField calculatedField, TbQueueCallback callback) { - var msg = new ComponentLifecycleMsg(calculatedField.getTenantId(), calculatedField.getId(), ComponentLifecycleEvent.DELETED); - broadcastToCalculatedFields(ToCalculatedFieldNotificationMsg.newBuilder().setComponentLifecycleMsg(toProto(msg)).build(), callback); + var msg = toProto(new ComponentLifecycleMsg(calculatedField.getTenantId(), calculatedField.getId(), ComponentLifecycleEvent.DELETED)); + onCalculatedFieldLifecycleMsg(msg, callback); + } + + private void onCalculatedFieldLifecycleMsg(ComponentLifecycleMsgProto msg, TbQueueCallback callback) { + broadcastToCalculatedFields(ToCalculatedFieldNotificationMsg.newBuilder().setComponentLifecycleMsg(msg).build(), callback); + broadcastToCore(ToCoreNotificationMsg.newBuilder().setComponentLifecycle(msg).build()); } @Override diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java index ee4af20639..a2a26d55e4 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java @@ -159,7 +159,8 @@ public final class TbActorMailbox implements TbActorCtx { stopReason = TbActorStopReason.INIT_FAILED; destroy(updateException.getCause()); } catch (Throwable t) { - log.debug("[{}] Failed to process message: {}", selfId, msg, t); + //TODO: revert; + log.error("[{}] Failed to process message: {}", selfId, msg, t); ProcessFailureStrategy strategy = actor.onProcessFailure(msg, t); if (strategy.isStop()) { system.stop(selfId);