Fix readiness status for local EDQS
This commit is contained in:
parent
b97c1888f1
commit
14f24d2497
@ -29,6 +29,7 @@ import org.thingsboard.server.edqs.util.EdqsRocksDb;
|
|||||||
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
|
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
|
||||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||||
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
|
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
|
||||||
|
import org.thingsboard.server.queue.discovery.DiscoveryService;
|
||||||
import org.thingsboard.server.queue.edqs.InMemoryEdqsComponent;
|
import org.thingsboard.server.queue.edqs.InMemoryEdqsComponent;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -43,12 +44,14 @@ import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTop
|
|||||||
public class LocalEdqsStateService implements EdqsStateService {
|
public class LocalEdqsStateService implements EdqsStateService {
|
||||||
|
|
||||||
private final EdqsRocksDb db;
|
private final EdqsRocksDb db;
|
||||||
|
private final DiscoveryService discoveryService;
|
||||||
@Autowired @Lazy
|
@Autowired @Lazy
|
||||||
private EdqsProcessor processor;
|
private EdqsProcessor processor;
|
||||||
|
|
||||||
private PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer;
|
private PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer;
|
||||||
private List<PartitionedQueueConsumerManager<?>> otherConsumers;
|
private List<PartitionedQueueConsumerManager<?>> otherConsumers;
|
||||||
private Set<TopicPartitionInfo> partitions;
|
|
||||||
|
private boolean ready = false;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer, List<PartitionedQueueConsumerManager<?>> otherConsumers) {
|
public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer, List<PartitionedQueueConsumerManager<?>> otherConsumers) {
|
||||||
@ -58,7 +61,7 @@ public class LocalEdqsStateService implements EdqsStateService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void process(Set<TopicPartitionInfo> partitions) {
|
public void process(Set<TopicPartitionInfo> partitions) {
|
||||||
if (this.partitions == null) {
|
if (!ready) {
|
||||||
db.forEach((key, value) -> {
|
db.forEach((key, value) -> {
|
||||||
try {
|
try {
|
||||||
ToEdqsMsg edqsMsg = ToEdqsMsg.parseFrom(value);
|
ToEdqsMsg edqsMsg = ToEdqsMsg.parseFrom(value);
|
||||||
@ -70,11 +73,13 @@ public class LocalEdqsStateService implements EdqsStateService {
|
|||||||
});
|
});
|
||||||
log.info("Restore completed");
|
log.info("Restore completed");
|
||||||
}
|
}
|
||||||
|
ready = true;
|
||||||
|
discoveryService.setReady(true);
|
||||||
|
|
||||||
eventConsumer.update(withTopic(partitions, eventConsumer.getTopic()));
|
eventConsumer.update(withTopic(partitions, eventConsumer.getTopic()));
|
||||||
for (PartitionedQueueConsumerManager<?> consumer : otherConsumers) {
|
for (PartitionedQueueConsumerManager<?> consumer : otherConsumers) {
|
||||||
consumer.update(withTopic(partitions, consumer.getTopic()));
|
consumer.update(withTopic(partitions, consumer.getTopic()));
|
||||||
}
|
}
|
||||||
this.partitions = partitions;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -93,7 +98,7 @@ public class LocalEdqsStateService implements EdqsStateService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isReady() {
|
public boolean isReady() {
|
||||||
return partitions != null;
|
return ready;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -179,6 +179,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setReady(boolean ready) {
|
public void setReady(boolean ready) {
|
||||||
|
log.debug("Marking current service as {}", ready ? "ready" : "NOT ready");
|
||||||
boolean changed = serviceInfoProvider.setReady(ready);
|
boolean changed = serviceInfoProvider.setReady(ready);
|
||||||
if (changed) {
|
if (changed) {
|
||||||
try {
|
try {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user