diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java index 3636eb05af..1f421974be 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java @@ -72,7 +72,7 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager + diff --git a/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java index 9bb0bbe30a..1add0dae37 100644 --- a/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java @@ -28,7 +28,6 @@ import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.edqs.util.EdqsRocksDb; import java.util.concurrent.TimeUnit; -import java.util.function.BiPredicate; import static org.awaitility.Awaitility.await; @@ -45,7 +44,7 @@ public class EdqsEntityQueryControllerTest extends EntityQueryControllerTest { @Autowired private EdqsService edqsService; - @MockBean + @MockBean // so that we don't do backup for tests private EdqsRocksDb edqsRocksDb; @Before diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/ObjectType.java b/common/data/src/main/java/org/thingsboard/server/common/data/ObjectType.java index bc5ec58213..86252fb7f8 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/ObjectType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/ObjectType.java @@ -15,9 +15,8 @@ */ package org.thingsboard.server.common.data; -import java.util.Arrays; import java.util.EnumSet; -import java.util.HashSet; +import java.util.List; import java.util.Set; public enum ObjectType { @@ -68,12 +67,13 @@ public enum ObjectType { TENANT, TENANT_PROFILE, CUSTOMER, DEVICE_PROFILE, DEVICE, ASSET_PROFILE, ASSET, EDGE, ENTITY_VIEW, USER, DASHBOARD, RULE_CHAIN, WIDGET_TYPE, WIDGETS_BUNDLE, API_USAGE_STATE, QUEUE_STATS ); - public static final Set edqsTypes = new HashSet<>(edqsTenantTypes); + public static final Set edqsTypes = EnumSet.copyOf(edqsTenantTypes); public static final Set edqsSystemTypes = EnumSet.of(TENANT, TENANT_PROFILE, USER, DASHBOARD, API_USAGE_STATE, ATTRIBUTE_KV, LATEST_TS_KV); + public static final Set unversionedTypes = EnumSet.of(QUEUE_STATS); static { - edqsTypes.addAll(Arrays.asList(RELATION, ATTRIBUTE_KV, LATEST_TS_KV)); + edqsTypes.addAll(List.of(RELATION, ATTRIBUTE_KV, LATEST_TS_KV)); } public EntityType toEntityType() { diff --git a/common/edqs/pom.xml b/common/edqs/pom.xml index e58c7c97a0..2abd1286b8 100644 --- a/common/edqs/pom.xml +++ b/common/edqs/pom.xml @@ -68,6 +68,10 @@ org.thingsboard.common queue + + org.springframework.boot + spring-boot-starter-web + org.apache.kafka kafka-clients diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java index 6fe09f77d1..a90a029bc4 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java @@ -21,10 +21,12 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Lazy; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; @@ -72,6 +74,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import java.util.stream.Collectors; @EdqsComponent @@ -85,8 +88,8 @@ public class EdqsProcessor implements TbQueueHandler, private final EdqRepository repository; private final EdqsConfig config; private final EdqsPartitionService partitionService; - @Autowired - @Lazy + private final ConfigurableApplicationContext applicationContext; + @Autowired @Lazy private EdqsStateService stateService; private MainQueueConsumerManager, QueueConfig> eventsConsumer; @@ -96,17 +99,30 @@ public class EdqsProcessor implements TbQueueHandler, private ExecutorService mgmtExecutor; private ScheduledExecutorService scheduler; private ListeningExecutorService requestExecutor; + private ExecutorService repartitionExecutor; private final VersionsStore versionsStore = new VersionsStore(); private final AtomicInteger counter = new AtomicInteger(); // FIXME: TMP + @Getter + private Consumer errorHandler; + @PostConstruct private void init() { consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("edqs-consumer")); mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(4, "edqs-consumer-mgmt"); scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edqs-scheduler"); requestExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(12, "edqs-requests")); + repartitionExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edqs-repartition")); + errorHandler = error -> { + if (error instanceof OutOfMemoryError) { + log.error("OOM detected, shutting down"); + repository.clear(); + Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edqs-shutdown")) + .execute(applicationContext::close); + } + }; eventsConsumer = MainQueueConsumerManager., QueueConfig>builder() .queueKey(new QueueKey(ServiceType.EDQS, EdqsQueue.EVENTS.getTopic())) @@ -117,7 +133,7 @@ public class EdqsProcessor implements TbQueueHandler, ToEdqsMsg msg = queueMsg.getValue(); log.trace("Processing message: {}", msg); process(msg, EdqsQueue.EVENTS); - } catch (Throwable t) { + } catch (Exception t) { log.error("Failed to process message: {}", queueMsg, t); } } @@ -127,6 +143,7 @@ public class EdqsProcessor implements TbQueueHandler, .consumerExecutor(consumersExecutor) .taskExecutor(mgmtExecutor) .scheduler(scheduler) + .uncaughtErrorHandler(errorHandler) .build(); responseTemplate = queueFactory.createEdqsResponseTemplate(); } @@ -141,7 +158,7 @@ public class EdqsProcessor implements TbQueueHandler, if (event.getServiceType() != ServiceType.EDQS) { return; } - consumersExecutor.submit(() -> { + repartitionExecutor.submit(() -> { // todo: maybe cancel the task if new event comes try { Set newPartitions = event.getNewPartitions().get(new QueueKey(ServiceType.EDQS)); Set partitions = newPartitions.stream() @@ -220,8 +237,8 @@ public class EdqsProcessor implements TbQueueHandler, if (!versionsStore.isNew(key, version)) { return; } - } else { - log.warn("[{}] {} doesn't have version: {}", tenantId, objectType, edqsMsg); + } else if (!ObjectType.unversionedTypes.contains(objectType)) { + log.warn("[{}] {} {} doesn't have version", tenantId, objectType, key); } if (queue != EdqsQueue.STATE) { stateService.save(tenantId, objectType, key, eventType, edqsMsg); @@ -272,6 +289,7 @@ public class EdqsProcessor implements TbQueueHandler, mgmtExecutor.shutdownNow(); scheduler.shutdownNow(); requestExecutor.shutdownNow(); + repartitionExecutor.shutdownNow(); } } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java index b21184110f..9b1e925367 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java @@ -53,7 +53,7 @@ public class EdqsProducer { TbQueueCallback callback = new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { - log.debug("[{}][{}][{}] Published msg to {}: {}", tenantId, type, key, topic, msg); // fixme log levels + log.trace("[{}][{}][{}] Published msg to {}: {}", tenantId, type, key, topic, msg); // fixme log levels } @Override diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/repo/TenantRepo.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/repo/TenantRepo.java index 68e1c42589..822061e918 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/repo/TenantRepo.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/repo/TenantRepo.java @@ -153,7 +153,7 @@ public class TenantRepo { EntityData to = getOrCreate(entity.getTo()); boolean added = repo.add(from, to, TbStringPool.intern(entity.getType())); if (added) { - edqsStatsService.ifPresent(statService -> statService.reportTenantEdqsObject(tenantId, ObjectType.RELATION, EdqsEventType.UPDATED)); + edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.RELATION, EdqsEventType.UPDATED)); } } else if (RelationTypeGroup.DASHBOARD.equals(entity.getTypeGroup())) { if (EntityRelation.CONTAINS_TYPE.equals(entity.getType()) && entity.getFrom().getEntityType() == EntityType.CUSTOMER) { @@ -172,7 +172,7 @@ public class TenantRepo { if (relationsRepo != null) { boolean removed = relationsRepo.remove(entityRelation.getFrom().getId(), entityRelation.getTo().getId(), entityRelation.getType()); if (removed) { - edqsStatsService.ifPresent(statService -> statService.reportTenantEdqsObject(tenantId, ObjectType.RELATION, EdqsEventType.DELETED)); + edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.RELATION, EdqsEventType.DELETED)); } } } else if (RelationTypeGroup.DASHBOARD.equals(entityRelation.getTypeGroup())) { @@ -223,7 +223,7 @@ public class TenantRepo { EntityData removed = getEntityMap(entityType).remove(entityId); if (removed != null) { getEntitySet(entityType).remove(removed); - edqsStatsService.ifPresent(statService -> statService.reportTenantEdqsObject(tenantId, ObjectType.fromEntityType(entityType), EdqsEventType.DELETED)); + edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.fromEntityType(entityType), EdqsEventType.DELETED)); UUID customerId = removed.getCustomerId(); if (customerId != null) { CustomerData customerData = (CustomerData) getEntityMap(EntityType.CUSTOMER).get(customerId); @@ -244,7 +244,7 @@ public class TenantRepo { Integer keyId = KeyDictionary.get(attributeKv.getKey()); boolean added = entityData.putAttr(keyId, attributeKv.getScope(), toDataPoint(attributeKv.getLastUpdateTs(), value)); if (added) { - edqsStatsService.ifPresent(statService -> statService.reportTenantEdqsObject(tenantId, ObjectType.ATTRIBUTE_KV, EdqsEventType.UPDATED)); + edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.ATTRIBUTE_KV, EdqsEventType.UPDATED)); } } } @@ -254,7 +254,7 @@ public class TenantRepo { if (entityData != null) { boolean removed = entityData.removeAttr(KeyDictionary.get(attributeKv.getKey()), attributeKv.getScope()); if (removed) { - edqsStatsService.ifPresent(statService -> statService.reportTenantEdqsObject(tenantId, ObjectType.ATTRIBUTE_KV, EdqsEventType.DELETED)); + edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.ATTRIBUTE_KV, EdqsEventType.DELETED)); } } } @@ -266,7 +266,7 @@ public class TenantRepo { Integer keyId = KeyDictionary.get(latestTsKv.getKey()); boolean added = entityData.putTs(keyId, toDataPoint(latestTsKv.getTs(), value)); if (added) { - edqsStatsService.ifPresent(statService -> statService.reportTenantEdqsObject(tenantId, ObjectType.LATEST_TS_KV, EdqsEventType.UPDATED)); + edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.LATEST_TS_KV, EdqsEventType.UPDATED)); } } } @@ -276,7 +276,7 @@ public class TenantRepo { if (entityData != null) { boolean removed = entityData.removeTs(KeyDictionary.get(latestTsKv.getKey())); if (removed) { - edqsStatsService.ifPresent(statService -> statService.reportTenantEdqsObject(tenantId, ObjectType.LATEST_TS_KV, EdqsEventType.DELETED)); + edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.LATEST_TS_KV, EdqsEventType.DELETED)); } } } @@ -331,7 +331,7 @@ public class TenantRepo { log.debug("[{}] Adding {} {}", tenantId, entityType, id); EntityData entityData = constructEntityData(entityType, entityId); getEntitySet(entityType).add(entityData); - edqsStatsService.ifPresent(statService -> statService.reportTenantEdqsObject(tenantId, ObjectType.fromEntityType(entityType), EdqsEventType.UPDATED)); + edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.fromEntityType(entityType), EdqsEventType.UPDATED)); return entityData; }); } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsController.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsController.java new file mode 100644 index 0000000000..721675245a --- /dev/null +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsController.java @@ -0,0 +1,40 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.edqs.state; + +import lombok.RequiredArgsConstructor; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequiredArgsConstructor +@RequestMapping("/api/edqs") +public class EdqsController { + + private final EdqsStateService edqsStateService; + + @GetMapping("/ready") + public ResponseEntity isReady() { + if (edqsStateService.isReady()) { + return ResponseEntity.ok().build(); + } else { + return ResponseEntity.badRequest().build(); + } + } + +} diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsStateService.java index 8866a1c771..06d080ac64 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsStateService.java @@ -29,4 +29,6 @@ public interface EdqsStateService { void save(TenantId tenantId, ObjectType type, String key, EdqsEventType eventType, ToEdqsMsg msg); + boolean isReady(); + } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java index 84f3cfd22f..cfa831f720 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java @@ -42,10 +42,10 @@ import org.thingsboard.server.queue.edqs.EdqsConfig; import org.thingsboard.server.queue.edqs.EdqsQueue; import org.thingsboard.server.queue.edqs.EdqsQueueFactory; import org.thingsboard.server.queue.edqs.KafkaEdqsComponent; -import org.thingsboard.server.queue.util.AfterStartUp; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -66,6 +66,8 @@ public class KafkaEdqsStateService implements EdqsStateService { private QueueConsumerManager> eventsConsumer; private EdqsProducer stateProducer; + private boolean initialRestoreDone; + private ExecutorService consumersExecutor; private ExecutorService mgmtExecutor; private ScheduledExecutorService scheduler; @@ -76,7 +78,7 @@ public class KafkaEdqsStateService implements EdqsStateService { @PostConstruct private void init() { - consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("edqs-backup-consumer")); + consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("edqs-consumer")); mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(4, "edqs-backup-consumer-mgmt"); scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edqs-backup-scheduler"); @@ -92,8 +94,8 @@ public class KafkaEdqsStateService implements EdqsStateService { if (stateReadCount.incrementAndGet() % 100000 == 0) { log.info("[state] Processed {} msgs", stateReadCount.get()); } - } catch (Throwable t) { - log.error("Failed to process message: {}", queueMsg, t); + } catch (Exception e) { + log.error("Failed to process message: {}", queueMsg, e); // TODO: do something about the error - e.g. reprocess } } consumer.commit(); @@ -102,39 +104,48 @@ public class KafkaEdqsStateService implements EdqsStateService { .consumerExecutor(consumersExecutor) .taskExecutor(mgmtExecutor) .scheduler(scheduler) + .uncaughtErrorHandler(edqsProcessor.getErrorHandler()) .build(); - eventsConsumer = QueueConsumerManager.>builder() // FIXME Slavik writes to the state while we read it, slows down the start + ExecutorService backupExecutor = ThingsBoardExecutors.newLimitedTasksExecutor(12, 1000, "events-to-backup-executor"); + eventsConsumer = QueueConsumerManager.>builder() // FIXME Slavik writes to the state while we read it, slows down the start. maybe start backup consumer after restore is finished .name("edqs-events-to-backup-consumer") .pollInterval(config.getPollInterval()) .msgPackProcessor((msgs, consumer) -> { + CountDownLatch resultLatch = new CountDownLatch(msgs.size()); for (TbProtoQueueMsg queueMsg : msgs) { - try { - ToEdqsMsg msg = queueMsg.getValue(); - log.trace("Processing message: {}", msg); + backupExecutor.submit(() -> { + try { + ToEdqsMsg msg = queueMsg.getValue(); + log.trace("Processing message: {}", msg); - if (msg.hasEventMsg()) { - EdqsEventMsg eventMsg = msg.getEventMsg(); - String key = eventMsg.getKey(); - if (eventsReadCount.incrementAndGet() % 100000 == 0) { - log.info("[events-to-backup] Processed {} msgs", eventsReadCount.get()); - } - if (eventMsg.hasVersion()) { - if (!versionsStore.isNew(key, eventMsg.getVersion())) { - return; + if (msg.hasEventMsg()) { + EdqsEventMsg eventMsg = msg.getEventMsg(); + String key = eventMsg.getKey(); + int count = eventsReadCount.incrementAndGet(); + if (count % 100000 == 0) { + log.info("[events-to-backup] Processed {} msgs", count); + } + if (eventMsg.hasVersion()) { + if (!versionsStore.isNew(key, eventMsg.getVersion())) { + return; + } } - } - TenantId tenantId = getTenantId(msg); - ObjectType objectType = ObjectType.valueOf(eventMsg.getObjectType()); - EdqsEventType eventType = EdqsEventType.valueOf(eventMsg.getEventType()); - log.debug("[{}] Saving to backup [{}] [{}] [{}]", tenantId, objectType, eventType, key); - stateProducer.send(tenantId, objectType, key, msg); + TenantId tenantId = getTenantId(msg); + ObjectType objectType = ObjectType.valueOf(eventMsg.getObjectType()); + EdqsEventType eventType = EdqsEventType.valueOf(eventMsg.getEventType()); + log.debug("[{}] Saving to backup [{}] [{}] [{}]", tenantId, objectType, eventType, key); + stateProducer.send(tenantId, objectType, key, msg); + } + } catch (Throwable t) { + log.error("Failed to process message: {}", queueMsg, t); + } finally { + resultLatch.countDown(); } - } catch (Throwable t) { - log.error("Failed to process message: {}", queueMsg, t); - } + }); } + resultLatch.await(); consumer.commit(); }) .consumerCreator(() -> queueFactory.createEdqsMsgConsumer(EdqsQueue.EVENTS, "events-to-backup-consumer-group")) // shared by all instances consumer group @@ -149,22 +160,21 @@ public class KafkaEdqsStateService implements EdqsStateService { .build(); } - @AfterStartUp(order = AfterStartUp.REGULAR_SERVICE) - public void afterStartUp() { - eventsConsumer.subscribe(); - eventsConsumer.launch(); - } - @Override public void restore(Set partitions) { stateReadCount.set(0); //TODO Slavik: do not support remote mode in monolith setup long startTs = System.currentTimeMillis(); log.info("Restore started for partitions {}", partitions.stream().map(tpi -> tpi.getPartition().orElse(-1)).sorted().toList()); - stateConsumer.doUpdate(partitions); // calling blocking doUpdate instead of update stateConsumer.awaitStop(0); // consumers should stop on their own because EdqsQueue.STATE.stopWhenRead is true, we just need to wait - log.info("Restore finished in {} ms. Processed {} msgs", (System.currentTimeMillis() - startTs), stateReadCount.get()); + + if (!initialRestoreDone) { + initialRestoreDone = true; + + eventsConsumer.subscribe(); + eventsConsumer.launch(); + } } @Override @@ -172,6 +182,11 @@ public class KafkaEdqsStateService implements EdqsStateService { // do nothing here, backup is done by events consumer } + @Override + public boolean isReady() { + return initialRestoreDone; + } + private TenantId getTenantId(ToEdqsMsg edqsMsg) { return TenantId.fromUUID(new UUID(edqsMsg.getTenantIdMSB(), edqsMsg.getTenantIdLSB())); } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java index 8d2d563d90..6a61488182 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java @@ -43,13 +43,11 @@ public class LocalEdqsStateService implements EdqsStateService { @Autowired private EdqsRocksDb db; - private Set partitions; + private boolean restoreDone; @Override public void restore(Set partitions) { - if (this.partitions == null) { - this.partitions = partitions; - } else { + if (restoreDone) { return; } @@ -62,6 +60,7 @@ public class LocalEdqsStateService implements EdqsStateService { log.error("[{}] Failed to restore value", key, e); } }); + restoreDone = true; } @Override @@ -78,4 +77,9 @@ public class LocalEdqsStateService implements EdqsStateService { } } + @Override + public boolean isReady() { + return restoreDone; + } + } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/stats/EdqsStatsService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/stats/EdqsStatsService.java index 2ca036d23d..9619d086ad 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/stats/EdqsStatsService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/stats/EdqsStatsService.java @@ -52,9 +52,9 @@ public class EdqsStatsService { log.info("EDQS Stats: {}", values); } - public void reportTenantEdqsObject(TenantId tenantId, ObjectType objectType, EdqsEventType eventType) { + public void reportEvent(TenantId tenantId, ObjectType objectType, EdqsEventType eventType) { statsMap.computeIfAbsent(tenantId, id -> new EdqsStats(tenantId, statsFactory)) - .reportEdqsObject(objectType, eventType); + .reportEvent(objectType, eventType); } @Getter @@ -78,7 +78,7 @@ public class EdqsStatsService { .collect(Collectors.joining(", ")); } - public void reportEdqsObject(ObjectType objectType, EdqsEventType eventType) { + public void reportEvent(ObjectType objectType, EdqsEventType eventType) { AtomicInteger objectCounter = getOrCreateObjectCounter(objectType); if (eventType == EdqsEventType.UPDATED){ objectCounter.incrementAndGet(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java index 2f44a9c4ae..9d45b168ee 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java @@ -39,6 +39,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.stream.Collectors; @Slf4j @@ -53,6 +54,7 @@ public class MainQueueConsumerManager uncaughtErrorHandler; private final java.util.Queue tasks = new ConcurrentLinkedQueue<>(); private final ReentrantLock lock = new ReentrantLock(); @@ -68,7 +70,8 @@ public class MainQueueConsumerManager> consumerCreator, ExecutorService consumerExecutor, ScheduledExecutorService scheduler, - ExecutorService taskExecutor) { + ExecutorService taskExecutor, + Consumer uncaughtErrorHandler) { this.queueKey = queueKey; this.config = config; this.msgPackProcessor = msgPackProcessor; @@ -76,6 +79,7 @@ public class MainQueueConsumerManager consumerLoop = consumerExecutor.submit(() -> { ThingsBoardThreadFactory.updateCurrentThreadName(consumerTask.getKey().toString()); - try { - consumerLoop(consumerTask.getConsumer()); - } catch (Throwable e) { - log.error("Failure in consumer loop", e); - } + consumerLoop(consumerTask.getConsumer()); log.info("[{}] Consumer stopped", consumerTask.getKey()); }); consumerTask.setTask(consumerLoop); } private void consumerLoop(TbQueueConsumer consumer) { - while (!stopped && !consumer.isStopped()) { - try { - List msgs = consumer.poll(config.getPollInterval()); - if (msgs.isEmpty()) { - continue; - } - processMsgs(msgs, consumer, config); - } catch (Exception e) { - if (!consumer.isStopped()) { - log.warn("Failed to process messages from queue", e); - try { - Thread.sleep(config.getPollInterval()); - } catch (InterruptedException e2) { - log.trace("Failed to wait until the server has capacity to handle new requests", e2); + try { + while (!stopped && !consumer.isStopped()) { + try { + List msgs = consumer.poll(config.getPollInterval()); + if (msgs.isEmpty()) { + continue; + } + processMsgs(msgs, consumer, config); + } catch (Exception e) { + if (!consumer.isStopped()) { + log.warn("Failed to process messages from queue", e); + try { + Thread.sleep(config.getPollInterval()); + } catch (InterruptedException e2) { + log.trace("Failed to wait until the server has capacity to handle new requests", e2); + } } } } - } - if (consumer.isStopped()) { + if (consumer.isStopped()) { + consumer.unsubscribe(); + } + } catch (Throwable t) { + log.error("Failure in consumer loop", t); + if (uncaughtErrorHandler != null) { + uncaughtErrorHandler.accept(t); + } consumer.unsubscribe(); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerTask.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerTask.java index 708e24fdac..4ed0ffa497 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerTask.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerTask.java @@ -84,7 +84,7 @@ public class TbQueueConsumerTask { } log.trace("[{}] Awaited finish", key); } catch (Exception e) { - log.warn("[{}] Failed to await for consumer to stop", key, e); + log.warn("[{}] Failed to await for consumer to stop (timeout {} sec)", key, timeoutSec, e); } task = null; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java index 0367a2be78..ade9204572 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java @@ -26,15 +26,10 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.common.data.StringUtils; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.msg.queue.ServiceType; -import org.thingsboard.server.queue.discovery.PartitionService; import java.time.Duration; import java.util.ArrayList; @@ -56,10 +51,6 @@ public class TbKafkaConsumerStatsService { private final TbKafkaSettings kafkaSettings; private final TbKafkaConsumerStatisticConfig statsConfig; - @Lazy - @Autowired - private PartitionService partitionService; - private Consumer consumer; private ScheduledExecutorService statsPrintScheduler; @@ -111,9 +102,7 @@ public class TbKafkaConsumerStatsService { } private boolean isStatsPrintRequired() { - boolean isMyRuleEnginePartition = partitionService.isMyPartition(ServiceType.TB_RULE_ENGINE, TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID); - boolean isMyCorePartition = partitionService.isMyPartition(ServiceType.TB_CORE, TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID); - return log.isInfoEnabled() && (isMyRuleEnginePartition || isMyCorePartition); + return log.isInfoEnabled(); } private List getTopicsStatsWithLag(Map groupOffsets, Map endOffsets) {