diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 5baa155ae9..651d2154de 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1401,13 +1401,15 @@ queue: # - key: max.poll.records # value: "${TB_QUEUE_KAFKA_SQ_MAX_POLL_RECORDS:1024}" tb_housekeeper: - # Amount of records to be returned in a single poll. For Housekeeper tasks topic, we should consume messages (tasks) one by one + # Consumer properties for Housekeeper tasks topic - key: max.poll.records - value: "1" + # Amount of records to be returned in a single poll. For Housekeeper tasks topic, we should consume messages (tasks) one by one + value: "${TB_QUEUE_KAFKA_HOUSEKEEPER_MAX_POLL_RECORDS:1}" tb_housekeeper.reprocessing: - # Amount of records to be returned in a single poll. For Housekeeper reprocessing topic, we should consume messages (tasks) one by one + # Consumer properties for Housekeeper reprocessing topic - key: max.poll.records - value: "1" + # Amount of records to be returned in a single poll. For Housekeeper reprocessing topic, we should consume messages (tasks) one by one + value: "${TB_QUEUE_KAFKA_HOUSEKEEPER_REPROCESSING_MAX_POLL_RECORDS:1}" other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000" other: # DEPRECATED. In this section, you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside # - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/housekeeper/DefaultHousekeeperClient.java b/common/queue/src/main/java/org/thingsboard/server/queue/housekeeper/DefaultHousekeeperClient.java index b766f3826b..e9737bf90c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/housekeeper/DefaultHousekeeperClient.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/housekeeper/DefaultHousekeeperClient.java @@ -19,10 +19,12 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.housekeeper.HousekeeperTask; -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.housekeeper.HousekeeperClient; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg; +import org.thingsboard.server.queue.TbQueueCallback; +import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; @@ -33,26 +35,42 @@ public class DefaultHousekeeperClient implements HousekeeperClient { private final TbQueueProducer> producer; private final TopicPartitionInfo submitTpi; + private final TbQueueCallback submitCallback; public DefaultHousekeeperClient(TbQueueProducerProvider producerProvider) { this.producer = producerProvider.getHousekeeperMsgProducer(); this.submitTpi = TopicPartitionInfo.builder().topic(producer.getDefaultTopic()).build(); + this.submitCallback = new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + log.trace("Submitted Housekeeper task"); + } + + @Override + public void onFailure(Throwable t) { + log.error("Failed to submit Housekeeper task", t); + } + }; } @Override public void submitTask(HousekeeperTask task) { - log.trace("[{}][{}][{}] Submitting task: {}", task.getTenantId(), task.getEntityId().getEntityType(), task.getEntityId(), task.getTaskType()); + log.debug("[{}][{}][{}] Submitting task: {}", task.getTenantId(), task.getEntityId().getEntityType(), task.getEntityId(), task.getTaskType()); /* * using msg key as entity id so that msgs related to certain entity are pushed to same partition, * e.g. on tenant deletion (entity id is tenant id), we need to clean up tenant entities in certain order * */ - producer.send(submitTpi, new TbProtoQueueMsg<>(task.getEntityId().getId(), ToHousekeeperServiceMsg.newBuilder() - .setTask(TransportProtos.HousekeeperTaskProto.newBuilder() - .setValue(JacksonUtil.toString(task)) - .setTs(task.getTs()) - .setAttempt(0) - .build()) - .build()), null); + try { + producer.send(submitTpi, new TbProtoQueueMsg<>(task.getEntityId().getId(), ToHousekeeperServiceMsg.newBuilder() + .setTask(TransportProtos.HousekeeperTaskProto.newBuilder() + .setValue(JacksonUtil.toString(task)) + .setTs(task.getTs()) + .setAttempt(0) + .build()) + .build()), submitCallback); + } catch (Throwable t) { + log.error("Failed to submit Housekeeper task {}", task, t); + } } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java index 478963d300..03c7cf1173 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java @@ -21,9 +21,9 @@ import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; -import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; @@ -39,8 +39,8 @@ import org.thingsboard.server.queue.TbQueueRequestTemplate; import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.kafka.TbKafkaAdmin; import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService; import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; @@ -309,8 +309,8 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { } @Override - public TbQueueProducer> createHousekeeperMsgProducer() { - return TbKafkaProducerTemplate.>builder() + public TbQueueProducer> createHousekeeperMsgProducer() { + return TbKafkaProducerTemplate.>builder() .settings(kafkaSettings) .clientId("tb-core-housekeeper-producer-" + serviceInfoProvider.getServiceId()) .defaultTopic(topicService.buildTopicName(coreSettings.getHousekeeperTopic())) @@ -319,21 +319,21 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { } @Override - public TbQueueConsumer> createHousekeeperMsgConsumer() { - return TbKafkaConsumerTemplate.>builder() + public TbQueueConsumer> createHousekeeperMsgConsumer() { + return TbKafkaConsumerTemplate.>builder() .settings(kafkaSettings) .topic(topicService.buildTopicName(coreSettings.getHousekeeperTopic())) .clientId("tb-core-housekeeper-consumer-" + serviceInfoProvider.getServiceId()) .groupId(topicService.buildTopicName("tb-core-housekeeper-consumer")) - .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToHousekeeperServiceMsg.parseFrom(msg.getData()), msg.getHeaders())) + .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToHousekeeperServiceMsg.parseFrom(msg.getData()), msg.getHeaders())) .admin(housekeeperAdmin) .statsService(consumerStatsService) .build(); } @Override - public TbQueueProducer> createHousekeeperReprocessingMsgProducer() { - return TbKafkaProducerTemplate.>builder() + public TbQueueProducer> createHousekeeperReprocessingMsgProducer() { + return TbKafkaProducerTemplate.>builder() .settings(kafkaSettings) .clientId("tb-core-housekeeper-reprocessing-producer-" + serviceInfoProvider.getServiceId()) .defaultTopic(topicService.buildTopicName(coreSettings.getHousekeeperReprocessingTopic())) @@ -342,13 +342,13 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { } @Override - public TbQueueConsumer> createHousekeeperReprocessingMsgConsumer() { - return TbKafkaConsumerTemplate.>builder() + public TbQueueConsumer> createHousekeeperReprocessingMsgConsumer() { + return TbKafkaConsumerTemplate.>builder() .settings(kafkaSettings) .topic(topicService.buildTopicName(coreSettings.getHousekeeperReprocessingTopic())) .clientId("tb-core-housekeeper-reprocessing-consumer-" + serviceInfoProvider.getServiceId()) .groupId(topicService.buildTopicName("tb-core-housekeeper-reprocessing-consumer")) - .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToHousekeeperServiceMsg.parseFrom(msg.getData()), msg.getHeaders())) + .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToHousekeeperServiceMsg.parseFrom(msg.getData()), msg.getHeaders())) .admin(housekeeperReprocessingAdmin) .statsService(consumerStatsService) .build(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index 869f364970..f6fd5d4619 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -25,6 +25,7 @@ import org.thingsboard.server.gen.js.JsInvokeProtos; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; @@ -233,8 +234,8 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { } @Override - public TbQueueProducer> createHousekeeperMsgProducer() { - return TbKafkaProducerTemplate.>builder() + public TbQueueProducer> createHousekeeperMsgProducer() { + return TbKafkaProducerTemplate.>builder() .settings(kafkaSettings) .clientId("tb-rule-engine-housekeeper-producer-" + serviceInfoProvider.getServiceId()) .defaultTopic(topicService.buildTopicName(coreSettings.getHousekeeperTopic())) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java index 4140925775..8464c98770 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java @@ -21,6 +21,7 @@ import org.springframework.stereotype.Component; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; @@ -176,8 +177,8 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory { } @Override - public TbQueueProducer> createHousekeeperMsgProducer() { - return TbKafkaProducerTemplate.>builder() + public TbQueueProducer> createHousekeeperMsgProducer() { + return TbKafkaProducerTemplate.>builder() .settings(kafkaSettings) .clientId("tb-transport-housekeeper-producer-" + serviceInfoProvider.getServiceId()) .defaultTopic(topicService.buildTopicName(coreSettings.getHousekeeperTopic())) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbVersionControlQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbVersionControlQueueFactory.java index 95086e04b4..4d3ddc4925 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbVersionControlQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbVersionControlQueueFactory.java @@ -19,6 +19,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.queue.TbQueueAdmin; @@ -109,8 +110,8 @@ public class KafkaTbVersionControlQueueFactory implements TbVersionControlQueueF } @Override - public TbQueueProducer> createHousekeeperMsgProducer() { - return TbKafkaProducerTemplate.>builder() + public TbQueueProducer> createHousekeeperMsgProducer() { + return TbKafkaProducerTemplate.>builder() .settings(kafkaSettings) .clientId("tb-vc-housekeeper-producer-" + serviceInfoProvider.getServiceId()) .defaultTopic(topicService.buildTopicName(coreSettings.getHousekeeperTopic())) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java index d0192d3094..2b118251ac 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java @@ -25,6 +25,8 @@ import org.thingsboard.server.gen.js.JsInvokeProtos; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; @@ -157,12 +159,12 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory } @Override - public TbQueueProducer> createToOtaPackageStateServiceMsgProducer() { + public TbQueueProducer> createToOtaPackageStateServiceMsgProducer() { return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getOtaPackageTopic())); } @Override - public TbQueueProducer> createHousekeeperMsgProducer() { + public TbQueueProducer> createHousekeeperMsgProducer() { return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getHousekeeperTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTransportQueueFactory.java index d91dd91873..062bd65cad 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTransportQueueFactory.java @@ -21,6 +21,7 @@ import org.springframework.stereotype.Component; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; @@ -132,7 +133,7 @@ public class PubSubTransportQueueFactory implements TbTransportQueueFactory { } @Override - public TbQueueProducer> createHousekeeperMsgProducer() { + public TbQueueProducer> createHousekeeperMsgProducer() { return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getHousekeeperTopic())); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/housekeeper/CleanUpService.java b/dao/src/main/java/org/thingsboard/server/dao/housekeeper/CleanUpService.java index 56946c4705..a2bc68b61b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/housekeeper/CleanUpService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/housekeeper/CleanUpService.java @@ -18,6 +18,8 @@ package org.thingsboard.server.dao.housekeeper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.event.TransactionalEventListener; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.User; @@ -36,14 +38,20 @@ public class CleanUpService { private final HousekeeperClient housekeeperClient; private final RelationService relationService; - @TransactionalEventListener(fallbackExecution = true) + @TransactionalEventListener(fallbackExecution = true) // after transaction commit + @Transactional(propagation = Propagation.NOT_SUPPORTED) public void handleEntityDeletionEvent(DeleteEntityEvent event) { TenantId tenantId = event.getTenantId(); EntityId entityId = event.getEntityId(); - log.trace("[{}][{}][{}] Handling entity deletion event", tenantId, entityId.getEntityType(), entityId.getId()); - cleanUpRelatedData(tenantId, entityId); - if (entityId.getEntityType() == EntityType.USER) { - housekeeperClient.submitTask(HousekeeperTask.unassignAlarms((User) event.getEntity())); + EntityType entityType = entityId.getEntityType(); + try { + log.trace("[{}][{}][{}] Handling entity deletion event", tenantId, entityType, entityId.getId()); + cleanUpRelatedData(tenantId, entityId); + if (entityType == EntityType.USER) { + housekeeperClient.submitTask(HousekeeperTask.unassignAlarms((User) event.getEntity())); + } + } catch (Throwable e) { + log.error("[{}][{}][{}] Failed to handle entity deletion event", tenantId, entityType, entityId.getId(), e); } }