Merge remote-tracking branch 'origin/cf-consumer-groups' into cf-fixes
This commit is contained in:
commit
6b7977b582
@ -107,7 +107,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
|
||||
|
||||
public void process(CalculatedFieldStateRestoreMsg msg) {
|
||||
CalculatedFieldId cfId = msg.getId().cfId();
|
||||
log.info("[{}] [{}] Processing CF state restore msg.", msg.getId().entityId(), cfId);
|
||||
log.debug("[{}] [{}] Processing CF state restore msg.", msg.getId().entityId(), cfId);
|
||||
if (msg.getState() != null) {
|
||||
states.put(cfId, msg.getState());
|
||||
} else {
|
||||
@ -116,10 +116,10 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
|
||||
}
|
||||
|
||||
public void process(EntityInitCalculatedFieldMsg msg) throws CalculatedFieldException {
|
||||
log.info("[{}] Processing entity init CF msg.", msg.getCtx().getCfId());
|
||||
log.debug("[{}] Processing entity init CF msg.", msg.getCtx().getCfId());
|
||||
var ctx = msg.getCtx();
|
||||
if (msg.isForceReinit()) {
|
||||
log.info("Force reinitialization of CF: [{}].", ctx.getCfId());
|
||||
log.debug("Force reinitialization of CF: [{}].", ctx.getCfId());
|
||||
states.remove(ctx.getCfId());
|
||||
}
|
||||
try {
|
||||
@ -138,7 +138,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
|
||||
}
|
||||
|
||||
public void process(CalculatedFieldEntityDeleteMsg msg) {
|
||||
log.info("[{}] Processing CF entity delete msg.", msg.getEntityId());
|
||||
log.debug("[{}] Processing CF entity delete msg.", msg.getEntityId());
|
||||
if (this.entityId.equals(msg.getEntityId())) {
|
||||
if (states.isEmpty()) {
|
||||
msg.getCallback().onSuccess();
|
||||
@ -244,7 +244,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
|
||||
private void processArgumentValuesUpdate(CalculatedFieldCtx ctx, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback,
|
||||
Map<String, ArgumentEntry> newArgValues, UUID tbMsgId, TbMsgType tbMsgType) throws CalculatedFieldException {
|
||||
if (newArgValues.isEmpty()) {
|
||||
log.info("[{}] No new argument values to process for CF.", ctx.getCfId());
|
||||
log.debug("[{}] No new argument values to process for CF.", ctx.getCfId());
|
||||
callback.onSuccess(CALLBACKS_PER_CF);
|
||||
}
|
||||
CalculatedFieldState state = states.get(ctx.getCfId());
|
||||
|
||||
@ -236,12 +236,12 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
|
||||
private void onCfCreated(ComponentLifecycleMsg msg, TbCallback callback) throws CalculatedFieldException {
|
||||
var cfId = new CalculatedFieldId(msg.getEntityId().getId());
|
||||
if (calculatedFields.containsKey(cfId)) {
|
||||
log.warn("[{}] CF was already initialized [{}]", tenantId, cfId);
|
||||
log.debug("[{}] CF was already initialized [{}]", tenantId, cfId);
|
||||
callback.onSuccess();
|
||||
} else {
|
||||
var cf = cfDaoService.findById(msg.getTenantId(), cfId);
|
||||
if (cf == null) {
|
||||
log.warn("[{}] Failed to lookup CF by id [{}]", tenantId, cfId);
|
||||
log.debug("[{}] Failed to lookup CF by id [{}]", tenantId, cfId);
|
||||
callback.onSuccess();
|
||||
} else {
|
||||
var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService());
|
||||
@ -268,7 +268,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
|
||||
} else {
|
||||
var newCf = cfDaoService.findById(msg.getTenantId(), cfId);
|
||||
if (newCf == null) {
|
||||
log.warn("[{}] Failed to lookup CF by id [{}]", tenantId, cfId);
|
||||
log.debug("[{}] Failed to lookup CF by id [{}]", tenantId, cfId);
|
||||
callback.onSuccess();
|
||||
} else {
|
||||
var newCfCtx = new CalculatedFieldCtx(newCf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService());
|
||||
@ -313,7 +313,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
|
||||
var cfId = new CalculatedFieldId(msg.getEntityId().getId());
|
||||
var cfCtx = calculatedFields.remove(cfId);
|
||||
if (cfCtx == null) {
|
||||
log.warn("[{}] CF was already deleted [{}]", tenantId, cfId);
|
||||
log.debug("[{}] CF was already deleted [{}]", tenantId, cfId);
|
||||
callback.onSuccess();
|
||||
} else {
|
||||
entityIdCalculatedFields.get(cfCtx.getEntityId()).remove(cfCtx);
|
||||
|
||||
@ -30,7 +30,9 @@ import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
|
||||
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.thingsboard.server.utils.CalculatedFieldUtils.fromProto;
|
||||
@ -41,7 +43,7 @@ public abstract class AbstractCalculatedFieldStateService implements CalculatedF
|
||||
@Autowired
|
||||
private ActorSystemContext actorSystemContext;
|
||||
|
||||
protected QueueStateService<TbProtoQueueMsg<ToCalculatedFieldMsg>, TbProtoQueueMsg<CalculatedFieldStateProto>> stateService;
|
||||
protected Map<QueueKey, QueueStateService<TbProtoQueueMsg<ToCalculatedFieldMsg>, TbProtoQueueMsg<CalculatedFieldStateProto>>> stateServices = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
public final void persistState(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) {
|
||||
@ -72,22 +74,22 @@ public abstract class AbstractCalculatedFieldStateService implements CalculatedF
|
||||
|
||||
@Override
|
||||
public void restore(QueueKey queueKey, Set<TopicPartitionInfo> partitions) {
|
||||
stateService.update(queueKey, partitions);
|
||||
stateServices.get(queueKey).update(queueKey, partitions);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete(Set<TopicPartitionInfo> partitions) {
|
||||
stateService.delete(partitions);
|
||||
public void delete(QueueKey queueKey, Set<TopicPartitionInfo> partitions) {
|
||||
stateServices.get(queueKey).delete(partitions);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<TopicPartitionInfo> getPartitions() {
|
||||
return stateService.getPartitions().values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
|
||||
public Set<TopicPartitionInfo> getPartitions(QueueKey queueKey) {
|
||||
return stateServices.get(queueKey).getPartitions().values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
stateService.stop();
|
||||
public void stop(QueueKey queueKey) {
|
||||
stateServices.get(queueKey).stop();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -37,10 +37,10 @@ public interface CalculatedFieldStateService {
|
||||
|
||||
void restore(QueueKey queueKey, Set<TopicPartitionInfo> partitions);
|
||||
|
||||
void delete(Set<TopicPartitionInfo> partitions);
|
||||
void delete(QueueKey queueKey, Set<TopicPartitionInfo> partitions);
|
||||
|
||||
Set<TopicPartitionInfo> getPartitions();
|
||||
Set<TopicPartitionInfo> getPartitions(QueueKey queueKey);
|
||||
|
||||
void stop();
|
||||
void stop(QueueKey queueKey);
|
||||
|
||||
}
|
||||
|
||||
@ -68,7 +68,7 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
|
||||
|
||||
@Override
|
||||
public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> eventConsumer) {
|
||||
var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME);
|
||||
var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME);
|
||||
PartitionedQueueConsumerManager<TbProtoQueueMsg<CalculatedFieldStateProto>> stateConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<CalculatedFieldStateProto>>create()
|
||||
.queueKey(queueKey)
|
||||
.topic(partitionService.getTopic(queueKey))
|
||||
@ -97,16 +97,16 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
|
||||
.scheduler(eventConsumer.getScheduler())
|
||||
.taskExecutor(eventConsumer.getTaskExecutor())
|
||||
.build();
|
||||
super.stateService = KafkaQueueStateService.<TbProtoQueueMsg<ToCalculatedFieldMsg>, TbProtoQueueMsg<CalculatedFieldStateProto>>builder()
|
||||
super.stateServices.put(queueKey, KafkaQueueStateService.<TbProtoQueueMsg<ToCalculatedFieldMsg>, TbProtoQueueMsg<CalculatedFieldStateProto>>builder()
|
||||
.eventConsumer(eventConsumer)
|
||||
.stateConsumer(stateConsumer)
|
||||
.build();
|
||||
.build());
|
||||
this.stateProducer = (TbKafkaProducerTemplate<TbProtoQueueMsg<CalculatedFieldStateProto>>) queueFactory.createCalculatedFieldStateProducer();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doPersist(CalculatedFieldEntityCtxId stateId, CalculatedFieldStateProto stateMsgProto, TbCallback callback) {
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME, stateId.tenantId(), stateId.entityId());
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, stateId.tenantId(), stateId.entityId());
|
||||
TbProtoQueueMsg<CalculatedFieldStateProto> msg = new TbProtoQueueMsg<>(stateId.entityId().getId(), stateMsgProto);
|
||||
if (stateMsgProto == null) {
|
||||
putStateId(msg.getHeaders(), stateId);
|
||||
@ -148,8 +148,8 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
super.stop();
|
||||
public void stop(QueueKey queueKey) {
|
||||
super.stop(queueKey);
|
||||
stateProducer.stop();
|
||||
}
|
||||
|
||||
|
||||
@ -20,13 +20,15 @@ import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.queue.common.state.DefaultQueueStateService;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
|
||||
import org.thingsboard.server.queue.common.state.DefaultQueueStateService;
|
||||
import org.thingsboard.server.queue.discovery.QueueKey;
|
||||
import org.thingsboard.server.service.cf.AbstractCalculatedFieldStateService;
|
||||
import org.thingsboard.server.service.cf.CfRocksDb;
|
||||
@ -44,7 +46,7 @@ public class RocksDBCalculatedFieldStateService extends AbstractCalculatedFieldS
|
||||
|
||||
@Override
|
||||
public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> eventConsumer) {
|
||||
super.stateService = new DefaultQueueStateService<>(eventConsumer);
|
||||
super.stateServices.put(new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME), new DefaultQueueStateService<>(eventConsumer));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -61,7 +63,7 @@ public class RocksDBCalculatedFieldStateService extends AbstractCalculatedFieldS
|
||||
|
||||
@Override
|
||||
public void restore(QueueKey queueKey, Set<TopicPartitionInfo> partitions) {
|
||||
if (stateService.getPartitions().isEmpty()) {
|
||||
if (stateServices.get(queueKey).getPartitions().isEmpty()) {
|
||||
cfRocksDb.forEach((key, value) -> {
|
||||
try {
|
||||
processRestoredState(CalculatedFieldStateProto.parseFrom(value));
|
||||
|
||||
@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageDataIterable;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
|
||||
import org.thingsboard.server.common.data.queue.QueueConfig;
|
||||
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
|
||||
@ -37,6 +38,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
||||
import org.thingsboard.server.dao.tenant.TenantService;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
|
||||
@ -80,9 +82,13 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa
|
||||
private long packProcessingTimeout;
|
||||
|
||||
private final TbRuleEngineQueueFactory queueFactory;
|
||||
private final TenantService tenantService;
|
||||
private final CalculatedFieldStateService stateService;
|
||||
private final CalculatedFieldEntityProfileCache entityProfileCache;
|
||||
|
||||
private final ConcurrentMap<QueueKey, PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>>> consumers = new ConcurrentHashMap<>();
|
||||
|
||||
|
||||
public DefaultTbCalculatedFieldConsumerService(TbRuleEngineQueueFactory tbQueueFactory,
|
||||
ActorSystemContext actorContext,
|
||||
TbDeviceProfileCache deviceProfileCache,
|
||||
@ -94,29 +100,41 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa
|
||||
JwtSettingsService jwtSettingsService,
|
||||
CalculatedFieldCache calculatedFieldCache,
|
||||
CalculatedFieldStateService stateService,
|
||||
CalculatedFieldEntityProfileCache entityProfileCache) {
|
||||
CalculatedFieldEntityProfileCache entityProfileCache,
|
||||
TenantService tenantService) {
|
||||
super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService,
|
||||
eventPublisher, jwtSettingsService);
|
||||
this.queueFactory = tbQueueFactory;
|
||||
this.stateService = stateService;
|
||||
this.entityProfileCache = entityProfileCache;
|
||||
this.tenantService = tenantService;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onStartUp() {
|
||||
var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME);
|
||||
PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> eventConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToCalculatedFieldMsg>>create()
|
||||
PageDataIterable<TenantId> iterator = new PageDataIterable<>(tenantService::findTenantsIds, 1024);
|
||||
for (TenantId tenantId : iterator) {
|
||||
if (partitionService.isManagedByCurrentService(tenantId)) {
|
||||
stateService.init(createConsumer(tenantId));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> createConsumer(TenantId tenantId) {
|
||||
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME);
|
||||
var eventConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToCalculatedFieldMsg>>create()
|
||||
.queueKey(queueKey)
|
||||
.topic(partitionService.getTopic(queueKey))
|
||||
.pollInterval(pollInterval)
|
||||
.msgPackProcessor(this::processMsgs)
|
||||
.consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer())
|
||||
.consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer(tenantId, partitionId))
|
||||
.queueAdmin(queueFactory.getCalculatedFieldQueueAdmin())
|
||||
.consumerExecutor(consumersExecutor)
|
||||
.scheduler(scheduler)
|
||||
.taskExecutor(mgmtExecutor)
|
||||
.build();
|
||||
stateService.init(eventConsumer);
|
||||
consumers.put(queueKey, eventConsumer);
|
||||
return eventConsumer;
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
@ -232,13 +250,24 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa
|
||||
if (event.getEntityId().getEntityType() == EntityType.TENANT) {
|
||||
if (event.getEvent() == ComponentLifecycleEvent.DELETED) {
|
||||
entityProfileCache.removeTenant(event.getTenantId());
|
||||
Set<TopicPartitionInfo> partitions = stateService.getPartitions();
|
||||
if (CollectionUtils.isEmpty(partitions)) {
|
||||
return;
|
||||
consumers.keySet().removeIf(queueKey -> {
|
||||
boolean toRemove = queueKey.getTenantId().equals(event.getTenantId());
|
||||
if (toRemove) {
|
||||
Set<TopicPartitionInfo> partitions = stateService.getPartitions(queueKey);
|
||||
if (!CollectionUtils.isEmpty(partitions)) {
|
||||
stateService.delete(queueKey, partitions);
|
||||
}
|
||||
var consumer = consumers.get(queueKey);
|
||||
if (consumer != null) {
|
||||
consumer.stop();
|
||||
}
|
||||
}
|
||||
return toRemove;
|
||||
});
|
||||
} else if (event.getEvent() == ComponentLifecycleEvent.CREATED) {
|
||||
if (partitionService.isManagedByCurrentService(event.getTenantId())) {
|
||||
stateService.init(createConsumer(event.getTenantId()));
|
||||
}
|
||||
stateService.delete(partitions.stream()
|
||||
.filter(tpi -> tpi.getTenantId().isPresent() && tpi.getTenantId().get().equals(event.getTenantId()))
|
||||
.collect(Collectors.toSet()));
|
||||
}
|
||||
} else if (event.getEntityId().getEntityType() == EntityType.ASSET_PROFILE) {
|
||||
if (event.getEvent() == ComponentLifecycleEvent.DELETED) {
|
||||
@ -271,7 +300,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa
|
||||
@Override
|
||||
protected void stopConsumers() {
|
||||
super.stopConsumers();
|
||||
stateService.stop(); // eventConsumer will be stopped by stateService
|
||||
consumers.keySet().forEach(stateService::stop); // eventConsumer will be stopped by stateService
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.queue.Queue;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.gen.js.JsInvokeProtos;
|
||||
@ -133,7 +134,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldMsg>> createToCalculatedFieldMsgConsumer() {
|
||||
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldMsg>> createToCalculatedFieldMsgConsumer(TenantId tenantId, Integer partitionId) {
|
||||
return new InMemoryTbQueueConsumer<>(storage, topicService.buildTopicName(calculatedFieldSettings.getEventTopic()));
|
||||
}
|
||||
|
||||
|
||||
@ -20,6 +20,7 @@ import jakarta.annotation.PreDestroy;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.id.EdgeId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.queue.Queue;
|
||||
@ -103,7 +104,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
|
||||
private final TbQueueAdmin housekeeperReprocessingAdmin;
|
||||
private final TbQueueAdmin edgeAdmin;
|
||||
private final TbQueueAdmin edgeEventAdmin;
|
||||
private final TbQueueAdmin cfAdmin;
|
||||
private final TbKafkaAdmin cfAdmin;
|
||||
private final TbQueueAdmin cfStateAdmin;
|
||||
private final TbQueueAdmin edqsEventsAdmin;
|
||||
private final TbKafkaAdmin edqsRequestsAdmin;
|
||||
@ -513,15 +514,22 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgConsumer() {
|
||||
public TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgConsumer(TenantId tenantId, Integer partitionId) {
|
||||
String queueName = DataConstants.CF_QUEUE_NAME;
|
||||
String groupId = topicService.buildConsumerGroupId("cf-", tenantId, queueName, partitionId);
|
||||
|
||||
cfAdmin.syncOffsets(topicService.buildConsumerGroupId("cf-", tenantId, queueName, null), // the fat groupId
|
||||
groupId, partitionId);
|
||||
|
||||
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToCalculatedFieldMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
|
||||
consumerBuilder.settings(kafkaSettings);
|
||||
consumerBuilder.topic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic()));
|
||||
consumerBuilder.clientId("monolith-calculated-field-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
|
||||
consumerBuilder.groupId(topicService.buildTopicName("monolith-calculated-field-consumer"));
|
||||
consumerBuilder.clientId("cf-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
|
||||
consumerBuilder.groupId(groupId);
|
||||
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCalculatedFieldMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||
consumerBuilder.admin(cfAdmin);
|
||||
consumerBuilder.statsService(consumerStatsService);
|
||||
|
||||
return consumerBuilder.build();
|
||||
}
|
||||
|
||||
|
||||
@ -20,6 +20,8 @@ import jakarta.annotation.PreDestroy;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.queue.Queue;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.gen.js.JsInvokeProtos;
|
||||
@ -90,7 +92,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
|
||||
private final TbQueueAdmin housekeeperAdmin;
|
||||
private final TbQueueAdmin edgeAdmin;
|
||||
private final TbQueueAdmin edgeEventAdmin;
|
||||
private final TbQueueAdmin cfAdmin;
|
||||
private final TbKafkaAdmin cfAdmin;
|
||||
private final TbQueueAdmin cfStateAdmin;
|
||||
private final TbQueueAdmin edqsEventsAdmin;
|
||||
private final AtomicLong consumerCount = new AtomicLong();
|
||||
@ -313,12 +315,18 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgConsumer() {
|
||||
public TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgConsumer(TenantId tenantId, Integer partitionId) {
|
||||
String queueName = DataConstants.CF_QUEUE_NAME;
|
||||
String groupId = topicService.buildConsumerGroupId("cf-", tenantId, queueName, partitionId);
|
||||
|
||||
cfAdmin.syncOffsets(topicService.buildConsumerGroupId("cf-", tenantId, queueName, null), // the fat groupId
|
||||
groupId, partitionId);
|
||||
|
||||
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToCalculatedFieldMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
|
||||
consumerBuilder.settings(kafkaSettings);
|
||||
consumerBuilder.topic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic()));
|
||||
consumerBuilder.clientId("tb-rule-engine-calculated-field-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
|
||||
consumerBuilder.groupId(topicService.buildTopicName("tb-rule-engine-calculated-field-consumer"));
|
||||
consumerBuilder.clientId("cf-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
|
||||
consumerBuilder.groupId(groupId);
|
||||
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCalculatedFieldMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||
consumerBuilder.admin(cfAdmin);
|
||||
consumerBuilder.statsService(consumerStatsService);
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
*/
|
||||
package org.thingsboard.server.queue.provider;
|
||||
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.queue.Queue;
|
||||
import org.thingsboard.server.gen.js.JsInvokeProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
|
||||
@ -121,7 +122,7 @@ public interface TbRuleEngineQueueFactory extends TbUsageStatsClientQueueFactory
|
||||
|
||||
TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate();
|
||||
|
||||
TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgConsumer();
|
||||
TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgConsumer(TenantId tenantId, Integer partitionId);
|
||||
|
||||
TbQueueAdmin getCalculatedFieldQueueAdmin();
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user