Merge pull request #12915 from irynamatveieva/calculated-fields

CF: fixed cluster issues
This commit is contained in:
Viacheslav Klimov 2025-03-13 15:34:55 +02:00 committed by GitHub
commit 883a1f919a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 38 additions and 14 deletions

View File

@ -22,6 +22,7 @@ import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.actors.TbCalculatedFieldEntityActorId;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
@ -35,6 +36,7 @@ import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.dao.cf.CalculatedFieldService;
import org.thingsboard.server.service.cf.CalculatedFieldProcessingService;
@ -121,7 +123,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
if (calculatedField != null) {
msg.getState().setRequiredArguments(calculatedField.getArgNames());
log.info("Pushing CF state restore msg to specific actor [{}]", msg.getId().entityId());
log.debug("Pushing CF state restore msg to specific actor [{}]", msg.getId().entityId());
getOrCreateActor(msg.getId().entityId()).tell(msg);
} else {
cfStateService.removeState(msg.getId(), msg.getCallback());
@ -178,6 +180,9 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
EntityId entityId = msg.getEntityId();
EntityId profileId = getProfileId(tenantId, entityId);
cfEntityCache.add(tenantId, profileId, entityId);
if (!isMyPartition(entityId, callback)) {
return;
}
var entityIdFields = getCalculatedFieldsByEntityId(entityId);
var profileIdFields = getCalculatedFieldsByEntityId(profileId);
var fieldsCount = entityIdFields.size() + profileIdFields.size();
@ -193,6 +198,9 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
private void onEntityUpdated(ComponentLifecycleMsg msg, TbCallback callback) {
if (msg.getOldProfileId() != null && msg.getOldProfileId() != msg.getProfileId()) {
cfEntityCache.update(tenantId, msg.getOldProfileId(), msg.getProfileId(), msg.getEntityId());
if (!isMyPartition(msg.getEntityId(), callback)) {
return;
}
var oldProfileCfs = getCalculatedFieldsByEntityId(msg.getOldProfileId());
var newProfileCfs = getCalculatedFieldsByEntityId(msg.getProfileId());
var fieldsCount = oldProfileCfs.size() + newProfileCfs.size();
@ -209,8 +217,10 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
private void onEntityDeleted(ComponentLifecycleMsg msg, TbCallback callback) {
cfEntityCache.evict(tenantId, msg.getEntityId());
log.info("Pushing entity lifecycle msg to specific actor [{}]", msg.getEntityId());
getOrCreateActor(msg.getEntityId()).tell(new CalculatedFieldEntityDeleteMsg(tenantId, msg.getEntityId(), callback));
if (isMyPartition(msg.getEntityId(), callback)) {
log.debug("Pushing entity lifecycle msg to specific actor [{}]", msg.getEntityId());
getOrCreateActor(msg.getEntityId()).tell(new CalculatedFieldEntityDeleteMsg(tenantId, msg.getEntityId(), callback));
}
}
private void onCfCreated(ComponentLifecycleMsg msg, TbCallback callback) throws CalculatedFieldException {
@ -311,7 +321,9 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
callback.onSuccess();
}
} else {
deleteCfForEntity(entityId, cfId, callback);
if (isMyPartition(entityId, callback)) {
deleteCfForEntity(entityId, cfId, callback);
}
}
}
}
@ -418,20 +430,31 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
callback.onSuccess();
}
} else {
initCfForEntity(entityId, cfCtx, forceStateReinit, callback);
if (isMyPartition(entityId, callback)) {
initCfForEntity(entityId, cfCtx, forceStateReinit, callback);
}
}
}
private void deleteCfForEntity(EntityId entityId, CalculatedFieldId cfId, TbCallback callback) {
log.info("Pushing delete CF msg to specific actor [{}]", entityId);
log.debug("Pushing delete CF msg to specific actor [{}]", entityId);
getOrCreateActor(entityId).tell(new CalculatedFieldEntityDeleteMsg(tenantId, cfId, callback));
}
private void initCfForEntity(EntityId entityId, CalculatedFieldCtx cfCtx, boolean forceStateReinit, TbCallback callback) {
log.info("Pushing entity init CF msg to specific actor [{}]", entityId);
log.debug("Pushing entity init CF msg to specific actor [{}]", entityId);
getOrCreateActor(entityId).tell(new EntityInitCalculatedFieldMsg(tenantId, cfCtx, callback, forceStateReinit));
}
private boolean isMyPartition(EntityId entityId, TbCallback callback) {
if (!systemContext.getPartitionService().resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId).isMyPartition()) {
log.debug("[{}] Entity belongs to external partition.", entityId);
callback.onSuccess();
return false;
}
return true;
}
private static boolean isProfileEntity(EntityType entityType) {
return EntityType.DEVICE_PROFILE.equals(entityType) || EntityType.ASSET_PROFILE.equals(entityType);
}

View File

@ -112,7 +112,7 @@ public class TenantActor extends RuleChainManagerActor {
cantFindTenant = true;
}
} else {
log.info("Tenant {} is not managed by current service, skipping rule chains init", tenantId);
log.info("Tenant {} is not managed by current service, skipping rule chains and cf actor init", tenantId);
}
}
log.debug("[{}] Tenant actor started.", tenantId);

View File

@ -15,15 +15,12 @@
*/
package org.thingsboard.server.service.queue;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldLinkedTelemetryMsg;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg;
@ -35,7 +32,7 @@ import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
@ -60,7 +57,6 @@ import org.thingsboard.server.service.queue.processing.IdMsgPair;
import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -220,7 +216,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
protected void handleNotification(UUID id, TbProtoQueueMsg<ToCalculatedFieldNotificationMsg> msg, TbCallback callback) {
ToCalculatedFieldNotificationMsg toCfNotification = msg.getValue();
if (toCfNotification.hasComponentLifecycleMsg()) {
// from upstream (maybe removed since we don't need to init state for each partition)
handleComponentLifecycleMsg(id, ProtoUtils.fromProto(toCfNotification.getComponentLifecycleMsg()));
log.trace("[{}] Forwarding component lifecycle message for processing {}", id, toCfNotification.getComponentLifecycleMsg());
forwardToActorSystem(toCfNotification.getComponentLifecycleMsg(), callback);
} else if (toCfNotification.hasLinkedTelemetryMsg()) {

View File

@ -38,6 +38,7 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
@ -143,6 +144,7 @@ import org.thingsboard.server.dao.device.ClaimDevicesService;
import org.thingsboard.server.dao.tenant.TenantProfileService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.queue.memory.InMemoryStorage;
import org.thingsboard.server.service.cf.CfRocksDb;
import org.thingsboard.server.service.entitiy.tenant.profile.TbTenantProfileService;
import org.thingsboard.server.service.security.auth.jwt.RefreshTokenRequest;
import org.thingsboard.server.service.security.auth.rest.LoginRequest;
@ -276,6 +278,9 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
@Autowired
protected InMemoryStorage storage;
@MockBean
protected CfRocksDb cfRocksDb;
@Rule
public TestRule watcher = new TestWatcher() {
protected void starting(Description description) {