diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index b0185fd555..2d182510e9 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -22,6 +22,7 @@ import org.thingsboard.server.actors.TbActorRef; import org.thingsboard.server.actors.TbCalculatedFieldEntityActorId; import org.thingsboard.server.actors.service.DefaultActorService; import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedFieldLink; @@ -35,6 +36,7 @@ import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; +import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.dao.cf.CalculatedFieldService; import org.thingsboard.server.service.cf.CalculatedFieldProcessingService; @@ -121,7 +123,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware if (calculatedField != null) { msg.getState().setRequiredArguments(calculatedField.getArgNames()); - log.info("Pushing CF state restore msg to specific actor [{}]", msg.getId().entityId()); + log.debug("Pushing CF state restore msg to specific actor [{}]", msg.getId().entityId()); getOrCreateActor(msg.getId().entityId()).tell(msg); } else { cfStateService.removeState(msg.getId(), msg.getCallback()); @@ -178,6 +180,9 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware EntityId entityId = msg.getEntityId(); EntityId profileId = getProfileId(tenantId, entityId); cfEntityCache.add(tenantId, profileId, entityId); + if (!isMyPartition(entityId, callback)) { + return; + } var entityIdFields = getCalculatedFieldsByEntityId(entityId); var profileIdFields = getCalculatedFieldsByEntityId(profileId); var fieldsCount = entityIdFields.size() + profileIdFields.size(); @@ -193,6 +198,9 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware private void onEntityUpdated(ComponentLifecycleMsg msg, TbCallback callback) { if (msg.getOldProfileId() != null && msg.getOldProfileId() != msg.getProfileId()) { cfEntityCache.update(tenantId, msg.getOldProfileId(), msg.getProfileId(), msg.getEntityId()); + if (!isMyPartition(msg.getEntityId(), callback)) { + return; + } var oldProfileCfs = getCalculatedFieldsByEntityId(msg.getOldProfileId()); var newProfileCfs = getCalculatedFieldsByEntityId(msg.getProfileId()); var fieldsCount = oldProfileCfs.size() + newProfileCfs.size(); @@ -209,8 +217,10 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware private void onEntityDeleted(ComponentLifecycleMsg msg, TbCallback callback) { cfEntityCache.evict(tenantId, msg.getEntityId()); - log.info("Pushing entity lifecycle msg to specific actor [{}]", msg.getEntityId()); - getOrCreateActor(msg.getEntityId()).tell(new CalculatedFieldEntityDeleteMsg(tenantId, msg.getEntityId(), callback)); + if (isMyPartition(msg.getEntityId(), callback)) { + log.debug("Pushing entity lifecycle msg to specific actor [{}]", msg.getEntityId()); + getOrCreateActor(msg.getEntityId()).tell(new CalculatedFieldEntityDeleteMsg(tenantId, msg.getEntityId(), callback)); + } } private void onCfCreated(ComponentLifecycleMsg msg, TbCallback callback) throws CalculatedFieldException { @@ -311,7 +321,9 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware callback.onSuccess(); } } else { - deleteCfForEntity(entityId, cfId, callback); + if (isMyPartition(entityId, callback)) { + deleteCfForEntity(entityId, cfId, callback); + } } } } @@ -418,20 +430,31 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware callback.onSuccess(); } } else { - initCfForEntity(entityId, cfCtx, forceStateReinit, callback); + if (isMyPartition(entityId, callback)) { + initCfForEntity(entityId, cfCtx, forceStateReinit, callback); + } } } private void deleteCfForEntity(EntityId entityId, CalculatedFieldId cfId, TbCallback callback) { - log.info("Pushing delete CF msg to specific actor [{}]", entityId); + log.debug("Pushing delete CF msg to specific actor [{}]", entityId); getOrCreateActor(entityId).tell(new CalculatedFieldEntityDeleteMsg(tenantId, cfId, callback)); } private void initCfForEntity(EntityId entityId, CalculatedFieldCtx cfCtx, boolean forceStateReinit, TbCallback callback) { - log.info("Pushing entity init CF msg to specific actor [{}]", entityId); + log.debug("Pushing entity init CF msg to specific actor [{}]", entityId); getOrCreateActor(entityId).tell(new EntityInitCalculatedFieldMsg(tenantId, cfCtx, callback, forceStateReinit)); } + private boolean isMyPartition(EntityId entityId, TbCallback callback) { + if (!systemContext.getPartitionService().resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId).isMyPartition()) { + log.debug("[{}] Entity belongs to external partition.", entityId); + callback.onSuccess(); + return false; + } + return true; + } + private static boolean isProfileEntity(EntityType entityType) { return EntityType.DEVICE_PROFILE.equals(entityType) || EntityType.ASSET_PROFILE.equals(entityType); } diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index f9eec324fc..c1a0980623 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -112,7 +112,7 @@ public class TenantActor extends RuleChainManagerActor { cantFindTenant = true; } } else { - log.info("Tenant {} is not managed by current service, skipping rule chains init", tenantId); + log.info("Tenant {} is not managed by current service, skipping rule chains and cf actor init", tenantId); } } log.debug("[{}] Tenant actor started.", tenantId); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index ad248cc3d4..3232a9b89f 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -15,15 +15,12 @@ */ package org.thingsboard.server.service.queue; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Service; -import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.calculatedField.CalculatedFieldLinkedTelemetryMsg; import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg; @@ -35,7 +32,7 @@ import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; @@ -60,7 +57,6 @@ import org.thingsboard.server.service.queue.processing.IdMsgPair; import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService; import java.util.List; -import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -220,7 +216,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer protected void handleNotification(UUID id, TbProtoQueueMsg msg, TbCallback callback) { ToCalculatedFieldNotificationMsg toCfNotification = msg.getValue(); if (toCfNotification.hasComponentLifecycleMsg()) { - // from upstream (maybe removed since we don't need to init state for each partition) + handleComponentLifecycleMsg(id, ProtoUtils.fromProto(toCfNotification.getComponentLifecycleMsg())); log.trace("[{}] Forwarding component lifecycle message for processing {}", id, toCfNotification.getComponentLifecycleMsg()); forwardToActorSystem(toCfNotification.getComponentLifecycleMsg(), callback); } else if (toCfNotification.hasLinkedTelemetryMsg()) { diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java index e90aecb41e..33f2209eca 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java @@ -38,6 +38,7 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.boot.test.mock.mockito.SpyBean; import org.springframework.cache.Cache; import org.springframework.cache.CacheManager; @@ -143,6 +144,7 @@ import org.thingsboard.server.dao.device.ClaimDevicesService; import org.thingsboard.server.dao.tenant.TenantProfileService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.queue.memory.InMemoryStorage; +import org.thingsboard.server.service.cf.CfRocksDb; import org.thingsboard.server.service.entitiy.tenant.profile.TbTenantProfileService; import org.thingsboard.server.service.security.auth.jwt.RefreshTokenRequest; import org.thingsboard.server.service.security.auth.rest.LoginRequest; @@ -276,6 +278,9 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { @Autowired protected InMemoryStorage storage; + @MockBean + protected CfRocksDb cfRocksDb; + @Rule public TestRule watcher = new TestWatcher() { protected void starting(Description description) {