Fix partition change event ignored

This commit is contained in:
ViacheslavKlimov 2024-02-09 14:15:24 +02:00
parent 25f1c82e8e
commit c1f16769f4
6 changed files with 44 additions and 28 deletions

View File

@ -31,6 +31,7 @@ import org.thingsboard.server.actors.app.AppActor;
import org.thingsboard.server.actors.app.AppInitMsg; import org.thingsboard.server.actors.app.AppInitMsg;
import org.thingsboard.server.actors.stats.StatsActor; import org.thingsboard.server.actors.stats.StatsActor;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.queue.discovery.TbApplicationEventListener; import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.util.AfterStartUp; import org.thingsboard.server.queue.util.AfterStartUp;
@ -124,6 +125,11 @@ public class DefaultActorService extends TbApplicationEventListener<PartitionCha
this.appActor.tellWithHighPriority(new PartitionChangeMsg(event.getServiceType())); this.appActor.tellWithHighPriority(new PartitionChangeMsg(event.getServiceType()));
} }
@Override
protected boolean filterTbApplicationEvent(PartitionChangeEvent event) {
return event.getServiceType() == ServiceType.TB_RULE_ENGINE || event.getServiceType() == ServiceType.TB_CORE;
}
@PreDestroy @PreDestroy
public void stopActorSystem() { public void stopActorSystem() {
if (system != null) { if (system != null) {

View File

@ -89,11 +89,14 @@ public abstract class AbstractPartitionBasedService<T extends EntityId> extends
*/ */
@Override @Override
protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) { protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
if (getServiceType().equals(partitionChangeEvent.getServiceType())) { log.debug("onTbApplicationEvent, processing event: {}", partitionChangeEvent);
log.debug("onTbApplicationEvent, processing event: {}", partitionChangeEvent); subscribeQueue.add(partitionChangeEvent.getPartitions());
subscribeQueue.add(partitionChangeEvent.getPartitions()); scheduledExecutor.submit(this::pollInitStateFromDB);
scheduledExecutor.submit(this::pollInitStateFromDB); }
}
@Override
protected boolean filterTbApplicationEvent(PartitionChangeEvent event) {
return getServiceType().equals(event.getServiceType());
} }
protected void pollInitStateFromDB() { protected void pollInitStateFromDB() {

View File

@ -220,16 +220,14 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
@Override @Override
protected void onTbApplicationEvent(PartitionChangeEvent event) { protected void onTbApplicationEvent(PartitionChangeEvent event) {
if (event.getServiceType().equals(getServiceType())) { log.info("Subscribing to partitions: {}", event.getPartitions());
log.info("Subscribing to partitions: {}", event.getPartitions()); this.mainConsumer.subscribe(event.getPartitions());
this.mainConsumer.subscribe(event.getPartitions()); this.usageStatsConsumer.subscribe(
this.usageStatsConsumer.subscribe( event
event .getPartitions()
.getPartitions() .stream()
.stream() .map(tpi -> tpi.newByTopic(usageStatsConsumer.getTopic()))
.map(tpi -> tpi.newByTopic(usageStatsConsumer.getTopic())) .collect(Collectors.toSet()));
.collect(Collectors.toSet()));
}
this.firmwareStatesConsumer.subscribe(); this.firmwareStatesConsumer.subscribe();
} }

View File

@ -110,16 +110,14 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
@Override @Override
protected void onTbApplicationEvent(PartitionChangeEvent event) { protected void onTbApplicationEvent(PartitionChangeEvent event) {
if (event.getServiceType().equals(getServiceType())) { event.getPartitionsMap().forEach((queueKey, partitions) -> {
event.getPartitionsMap().forEach((queueKey, partitions) -> { var consumer = consumers.get(queueKey);
var consumer = consumers.get(queueKey); if (consumer != null) {
if (consumer != null) { consumer.update(partitions);
consumer.update(partitions); } else {
} else { log.warn("Received invalid partition change event for {} that is not managed by this service", queueKey);
log.warn("Received invalid partition change event for {} that is not managed by this service", queueKey); }
} });
});
}
} }
@AfterStartUp(order = AfterStartUp.REGULAR_SERVICE) @AfterStartUp(order = AfterStartUp.REGULAR_SERVICE)

View File

@ -108,6 +108,11 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
launchMainConsumers(); launchMainConsumers();
} }
@Override
protected boolean filterTbApplicationEvent(PartitionChangeEvent event) {
return event.getServiceType() == getServiceType();
}
protected abstract ServiceType getServiceType(); protected abstract ServiceType getServiceType();
protected abstract void launchMainConsumers(); protected abstract void launchMainConsumers();

View File

@ -15,21 +15,27 @@
*/ */
package org.thingsboard.server.queue.discovery; package org.thingsboard.server.queue.discovery;
import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener; import org.springframework.context.ApplicationListener;
import org.thingsboard.server.queue.discovery.event.TbApplicationEvent; import org.thingsboard.server.queue.discovery.event.TbApplicationEvent;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@Slf4j
public abstract class TbApplicationEventListener<T extends TbApplicationEvent> implements ApplicationListener<T> { public abstract class TbApplicationEventListener<T extends TbApplicationEvent> implements ApplicationListener<T> {
private int lastProcessedSequenceNumber = Integer.MIN_VALUE; private int lastProcessedSequenceNumber = Integer.MIN_VALUE;
private final Lock seqNumberLock = new ReentrantLock(); private final Lock seqNumberLock = new ReentrantLock();
private final Logger log = LoggerFactory.getLogger(getClass());
@Override @Override
public void onApplicationEvent(T event) { public void onApplicationEvent(T event) {
if (!filterTbApplicationEvent(event)) {
log.trace("Skipping event due to filter: {}", event);
return;
}
boolean validUpdate = false; boolean validUpdate = false;
seqNumberLock.lock(); seqNumberLock.lock();
try { try {
@ -40,7 +46,7 @@ public abstract class TbApplicationEventListener<T extends TbApplicationEvent> i
} finally { } finally {
seqNumberLock.unlock(); seqNumberLock.unlock();
} }
if (validUpdate && filterTbApplicationEvent(event)) { if (validUpdate) {
try { try {
onTbApplicationEvent(event); onTbApplicationEvent(event);
} catch (Exception e) { } catch (Exception e) {