CF Cluster mode fixes
This commit is contained in:
parent
4bf6b631a6
commit
3d282836e7
@ -24,6 +24,7 @@ import org.thingsboard.server.actors.ActorSystemContext;
|
|||||||
import org.thingsboard.server.actors.TbActorCtx;
|
import org.thingsboard.server.actors.TbActorCtx;
|
||||||
import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
|
import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
|
||||||
import org.thingsboard.server.common.data.AttributeScope;
|
import org.thingsboard.server.common.data.AttributeScope;
|
||||||
|
import org.thingsboard.server.common.data.DataConstants;
|
||||||
import org.thingsboard.server.common.data.StringUtils;
|
import org.thingsboard.server.common.data.StringUtils;
|
||||||
import org.thingsboard.server.common.data.cf.configuration.Argument;
|
import org.thingsboard.server.common.data.cf.configuration.Argument;
|
||||||
import org.thingsboard.server.common.data.cf.configuration.ArgumentType;
|
import org.thingsboard.server.common.data.cf.configuration.ArgumentType;
|
||||||
@ -34,6 +35,7 @@ import org.thingsboard.server.common.data.id.TenantId;
|
|||||||
import org.thingsboard.server.common.data.kv.StringDataEntry;
|
import org.thingsboard.server.common.data.kv.StringDataEntry;
|
||||||
import org.thingsboard.server.common.data.msg.TbMsgType;
|
import org.thingsboard.server.common.data.msg.TbMsgType;
|
||||||
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
|
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
|
||||||
|
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto;
|
import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto;
|
import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto;
|
||||||
@ -74,7 +76,6 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
|
|||||||
final EntityId entityId;
|
final EntityId entityId;
|
||||||
final CalculatedFieldProcessingService cfService;
|
final CalculatedFieldProcessingService cfService;
|
||||||
final CalculatedFieldStateService cfStateService;
|
final CalculatedFieldStateService cfStateService;
|
||||||
final int partition;
|
|
||||||
|
|
||||||
TbActorCtx ctx;
|
TbActorCtx ctx;
|
||||||
Map<CalculatedFieldId, CalculatedFieldState> states = new HashMap<>();
|
Map<CalculatedFieldId, CalculatedFieldState> states = new HashMap<>();
|
||||||
@ -85,7 +86,6 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
|
|||||||
this.entityId = entityId;
|
this.entityId = entityId;
|
||||||
this.cfService = systemContext.getCalculatedFieldProcessingService();
|
this.cfService = systemContext.getCalculatedFieldProcessingService();
|
||||||
this.cfStateService = systemContext.getCalculatedFieldStateService();
|
this.cfStateService = systemContext.getCalculatedFieldStateService();
|
||||||
this.partition = systemContext.getCalculatedFieldEntityProfileCache().getEntityIdPartition(tenantId, entityId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void init(TbActorCtx ctx) {
|
void init(TbActorCtx ctx) {
|
||||||
@ -93,8 +93,8 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void process(CalculatedFieldPartitionChangeMsg msg) {
|
public void process(CalculatedFieldPartitionChangeMsg msg) {
|
||||||
if (!msg.getPartitions()[partition]) {
|
if (!systemContext.getPartitionService().resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId).isMyPartition()) {
|
||||||
log.info("[{}][{}] Stopping entity actor due to change partition event.", partition, entityId);
|
log.info("[{}] Stopping entity actor due to change partition event.", entityId);
|
||||||
ctx.stop(ctx.getSelf());
|
ctx.stop(ctx.getSelf());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -98,7 +98,7 @@ public class TenantActor extends RuleChainManagerActor {
|
|||||||
() -> new CalculatedFieldManagerActorCreator(systemContext, tenantId),
|
() -> new CalculatedFieldManagerActorCreator(systemContext, tenantId),
|
||||||
() -> true);
|
() -> true);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.info("Failed to init CF Actor.", e);
|
log.info("[{}] Failed to init CF Actor.", tenantId, e);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
if (getApiUsageState().isReExecEnabled()) {
|
if (getApiUsageState().isReExecEnabled()) {
|
||||||
@ -259,11 +259,25 @@ public class TenantActor extends RuleChainManagerActor {
|
|||||||
ServiceType serviceType = msg.getServiceType();
|
ServiceType serviceType = msg.getServiceType();
|
||||||
if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) {
|
if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) {
|
||||||
if (systemContext.getPartitionService().isManagedByCurrentService(tenantId)) {
|
if (systemContext.getPartitionService().isManagedByCurrentService(tenantId)) {
|
||||||
|
if (cfActor == null) {
|
||||||
|
try {
|
||||||
|
//TODO: IM - extend API usage to have CF Exec Enabled? Not in 4.0;
|
||||||
|
cfActor = ctx.getOrCreateChildActor(new TbStringActorId("CFM|" + tenantId),
|
||||||
|
() -> DefaultActorService.CF_MANAGER_DISPATCHER_NAME,
|
||||||
|
() -> new CalculatedFieldManagerActorCreator(systemContext, tenantId),
|
||||||
|
() -> true);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.info("[{}] Failed to init CF Actor.", tenantId, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
if (!ruleChainsInitialized) {
|
if (!ruleChainsInitialized) {
|
||||||
log.info("Tenant {} is now managed by this service, initializing rule chains", tenantId);
|
log.info("Tenant {} is now managed by this service, initializing rule chains", tenantId);
|
||||||
initRuleChains();
|
initRuleChains();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
if (cfActor != null) {
|
||||||
|
ctx.stop(cfActor.getActorId());
|
||||||
|
}
|
||||||
if (ruleChainsInitialized) {
|
if (ruleChainsInitialized) {
|
||||||
log.info("Tenant {} is no longer managed by this service, stopping rule chains", tenantId);
|
log.info("Tenant {} is no longer managed by this service, stopping rule chains", tenantId);
|
||||||
destroyRuleChains();
|
destroyRuleChains();
|
||||||
|
|||||||
@ -30,6 +30,7 @@ import org.thingsboard.common.util.ThingsBoardExecutors;
|
|||||||
import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg;
|
import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg;
|
||||||
import org.thingsboard.server.actors.calculatedField.MultipleTbCallback;
|
import org.thingsboard.server.actors.calculatedField.MultipleTbCallback;
|
||||||
import org.thingsboard.server.cluster.TbClusterService;
|
import org.thingsboard.server.cluster.TbClusterService;
|
||||||
|
import org.thingsboard.server.common.data.DataConstants;
|
||||||
import org.thingsboard.server.common.data.EntityType;
|
import org.thingsboard.server.common.data.EntityType;
|
||||||
import org.thingsboard.server.common.data.StringUtils;
|
import org.thingsboard.server.common.data.StringUtils;
|
||||||
import org.thingsboard.server.common.data.cf.configuration.Argument;
|
import org.thingsboard.server.common.data.cf.configuration.Argument;
|
||||||
@ -51,6 +52,7 @@ import org.thingsboard.server.common.data.msg.TbMsgType;
|
|||||||
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
|
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
|
||||||
import org.thingsboard.server.common.msg.TbMsg;
|
import org.thingsboard.server.common.msg.TbMsg;
|
||||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||||
|
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||||
import org.thingsboard.server.dao.attributes.AttributesService;
|
import org.thingsboard.server.dao.attributes.AttributesService;
|
||||||
@ -200,7 +202,7 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP
|
|||||||
if (broadcast) {
|
if (broadcast) {
|
||||||
broadcasts.add(link);
|
broadcasts.add(link);
|
||||||
} else {
|
} else {
|
||||||
TopicPartitionInfo tpi = partitionService.resolve(QueueKey.CF, link.entityId());
|
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, link.tenantId(), link.entityId());
|
||||||
unicasts.computeIfAbsent(tpi, k -> new ArrayList<>()).add(link);
|
unicasts.computeIfAbsent(tpi, k -> new ArrayList<>()).add(link);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -18,8 +18,10 @@ package org.thingsboard.server.service.cf.cache;
|
|||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.thingsboard.server.common.data.DataConstants;
|
||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
|
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||||
import org.thingsboard.server.queue.discovery.QueueKey;
|
import org.thingsboard.server.queue.discovery.QueueKey;
|
||||||
@ -57,7 +59,7 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void add(TenantId tenantId, EntityId profileId, EntityId entityId) {
|
public void add(TenantId tenantId, EntityId profileId, EntityId entityId) {
|
||||||
var tpi = partitionService.resolve(QueueKey.CF, entityId);
|
var tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId);
|
||||||
var partition = tpi.getPartition().orElse(UNKNOWN);
|
var partition = tpi.getPartition().orElse(UNKNOWN);
|
||||||
tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache())
|
tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache())
|
||||||
.add(profileId, entityId, partition, tpi.isMyPartition());
|
.add(profileId, entityId, partition, tpi.isMyPartition());
|
||||||
@ -65,7 +67,7 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void update(TenantId tenantId, EntityId oldProfileId, EntityId newProfileId, EntityId entityId) {
|
public void update(TenantId tenantId, EntityId oldProfileId, EntityId newProfileId, EntityId entityId) {
|
||||||
var tpi = partitionService.resolve(QueueKey.CF, entityId);
|
var tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId);
|
||||||
var partition = tpi.getPartition().orElse(UNKNOWN);
|
var partition = tpi.getPartition().orElse(UNKNOWN);
|
||||||
var cache = tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache());
|
var cache = tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache());
|
||||||
//TODO: make this method atomic;
|
//TODO: make this method atomic;
|
||||||
@ -86,7 +88,7 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getEntityIdPartition(TenantId tenantId, EntityId entityId) {
|
public int getEntityIdPartition(TenantId tenantId, EntityId entityId) {
|
||||||
var tpi = partitionService.resolve(QueueKey.CF, entityId);
|
var tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId);
|
||||||
return tpi.getPartition().orElse(UNKNOWN);
|
return tpi.getPartition().orElse(UNKNOWN);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -20,10 +20,12 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.thingsboard.server.common.data.DataConstants;
|
||||||
import org.thingsboard.server.common.data.id.CalculatedFieldId;
|
import org.thingsboard.server.common.data.id.CalculatedFieldId;
|
||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
|
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
|
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
|
||||||
@ -67,9 +69,11 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
|
|||||||
@Override
|
@Override
|
||||||
public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> eventConsumer) {
|
public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> eventConsumer) {
|
||||||
super.init(eventConsumer);
|
super.init(eventConsumer);
|
||||||
|
|
||||||
|
var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME);
|
||||||
this.stateConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<CalculatedFieldStateProto>>create()
|
this.stateConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<CalculatedFieldStateProto>>create()
|
||||||
.queueKey(QueueKey.CF_STATES)
|
.queueKey(queueKey)
|
||||||
.topic(partitionService.getTopic(QueueKey.CF_STATES))
|
.topic(partitionService.getTopic(queueKey))
|
||||||
.pollInterval(pollInterval)
|
.pollInterval(pollInterval)
|
||||||
.msgPackProcessor((msgs, consumer, config) -> {
|
.msgPackProcessor((msgs, consumer, config) -> {
|
||||||
for (TbProtoQueueMsg<CalculatedFieldStateProto> msg : msgs) {
|
for (TbProtoQueueMsg<CalculatedFieldStateProto> msg : msgs) {
|
||||||
@ -101,7 +105,7 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doPersist(CalculatedFieldEntityCtxId stateId, CalculatedFieldStateProto stateMsgProto, TbCallback callback) {
|
protected void doPersist(CalculatedFieldEntityCtxId stateId, CalculatedFieldStateProto stateMsgProto, TbCallback callback) {
|
||||||
TopicPartitionInfo tpi = partitionService.resolve(QueueKey.CF_STATES, stateId.entityId());
|
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME, stateId.tenantId(), stateId.entityId());
|
||||||
TbProtoQueueMsg<CalculatedFieldStateProto> msg = new TbProtoQueueMsg<>(stateId.entityId().getId(), stateMsgProto);
|
TbProtoQueueMsg<CalculatedFieldStateProto> msg = new TbProtoQueueMsg<>(stateId.entityId().getId(), stateMsgProto);
|
||||||
if (stateMsgProto == null) {
|
if (stateMsgProto == null) {
|
||||||
putStateId(msg.getHeaders(), stateId);
|
putStateId(msg.getHeaders(), stateId);
|
||||||
|
|||||||
@ -27,6 +27,7 @@ import org.thingsboard.common.util.ThingsBoardExecutors;
|
|||||||
import org.thingsboard.server.actors.ActorSystemContext;
|
import org.thingsboard.server.actors.ActorSystemContext;
|
||||||
import org.thingsboard.server.actors.calculatedField.CalculatedFieldLinkedTelemetryMsg;
|
import org.thingsboard.server.actors.calculatedField.CalculatedFieldLinkedTelemetryMsg;
|
||||||
import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg;
|
import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg;
|
||||||
|
import org.thingsboard.server.common.data.DataConstants;
|
||||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.queue.QueueConfig;
|
import org.thingsboard.server.common.data.queue.QueueConfig;
|
||||||
@ -108,9 +109,10 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
|
|||||||
public void init() {
|
public void init() {
|
||||||
super.init("tb-cf");
|
super.init("tb-cf");
|
||||||
|
|
||||||
|
var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME);
|
||||||
this.eventConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToCalculatedFieldMsg>>create()
|
this.eventConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToCalculatedFieldMsg>>create()
|
||||||
.queueKey(QueueKey.CF)
|
.queueKey(queueKey)
|
||||||
.topic(partitionService.getTopic(QueueKey.CF))
|
.topic(partitionService.getTopic(queueKey))
|
||||||
.pollInterval(pollInterval)
|
.pollInterval(pollInterval)
|
||||||
.msgPackProcessor(this::processMsgs)
|
.msgPackProcessor(this::processMsgs)
|
||||||
.consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer())
|
.consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer())
|
||||||
@ -140,7 +142,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
|
|||||||
|
|
||||||
// Cleanup old entities after corresponding consumers are stopped.
|
// Cleanup old entities after corresponding consumers are stopped.
|
||||||
// Any periodic tasks need to check that the entity is still managed by the current server before processing.
|
// Any periodic tasks need to check that the entity is still managed by the current server before processing.
|
||||||
actorContext.tell(new CalculatedFieldPartitionChangeMsg(partitionsToBooleanIndexArray(partitions)));
|
actorContext.tell(new CalculatedFieldPartitionChangeMsg());
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
log.error("Failed to process partition change event: {}", event, t);
|
log.error("Failed to process partition change event: {}", event, t);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -358,7 +358,7 @@ public class DefaultTbClusterService implements TbClusterService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void pushMsgToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldMsg msg, TbQueueCallback callback) {
|
public void pushMsgToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldMsg msg, TbQueueCallback callback) {
|
||||||
TopicPartitionInfo tpi = partitionService.resolve(QueueKey.CF, entityId);
|
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId);
|
||||||
pushMsgToCalculatedFields(tpi, UUID.randomUUID(), msg, callback);
|
pushMsgToCalculatedFields(tpi, UUID.randomUUID(), msg, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -371,7 +371,7 @@ public class DefaultTbClusterService implements TbClusterService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void pushNotificationToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldNotificationMsg msg, TbQueueCallback callback) {
|
public void pushNotificationToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldNotificationMsg msg, TbQueueCallback callback) {
|
||||||
TopicPartitionInfo tpi = partitionService.resolve(QueueKey.CF, entityId);
|
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId);
|
||||||
producerProvider.getCalculatedFieldsNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), callback);
|
producerProvider.getCalculatedFieldsNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), callback);
|
||||||
toRuleEngineNfs.incrementAndGet();
|
toRuleEngineNfs.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -22,6 +22,7 @@ import org.springframework.context.event.EventListener;
|
|||||||
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.thingsboard.server.actors.ActorSystemContext;
|
import org.thingsboard.server.actors.ActorSystemContext;
|
||||||
|
import org.thingsboard.server.common.data.DataConstants;
|
||||||
import org.thingsboard.server.common.data.EntityType;
|
import org.thingsboard.server.common.data.EntityType;
|
||||||
import org.thingsboard.server.common.data.id.QueueId;
|
import org.thingsboard.server.common.data.id.QueueId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
@ -108,7 +109,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
|||||||
@Override
|
@Override
|
||||||
protected void onTbApplicationEvent(PartitionChangeEvent event) {
|
protected void onTbApplicationEvent(PartitionChangeEvent event) {
|
||||||
event.getNewPartitions().forEach((queueKey, partitions) -> {
|
event.getNewPartitions().forEach((queueKey, partitions) -> {
|
||||||
if (CollectionsUtil.isOneOf(queueKey, QueueKey.CF, QueueKey.CF_STATES)) {
|
if (DataConstants.CF_QUEUE_NAME.equals(queueKey.getQueueName()) || DataConstants.CF_STATES_QUEUE_NAME.equals(queueKey.getQueueName())) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (partitionService.isManagedByCurrentService(queueKey.getTenantId())) {
|
if (partitionService.isManagedByCurrentService(queueKey.getTenantId())) {
|
||||||
|
|||||||
@ -26,8 +26,6 @@ import java.util.Set;
|
|||||||
@Data
|
@Data
|
||||||
public class CalculatedFieldPartitionChangeMsg implements ToCalculatedFieldSystemMsg {
|
public class CalculatedFieldPartitionChangeMsg implements ToCalculatedFieldSystemMsg {
|
||||||
|
|
||||||
private final boolean[] partitions;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TenantId getTenantId() {
|
public TenantId getTenantId() {
|
||||||
return TenantId.SYS_TENANT_ID;
|
return TenantId.SYS_TENANT_ID;
|
||||||
|
|||||||
@ -24,6 +24,7 @@ import org.jetbrains.annotations.NotNull;
|
|||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.context.ApplicationEventPublisher;
|
import org.springframework.context.ApplicationEventPublisher;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.thingsboard.server.common.data.DataConstants;
|
||||||
import org.thingsboard.server.common.data.exception.TenantNotFoundException;
|
import org.thingsboard.server.common.data.exception.TenantNotFoundException;
|
||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
@ -122,11 +123,6 @@ public class HashPartitionService implements PartitionService {
|
|||||||
partitionSizesMap.put(coreKey, corePartitions);
|
partitionSizesMap.put(coreKey, corePartitions);
|
||||||
partitionTopicsMap.put(coreKey, coreTopic);
|
partitionTopicsMap.put(coreKey, coreTopic);
|
||||||
|
|
||||||
partitionSizesMap.put(QueueKey.CF, cfPartitions);
|
|
||||||
partitionTopicsMap.put(QueueKey.CF, cfEventTopic);
|
|
||||||
partitionSizesMap.put(QueueKey.CF_STATES, cfPartitions);
|
|
||||||
partitionTopicsMap.put(QueueKey.CF_STATES, cfStateTopic);
|
|
||||||
|
|
||||||
QueueKey vcKey = new QueueKey(ServiceType.TB_VC_EXECUTOR);
|
QueueKey vcKey = new QueueKey(ServiceType.TB_VC_EXECUTOR);
|
||||||
partitionSizesMap.put(vcKey, vcPartitions);
|
partitionSizesMap.put(vcKey, vcPartitions);
|
||||||
partitionTopicsMap.put(vcKey, vcTopic);
|
partitionTopicsMap.put(vcKey, vcTopic);
|
||||||
@ -165,6 +161,14 @@ public class HashPartitionService implements PartitionService {
|
|||||||
List<QueueRoutingInfo> queueRoutingInfoList = getQueueRoutingInfos();
|
List<QueueRoutingInfo> queueRoutingInfoList = getQueueRoutingInfos();
|
||||||
queueRoutingInfoList.forEach(queue -> {
|
queueRoutingInfoList.forEach(queue -> {
|
||||||
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queue);
|
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queue);
|
||||||
|
if (DataConstants.MAIN_QUEUE_NAME.equals(queueKey.getQueueName())) {
|
||||||
|
QueueKey cfQueueKey = queueKey.withQueueName(DataConstants.CF_QUEUE_NAME);
|
||||||
|
partitionSizesMap.put(cfQueueKey, queue.getPartitions());
|
||||||
|
partitionTopicsMap.put(cfQueueKey, cfEventTopic);
|
||||||
|
QueueKey cfQueueStatesKey = queueKey.withQueueName(DataConstants.CF_STATES_QUEUE_NAME);
|
||||||
|
partitionSizesMap.put(cfQueueStatesKey, queue.getPartitions());
|
||||||
|
partitionTopicsMap.put(cfQueueStatesKey, cfStateTopic);
|
||||||
|
}
|
||||||
partitionTopicsMap.put(queueKey, queue.getQueueTopic());
|
partitionTopicsMap.put(queueKey, queue.getQueueTopic());
|
||||||
partitionSizesMap.put(queueKey, queue.getPartitions());
|
partitionSizesMap.put(queueKey, queue.getPartitions());
|
||||||
queueConfigs.put(queueKey, new QueueConfig(queue));
|
queueConfigs.put(queueKey, new QueueConfig(queue));
|
||||||
@ -213,6 +217,14 @@ public class HashPartitionService implements PartitionService {
|
|||||||
QueueRoutingInfo queueRoutingInfo = new QueueRoutingInfo(queueUpdateMsg);
|
QueueRoutingInfo queueRoutingInfo = new QueueRoutingInfo(queueUpdateMsg);
|
||||||
TenantId tenantId = queueRoutingInfo.getTenantId();
|
TenantId tenantId = queueRoutingInfo.getTenantId();
|
||||||
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueRoutingInfo.getQueueName(), tenantId);
|
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueRoutingInfo.getQueueName(), tenantId);
|
||||||
|
if (DataConstants.MAIN_QUEUE_NAME.equals(queueKey.getQueueName())) {
|
||||||
|
QueueKey cfQueueKey = queueKey.withQueueName(DataConstants.CF_QUEUE_NAME);
|
||||||
|
partitionSizesMap.put(cfQueueKey, queueRoutingInfo.getPartitions());
|
||||||
|
partitionTopicsMap.put(cfQueueKey, cfEventTopic);
|
||||||
|
QueueKey cfQueueStatesKey = queueKey.withQueueName(DataConstants.CF_STATES_QUEUE_NAME);
|
||||||
|
partitionSizesMap.put(cfQueueStatesKey, queueRoutingInfo.getPartitions());
|
||||||
|
partitionTopicsMap.put(cfQueueStatesKey, cfStateTopic);
|
||||||
|
}
|
||||||
partitionTopicsMap.put(queueKey, queueRoutingInfo.getQueueTopic());
|
partitionTopicsMap.put(queueKey, queueRoutingInfo.getQueueTopic());
|
||||||
partitionSizesMap.put(queueKey, queueRoutingInfo.getPartitions());
|
partitionSizesMap.put(queueKey, queueRoutingInfo.getPartitions());
|
||||||
queueConfigs.put(queueKey, new QueueConfig(queueRoutingInfo));
|
queueConfigs.put(queueKey, new QueueConfig(queueRoutingInfo));
|
||||||
@ -252,6 +264,15 @@ public class HashPartitionService implements PartitionService {
|
|||||||
partitionTopicsMap.remove(queueKey);
|
partitionTopicsMap.remove(queueKey);
|
||||||
partitionSizesMap.remove(queueKey);
|
partitionSizesMap.remove(queueKey);
|
||||||
queueConfigs.remove(queueKey);
|
queueConfigs.remove(queueKey);
|
||||||
|
|
||||||
|
if (DataConstants.MAIN_QUEUE_NAME.equals(queueKey.getQueueName())) {
|
||||||
|
QueueKey cfQueueKey = queueKey.withQueueName(DataConstants.CF_QUEUE_NAME);
|
||||||
|
partitionSizesMap.remove(cfQueueKey);
|
||||||
|
partitionTopicsMap.remove(cfQueueKey);
|
||||||
|
QueueKey cfQueueStatesKey = queueKey.withQueueName(DataConstants.CF_STATES_QUEUE_NAME);
|
||||||
|
partitionSizesMap.remove(cfQueueStatesKey);
|
||||||
|
partitionTopicsMap.remove(cfQueueStatesKey);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -336,8 +357,7 @@ public class HashPartitionService implements PartitionService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
private TopicPartitionInfo resolve(QueueKey queueKey, EntityId entityId) {
|
||||||
public TopicPartitionInfo resolve(QueueKey queueKey, EntityId entityId) {
|
|
||||||
Integer partitionSize = partitionSizesMap.get(queueKey);
|
Integer partitionSize = partitionSizesMap.get(queueKey);
|
||||||
if (partitionSize == null) {
|
if (partitionSize == null) {
|
||||||
throw new IllegalStateException("Partitions info for queue " + queueKey + " is missing");
|
throw new IllegalStateException("Partitions info for queue " + queueKey + " is missing");
|
||||||
|
|||||||
@ -37,8 +37,6 @@ public interface PartitionService {
|
|||||||
|
|
||||||
TopicPartitionInfo resolve(ServiceType serviceType, TenantId tenantId, EntityId entityId);
|
TopicPartitionInfo resolve(ServiceType serviceType, TenantId tenantId, EntityId entityId);
|
||||||
|
|
||||||
TopicPartitionInfo resolve(QueueKey queueKey, EntityId entityId);
|
|
||||||
|
|
||||||
List<TopicPartitionInfo> resolveAll(ServiceType serviceType, String queueName, TenantId tenantId, EntityId entityId);
|
List<TopicPartitionInfo> resolveAll(ServiceType serviceType, String queueName, TenantId tenantId, EntityId entityId);
|
||||||
|
|
||||||
boolean isMyPartition(ServiceType serviceType, TenantId tenantId, EntityId entityId);
|
boolean isMyPartition(ServiceType serviceType, TenantId tenantId, EntityId entityId);
|
||||||
|
|||||||
@ -35,9 +35,6 @@ public class QueueKey {
|
|||||||
private final String queueName;
|
private final String queueName;
|
||||||
private final TenantId tenantId;
|
private final TenantId tenantId;
|
||||||
|
|
||||||
public static final QueueKey CF = new QueueKey(ServiceType.TB_RULE_ENGINE).withQueueName(CF_QUEUE_NAME);
|
|
||||||
public static final QueueKey CF_STATES = new QueueKey(ServiceType.TB_RULE_ENGINE).withQueueName(CF_STATES_QUEUE_NAME);
|
|
||||||
|
|
||||||
public QueueKey(ServiceType type, Queue queue) {
|
public QueueKey(ServiceType type, Queue queue) {
|
||||||
this.type = type;
|
this.type = type;
|
||||||
this.queueName = queue.getName();
|
this.queueName = queue.getName();
|
||||||
|
|||||||
@ -64,10 +64,10 @@ public class PartitionChangeEvent extends TbApplicationEvent {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public Set<TopicPartitionInfo> getCfPartitions() {
|
public Set<TopicPartitionInfo> getCfPartitions() {
|
||||||
return newPartitions.getOrDefault(QueueKey.CF, Collections.emptySet());
|
return getPartitionsByServiceTypeAndQueueName(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Set<TopicPartitionInfo> getPartitionsByServiceTypeAndQueueName(ServiceType serviceType, String queueName) {
|
public Set<TopicPartitionInfo> getPartitionsByServiceTypeAndQueueName(ServiceType serviceType, String queueName) {
|
||||||
return newPartitions.entrySet()
|
return newPartitions.entrySet()
|
||||||
.stream()
|
.stream()
|
||||||
.filter(entry -> serviceType.equals(entry.getKey().getType()) && queueName.equals(entry.getKey().getQueueName()))
|
.filter(entry -> serviceType.equals(entry.getKey().getType()) && queueName.equals(entry.getKey().getQueueName()))
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user