diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java index c9923e6194..00bed5e91d 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java @@ -29,6 +29,7 @@ import org.thingsboard.server.edqs.util.EdqsRocksDb; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; +import org.thingsboard.server.queue.discovery.DiscoveryService; import org.thingsboard.server.queue.edqs.InMemoryEdqsComponent; import java.util.List; @@ -43,12 +44,14 @@ import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTop public class LocalEdqsStateService implements EdqsStateService { private final EdqsRocksDb db; + private final DiscoveryService discoveryService; @Autowired @Lazy private EdqsProcessor processor; private PartitionedQueueConsumerManager> eventConsumer; private List> otherConsumers; - private Set partitions; + + private boolean ready = false; @Override public void init(PartitionedQueueConsumerManager> eventConsumer, List> otherConsumers) { @@ -58,7 +61,7 @@ public class LocalEdqsStateService implements EdqsStateService { @Override public void process(Set partitions) { - if (this.partitions == null) { + if (!ready) { db.forEach((key, value) -> { try { ToEdqsMsg edqsMsg = ToEdqsMsg.parseFrom(value); @@ -70,11 +73,13 @@ public class LocalEdqsStateService implements EdqsStateService { }); log.info("Restore completed"); } + ready = true; + discoveryService.setReady(true); + eventConsumer.update(withTopic(partitions, eventConsumer.getTopic())); for (PartitionedQueueConsumerManager consumer : otherConsumers) { consumer.update(withTopic(partitions, consumer.getTopic())); } - this.partitions = partitions; } @Override @@ -93,7 +98,7 @@ public class LocalEdqsStateService implements EdqsStateService { @Override public boolean isReady() { - return partitions != null; + return ready; } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java index 2d9ac3630e..d73f531b5b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java @@ -179,6 +179,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi @Override public void setReady(boolean ready) { + log.debug("Marking current service as {}", ready ? "ready" : "NOT ready"); boolean changed = serviceInfoProvider.setReady(ready); if (changed) { try {