Fix partition change events handling by consumer services

This commit is contained in:
ViacheslavKlimov 2025-04-10 17:41:19 +03:00
parent e8eef3a7df
commit 8567dd0bd2
6 changed files with 21 additions and 15 deletions

View File

@ -16,7 +16,6 @@
package org.thingsboard.server.service.queue;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
@ -70,7 +69,6 @@ import java.util.stream.Collectors;
@Service
@TbRuleEngineComponent
@Slf4j
public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBasedConsumerService<ToCalculatedFieldNotificationMsg> implements TbCalculatedFieldConsumerService {
@Value("${queue.calculated_fields.poll_interval:25}")

View File

@ -20,7 +20,6 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
@ -118,7 +117,6 @@ import java.util.stream.Collectors;
@Service
@TbCoreComponent
@Slf4j
public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCoreNotificationMsg> implements TbCoreConsumerService {
@Value("${queue.core.poll-interval}")

View File

@ -19,8 +19,6 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Value;
@ -45,12 +43,12 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToEdgeMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdgeNotificationMsg;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.edge.EdgeContextComponent;
import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager;
import org.thingsboard.server.service.edge.rpc.EdgeRpcService;
import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
import org.thingsboard.server.service.queue.processing.IdMsgPair;
@ -66,7 +64,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Slf4j
@Service
@TbCoreComponent
public class DefaultTbEdgeConsumerService extends AbstractConsumerService<ToEdgeNotificationMsg> implements TbEdgeConsumerService {

View File

@ -15,7 +15,6 @@
*/
package org.thingsboard.server.service.queue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Scheduled;
@ -65,7 +64,6 @@ import java.util.stream.Collectors;
@Service
@TbRuleEngineComponent
@Slf4j
public class DefaultTbRuleEngineConsumerService extends AbstractPartitionBasedConsumerService<ToRuleEngineNotificationMsg> implements TbRuleEngineConsumerService {
private final TbRuleEngineConsumerContext ctx;

View File

@ -17,7 +17,8 @@ package org.thingsboard.server.service.queue.processing;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
@ -62,10 +63,11 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Slf4j
@RequiredArgsConstructor
public abstract class AbstractConsumerService<N extends com.google.protobuf.GeneratedMessageV3> extends TbApplicationEventListener<PartitionChangeEvent> {
protected final Logger log = LoggerFactory.getLogger(getClass());
protected final ActorSystemContext actorContext;
protected final TbTenantProfileCache tenantProfileCache;
protected final TbDeviceProfileCache deviceProfileCache;

View File

@ -28,6 +28,8 @@ import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -35,7 +37,7 @@ public abstract class AbstractPartitionBasedConsumerService<N extends com.google
private final Lock startupLock = new ReentrantLock();
private volatile boolean started = false;
private PartitionChangeEvent lastPartitionChangeEvent;
private List<PartitionChangeEvent> pendingEvents = new ArrayList<>();
public AbstractPartitionBasedConsumerService(ActorSystemContext actorContext,
TbTenantProfileCache tenantProfileCache,
@ -61,8 +63,16 @@ public abstract class AbstractPartitionBasedConsumerService<N extends com.google
onStartUp();
startupLock.lock();
try {
onPartitionChangeEvent(lastPartitionChangeEvent);
for (PartitionChangeEvent partitionChangeEvent : pendingEvents) {
log.info("Handling partition change event: {}", partitionChangeEvent);
try {
onPartitionChangeEvent(partitionChangeEvent);
} catch (Throwable t) {
log.error("Failed to handle partition change event: {}", partitionChangeEvent, t);
}
}
started = true;
pendingEvents = null;
} finally {
startupLock.unlock();
}
@ -70,17 +80,20 @@ public abstract class AbstractPartitionBasedConsumerService<N extends com.google
@Override
protected void onTbApplicationEvent(PartitionChangeEvent event) {
log.debug("Received partition change event: {}", event);
if (!started) {
startupLock.lock();
try {
if (!started) {
lastPartitionChangeEvent = event;
log.debug("App not started yet, storing event for later: {}", event);
pendingEvents.add(event);
return;
}
} finally {
startupLock.unlock();
}
}
log.info("Handling partition change event: {}", event);
onPartitionChangeEvent(event);
}