Improvements for tenant actors init with dedicated rule engines

This commit is contained in:
ViacheslavKlimov 2024-02-13 11:07:34 +02:00
parent ba5c59f9d5
commit 492dc5916e
10 changed files with 101 additions and 56 deletions

View File

@ -202,8 +202,7 @@ public class AppActor extends ContextAwareActor {
return Optional.ofNullable(ctx.getOrCreateChildActor(new TbEntityActorId(tenantId),
() -> DefaultActorService.TENANT_DISPATCHER_NAME,
() -> new TenantActor.ActorCreator(systemContext, tenantId),
() -> systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE) ||
systemContext.getPartitionService().isManagedByCurrentService(tenantId)));
() -> true));
}
private void onToEdgeSessionMsg(EdgeSessionMsg msg) {

View File

@ -161,6 +161,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
/**
 * Forwards a partition change notification to every rule node actor tracked in
 * {@code nodeActors}, using the high-priority channel of each actor reference.
 *
 * @param msg the partition change message to relay
 */
@Override
public void onPartitionChangeMsg(PartitionChangeMsg msg) {
    log.debug("[{}][{}] onPartitionChangeMsg: [{}]", tenantId, entityId, msg);
    for (RuleNodeCtx nodeCtx : nodeActors.values()) {
        nodeCtx.getSelfActor().tellWithHighPriority(msg);
    }
}

View File

@ -50,6 +50,8 @@ public abstract class RuleChainManagerActor extends ContextAwareActor {
@Getter
protected TbActorRef rootChainActor;
protected boolean ruleChainsInitialized;
public RuleChainManagerActor(ActorSystemContext systemContext, TenantId tenantId) {
super(systemContext);
this.tenantId = tenantId;
@ -57,6 +59,7 @@ public abstract class RuleChainManagerActor extends ContextAwareActor {
}
protected void initRuleChains() {
ruleChainsInitialized = true;
for (RuleChain ruleChain : new PageDataIterable<>(link -> ruleChainService.findTenantRuleChainsByType(tenantId, RuleChainType.CORE, link), ContextAwareActor.ENTITY_PACK_LIMIT)) {
RuleChainId ruleChainId = ruleChain.getId();
log.debug("[{}|{}] Creating rule chain actor", ruleChainId.getEntityType(), ruleChain.getId());
@ -70,6 +73,7 @@ public abstract class RuleChainManagerActor extends ContextAwareActor {
for (RuleChain ruleChain : new PageDataIterable<>(link -> ruleChainService.findTenantRuleChainsByType(tenantId, RuleChainType.CORE, link), ContextAwareActor.ENTITY_PACK_LIMIT)) {
ctx.stop(new TbEntityActorId(ruleChain.getId()));
}
ruleChainsInitialized = false;
}
protected void visit(RuleChain entity, TbActorRef actorRef) {

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.actors.ruleChain;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.server.actors.ActorSystemContext;
@ -39,6 +40,7 @@ import org.thingsboard.server.gen.transport.TransportProtos;
/**
* @author Andrew Shvayka
*/
@Slf4j
public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNodeId> {
private final String ruleChainName;
@ -61,6 +63,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
@Override
public void start(TbActorCtx context) throws Exception {
if (isMyNodePartition()) {
log.debug("[{}][{}] Starting", tenantId, entityId);
tbNode = initComponent(ruleNode);
if (tbNode != null) {
state = ComponentLifecycleState.ACTIVE;
@ -95,6 +98,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
@Override
public void stop(TbActorCtx context) {
log.debug("[{}][{}] Stopping", tenantId, entityId);
if (tbNode != null) {
tbNode.destroy();
state = ComponentLifecycleState.SUSPENDED;
@ -103,6 +107,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
@Override
public void onPartitionChangeMsg(PartitionChangeMsg msg) throws Exception {
log.debug("[{}][{}] onPartitionChangeMsg: [{}]", tenantId, entityId, msg);
if (tbNode != null) {
if (!isMyNodePartition()) {
stop(null);
@ -185,9 +190,11 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
}
private boolean isMyNodePartition(RuleNode ruleNode) {
return ruleNode == null || !ruleNode.isSingletonMode()
boolean result = ruleNode == null || !ruleNode.isSingletonMode()
|| systemContext.getDiscoveryService().isMonolith()
|| defaultCtx.isLocalEntity(ruleNode.getId());
log.trace("[{}][{}] Is not my node partition", tenantId, entityId);
return result;
}
//Message will return after processing. See RuleChainActorMessageProcessor.pushToTarget.

View File

@ -102,6 +102,8 @@ public class TenantActor extends RuleChainManagerActor {
log.info("Failed to check ApiUsage \"ReExecEnabled\"!!!", e);
cantFindTenant = true;
}
} else {
log.info("Tenant {} is not managed by current service, skipping rule chains init", tenantId);
}
}
log.debug("[{}] Tenant actor started.", tenantId);
@ -131,20 +133,7 @@ public class TenantActor extends RuleChainManagerActor {
}
switch (msg.getMsgType()) {
case PARTITION_CHANGE_MSG:
PartitionChangeMsg partitionChangeMsg = (PartitionChangeMsg) msg;
ServiceType serviceType = partitionChangeMsg.getServiceType();
if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) {
//To Rule Chain Actors
broadcast(msg);
} else if (ServiceType.TB_CORE.equals(serviceType)) {
List<TbActorId> deviceActorIds = ctx.filterChildren(new TbEntityTypeActorIdPredicate(EntityType.DEVICE) {
@Override
protected boolean testEntityId(EntityId entityId) {
return super.testEntityId(entityId) && !isMyPartition(entityId);
}
});
deviceActorIds.forEach(id -> ctx.stop(id));
}
onPartitionChangeMsg((PartitionChangeMsg) msg);
break;
case COMPONENT_LIFE_CYCLE_MSG:
onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
@ -239,6 +228,35 @@ public class TenantActor extends RuleChainManagerActor {
}
}
/**
 * Handles a partition change notification for this tenant.
 *
 * For rule engine partition changes: if the partition service reports the tenant as
 * managed by the current service, rule chains are initialized when they are not yet,
 * and the message is then broadcast to the rule chain actors. If the tenant is not
 * managed here, any initialized rule chains are destroyed and nothing is broadcast.
 *
 * For core partition changes: device child actors whose entity is not in this
 * service's partition are stopped.
 *
 * Other service types are ignored.
 *
 * @param msg the partition change message
 */
private void onPartitionChangeMsg(PartitionChangeMsg msg) {
    ServiceType serviceType = msg.getServiceType();
    if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) {
        if (systemContext.getPartitionService().isManagedByCurrentService(tenantId)) {
            if (!ruleChainsInitialized) {
                // This branch runs when ownership has just been gained, so report it as "now", not "already".
                log.info("Tenant {} is now managed by this service, initializing rule chains", tenantId);
                initRuleChains();
            }
        } else {
            if (ruleChainsInitialized) {
                log.info("Tenant {} is no longer managed by this service, stopping rule chains", tenantId);
                destroyRuleChains();
            }
            // No rule chain actors should remain for a tenant that is not managed here; skip the broadcast.
            return;
        }
        //To Rule Chain Actors
        broadcast(msg);
    } else if (ServiceType.TB_CORE.equals(serviceType)) {
        List<TbActorId> deviceActorIds = ctx.filterChildren(new TbEntityTypeActorIdPredicate(EntityType.DEVICE) {
            @Override
            protected boolean testEntityId(EntityId entityId) {
                // Select only device actors whose entity is no longer in this service's partition.
                return super.testEntityId(entityId) && !isMyPartition(entityId);
            }
        });
        deviceActorIds.forEach(id -> ctx.stop(id));
    }
}
private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
if (msg.getEntityId().getEntityType().equals(EntityType.API_USAGE_STATE)) {
ApiUsageState old = getApiUsageState();

View File

@ -99,23 +99,26 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
List<Queue> queues = queueService.findAllQueues();
for (Queue configuration : queues) {
if (partitionService.isManagedByCurrentService(configuration.getTenantId())) {
initConsumer(configuration);
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, configuration);
getOrCreateConsumer(queueKey).init(configuration);
}
}
}
private void initConsumer(Queue configuration) {
getOrCreateConsumer(new QueueKey(ServiceType.TB_RULE_ENGINE, configuration)).init(configuration);
}
@Override
protected void onTbApplicationEvent(PartitionChangeEvent event) {
event.getPartitionsMap().forEach((queueKey, partitions) -> {
var consumer = consumers.get(queueKey);
if (consumer != null) {
consumer.update(partitions);
} else {
log.warn("Received invalid partition change event for {} that is not managed by this service", queueKey);
if (partitionService.isManagedByCurrentService(queueKey.getTenantId())) {
getOrCreateConsumer(queueKey).update(partitions);
}
});
consumers.keySet().stream()
.collect(Collectors.groupingBy(QueueKey::getTenantId))
.forEach((tenantId, queueKeys) -> {
if (!partitionService.isManagedByCurrentService(tenantId)) {
queueKeys.forEach(queueKey -> {
removeConsumer(queueKey).ifPresent(TbRuleEngineQueueConsumerManager::stop);
});
}
});
}
@ -211,10 +214,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
log.info("Received queue delete msg: [{}]", queueDeleteMsg);
TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB()));
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId);
var consumerManager = consumers.remove(queueKey);
if (consumerManager != null) {
consumerManager.delete(true);
}
removeConsumer(queueKey).ifPresent(consumer -> consumer.delete(true));
}
partitionService.removeQueues(queueDeleteMsgs);
@ -229,10 +229,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
.filter(queueKey -> queueKey.getTenantId().equals(event.getTenantId()))
.collect(Collectors.toList());
toRemove.forEach(queueKey -> {
var consumerManager = consumers.remove(queueKey);
if (consumerManager != null) {
consumerManager.delete(false);
}
removeConsumer(queueKey).ifPresent(consumer -> consumer.delete(false));
});
}
}
@ -242,6 +239,10 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
return consumers.computeIfAbsent(queueKey, key -> new TbRuleEngineQueueConsumerManager(ctx, key));
}
/**
 * Removes the consumer manager registered under the given queue key from {@code consumers}.
 *
 * @param queueKey key of the consumer manager to remove
 * @return the removed manager, or an empty {@link Optional} if the removal yielded nothing
 */
private Optional<TbRuleEngineQueueConsumerManager> removeConsumer(QueueKey queueKey) {
    TbRuleEngineQueueConsumerManager removed = consumers.remove(queueKey);
    return Optional.ofNullable(removed);
}
@Scheduled(fixedDelayString = "${queue.rule-engine.stats.print-interval-ms}")
public void printStats() {
if (ctx.isStatsEnabled()) {

View File

@ -156,7 +156,7 @@ public class HashPartitionServiceTest {
for (int queueIndex = 0; queueIndex < queueCount; queueIndex++) {
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, "queue" + queueIndex, tenantId);
for (int partition = 0; partition < partitionCount; partition++) {
ServiceInfo serviceInfo = clusterRoutingService.resolveByPartitionIdx(services, queueKey, partition);
ServiceInfo serviceInfo = clusterRoutingService.resolveByPartitionIdx(services, queueKey, partition, Collections.emptyMap());
String serviceId = serviceInfo.getServiceId();
map.put(serviceId, map.get(serviceId) + 1);
}
@ -389,9 +389,9 @@ public class HashPartitionServiceTest {
.limit(100).collect(Collectors.toList());
for (int partition = 0; partition < 10; partition++) {
ServiceInfo expectedAssignedRuleEngine = clusterRoutingService.resolveByPartitionIdx(ruleEngines, new QueueKey(ServiceType.TB_RULE_ENGINE, tenantId), partition);
ServiceInfo expectedAssignedRuleEngine = clusterRoutingService.resolveByPartitionIdx(ruleEngines, new QueueKey(ServiceType.TB_RULE_ENGINE, tenantId), partition, Collections.emptyMap());
for (QueueKey queueKey : queues) {
ServiceInfo assignedRuleEngine = clusterRoutingService.resolveByPartitionIdx(ruleEngines, queueKey, partition);
ServiceInfo assignedRuleEngine = clusterRoutingService.resolveByPartitionIdx(ruleEngines, queueKey, partition, Collections.emptyMap());
assertThat(assignedRuleEngine).as(queueKey + "[" + partition + "] should be assigned to " + expectedAssignedRuleEngine.getServiceId())
.isEqualTo(expectedAssignedRuleEngine);
}

View File

@ -81,7 +81,7 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider {
}
log.info("Current Service ID: {}", serviceId);
if (serviceType.equalsIgnoreCase("monolith")) {
serviceTypes = Collections.unmodifiableList(Arrays.asList(ServiceType.values()));
serviceTypes = List.of(ServiceType.values());
} else {
serviceTypes = Collections.singletonList(ServiceType.of(serviceType));
}

View File

@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.exception.TenantNotFoundException;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.data.util.CollectionsUtil;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
@ -81,7 +82,7 @@ public class HashPartitionService implements PartitionService {
private List<ServiceInfo> currentOtherServices;
private final Map<String, List<ServiceInfo>> tbTransportServicesByType = new HashMap<>();
private final Map<TenantProfileId, List<ServiceInfo>> responsibleServices = new HashMap<>();
private volatile Map<TenantProfileId, List<ServiceInfo>> responsibleServices = Collections.emptyMap();
private HashFunction hashFunction;
@ -218,16 +219,26 @@ public class HashPartitionService implements PartitionService {
@Override
public boolean isManagedByCurrentService(TenantId tenantId) {
Set<UUID> assignedTenantProfiles = serviceInfoProvider.getAssignedTenantProfiles();
if (assignedTenantProfiles.isEmpty()) {
// TODO: refactor this for common servers
if (serviceInfoProvider.isService(ServiceType.TB_CORE) || !serviceInfoProvider.isService(ServiceType.TB_RULE_ENGINE)) {
return true;
}
Set<UUID> assignedTenantProfiles = serviceInfoProvider.getAssignedTenantProfiles();
if (assignedTenantProfiles.isEmpty()) { // if this is regular rule engine
if (tenantId.isSysTenantId()) {
return true;
}
TenantRoutingInfo routingInfo = getRoutingInfo(tenantId);
if (routingInfo.isIsolated()) {
return CollectionsUtil.isEmpty(responsibleServices.get(routingInfo.getProfileId()));
} else {
return true;
}
} else {
if (tenantId.isSysTenantId()) {
return false;
}
TenantProfileId profileId = tenantRoutingInfoService.getRoutingInfo(tenantId).getProfileId();
return assignedTenantProfiles.contains(profileId.getId());
return assignedTenantProfiles.contains(getRoutingInfo(tenantId).getProfileId().getId());
}
}
@ -283,14 +294,14 @@ public class HashPartitionService implements PartitionService {
public synchronized void recalculatePartitions(ServiceInfo currentService, List<ServiceInfo> otherServices) {
log.info("Recalculating partitions");
tbTransportServicesByType.clear();
responsibleServices.clear();
logServiceInfo(currentService);
otherServices.forEach(this::logServiceInfo);
Map<QueueKey, List<ServiceInfo>> queueServicesMap = new HashMap<>();
addNode(queueServicesMap, currentService);
Map<TenantProfileId, List<ServiceInfo>> responsibleServices = new HashMap<>();
addNode(currentService, queueServicesMap, responsibleServices);
for (ServiceInfo other : otherServices) {
addNode(queueServicesMap, other);
addNode(other, queueServicesMap, responsibleServices);
}
queueServicesMap.values().forEach(list -> list.sort(Comparator.comparing(ServiceInfo::getServiceId)));
responsibleServices.values().forEach(list -> list.sort(Comparator.comparing(ServiceInfo::getServiceId)));
@ -299,7 +310,7 @@ public class HashPartitionService implements PartitionService {
partitionSizesMap.forEach((queueKey, size) -> {
for (int i = 0; i < size; i++) {
try {
ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(queueKey), queueKey, i);
ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(queueKey), queueKey, i, responsibleServices);
log.trace("Server responsible for {}[{}] - {}", queueKey, i, serviceInfo != null ? serviceInfo.getServiceId() : "none");
if (currentService.equals(serviceInfo)) {
newPartitions.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(i);
@ -309,6 +320,7 @@ public class HashPartitionService implements PartitionService {
}
}
});
this.responsibleServices = responsibleServices;
final ConcurrentMap<QueueKey, List<Integer>> oldPartitions = myPartitions;
myPartitions = newPartitions;
@ -474,20 +486,22 @@ public class HashPartitionService implements PartitionService {
if (TenantId.SYS_TENANT_ID.equals(tenantId)) {
return false;
}
TenantRoutingInfo routingInfo = tenantRoutingInfoMap.computeIfAbsent(tenantId, k -> {
return tenantRoutingInfoService.getRoutingInfo(tenantId);
});
TenantRoutingInfo routingInfo = getRoutingInfo(tenantId);
if (routingInfo == null) {
throw new TenantNotFoundException(tenantId);
}
switch (serviceType) {
case TB_RULE_ENGINE:
return routingInfo.isIsolatedTbRuleEngine();
return routingInfo.isIsolated();
default:
return false;
}
}
/**
 * Returns the routing info for the given tenant, consulting {@code tenantRoutingInfoMap}
 * first and asking {@code tenantRoutingInfoService} only when no entry is present.
 *
 * @param tenantId the tenant whose routing info is requested
 * @return the routing info held in the map for this tenant after the lookup
 */
private TenantRoutingInfo getRoutingInfo(TenantId tenantId) {
    return tenantRoutingInfoMap.computeIfAbsent(
            tenantId,
            id -> tenantRoutingInfoService.getRoutingInfo(id));
}
private TenantId getIsolatedOrSystemTenantId(ServiceType serviceType, TenantId tenantId) {
return isIsolated(serviceType, tenantId) ? tenantId : TenantId.SYS_TENANT_ID;
}
@ -496,7 +510,7 @@ public class HashPartitionService implements PartitionService {
log.info("[{}] Found common server: {}", server.getServiceId(), server.getServiceTypesList());
}
private void addNode(Map<QueueKey, List<ServiceInfo>> queueServiceList, ServiceInfo instance) {
private void addNode(ServiceInfo instance, Map<QueueKey, List<ServiceInfo>> queueServiceList, Map<TenantProfileId, List<ServiceInfo>> responsibleServices) {
for (String serviceTypeStr : instance.getServiceTypesList()) {
ServiceType serviceType = ServiceType.of(serviceTypeStr);
if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) {
@ -528,7 +542,8 @@ public class HashPartitionService implements PartitionService {
}
}
protected ServiceInfo resolveByPartitionIdx(List<ServiceInfo> servers, QueueKey queueKey, int partition) {
protected ServiceInfo resolveByPartitionIdx(List<ServiceInfo> servers, QueueKey queueKey, int partition,
Map<TenantProfileId, List<ServiceInfo>> responsibleServices) {
if (servers == null || servers.isEmpty()) {
return null;
}

View File

@ -23,5 +23,5 @@ import org.thingsboard.server.common.data.id.TenantProfileId;
public class TenantRoutingInfo {
private final TenantId tenantId;
private final TenantProfileId profileId;
private final boolean isolatedTbRuleEngine;
private final boolean isolated;
}