Fix transactional calls in TransactionalEventListener
This commit is contained in:
parent
fc865252a8
commit
b0d4faf798
@ -1401,13 +1401,15 @@ queue:
|
||||
# - key: max.poll.records
|
||||
# value: "${TB_QUEUE_KAFKA_SQ_MAX_POLL_RECORDS:1024}"
|
||||
tb_housekeeper:
|
||||
# Consumer properties for Housekeeper tasks topic
|
||||
- key: max.poll.records
|
||||
# Amount of records to be returned in a single poll. For Housekeeper tasks topic, we should consume messages (tasks) one by one
|
||||
- key: max.poll.records
|
||||
value: "1"
|
||||
value: "${TB_QUEUE_KAFKA_HOUSEKEEPER_MAX_POLL_RECORDS:1}"
|
||||
tb_housekeeper.reprocessing:
|
||||
# Amount of records to be returned in a single poll. For Housekeeper reprocessing topic, we should consume messages (tasks) one by one
|
||||
# Consumer properties for Housekeeper reprocessing topic
|
||||
- key: max.poll.records
|
||||
value: "1"
|
||||
# Amount of records to be returned in a single poll. For Housekeeper reprocessing topic, we should consume messages (tasks) one by one
|
||||
value: "${TB_QUEUE_KAFKA_HOUSEKEEPER_REPROCESSING_MAX_POLL_RECORDS:1}"
|
||||
other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000"
|
||||
other: # DEPRECATED. In this section, you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
|
||||
# - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
|
||||
|
||||
@ -19,10 +19,12 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.server.common.data.housekeeper.HousekeeperTask;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.common.msg.housekeeper.HousekeeperClient;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg;
|
||||
import org.thingsboard.server.queue.TbQueueCallback;
|
||||
import org.thingsboard.server.queue.TbQueueMsgMetadata;
|
||||
import org.thingsboard.server.queue.TbQueueProducer;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
|
||||
@ -33,26 +35,42 @@ public class DefaultHousekeeperClient implements HousekeeperClient {
|
||||
|
||||
private final TbQueueProducer<TbProtoQueueMsg<ToHousekeeperServiceMsg>> producer;
|
||||
private final TopicPartitionInfo submitTpi;
|
||||
private final TbQueueCallback submitCallback;
|
||||
|
||||
public DefaultHousekeeperClient(TbQueueProducerProvider producerProvider) {
|
||||
this.producer = producerProvider.getHousekeeperMsgProducer();
|
||||
this.submitTpi = TopicPartitionInfo.builder().topic(producer.getDefaultTopic()).build();
|
||||
this.submitCallback = new TbQueueCallback() {
|
||||
@Override
|
||||
public void onSuccess(TbQueueMsgMetadata metadata) {
|
||||
log.trace("Submitted Housekeeper task");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
log.error("Failed to submit Housekeeper task", t);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public void submitTask(HousekeeperTask task) {
|
||||
log.trace("[{}][{}][{}] Submitting task: {}", task.getTenantId(), task.getEntityId().getEntityType(), task.getEntityId(), task.getTaskType());
|
||||
log.debug("[{}][{}][{}] Submitting task: {}", task.getTenantId(), task.getEntityId().getEntityType(), task.getEntityId(), task.getTaskType());
|
||||
/*
|
||||
* using msg key as entity id so that msgs related to certain entity are pushed to same partition,
|
||||
* e.g. on tenant deletion (entity id is tenant id), we need to clean up tenant entities in certain order
|
||||
* */
|
||||
try {
|
||||
producer.send(submitTpi, new TbProtoQueueMsg<>(task.getEntityId().getId(), ToHousekeeperServiceMsg.newBuilder()
|
||||
.setTask(TransportProtos.HousekeeperTaskProto.newBuilder()
|
||||
.setValue(JacksonUtil.toString(task))
|
||||
.setTs(task.getTs())
|
||||
.setAttempt(0)
|
||||
.build())
|
||||
.build()), null);
|
||||
.build()), submitCallback);
|
||||
} catch (Throwable t) {
|
||||
log.error("Failed to submit Housekeeper task {}", task, t);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -21,9 +21,9 @@ import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.gen.js.JsInvokeProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateServiceMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
|
||||
@ -39,8 +39,8 @@ import org.thingsboard.server.queue.TbQueueRequestTemplate;
|
||||
import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
|
||||
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.discovery.TopicService;
|
||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
||||
import org.thingsboard.server.queue.discovery.TopicService;
|
||||
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
|
||||
import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService;
|
||||
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
|
||||
@ -309,8 +309,8 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> createHousekeeperMsgProducer() {
|
||||
return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>>builder()
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToHousekeeperServiceMsg>> createHousekeeperMsgProducer() {
|
||||
return TbKafkaProducerTemplate.<TbProtoQueueMsg<ToHousekeeperServiceMsg>>builder()
|
||||
.settings(kafkaSettings)
|
||||
.clientId("tb-core-housekeeper-producer-" + serviceInfoProvider.getServiceId())
|
||||
.defaultTopic(topicService.buildTopicName(coreSettings.getHousekeeperTopic()))
|
||||
@ -319,21 +319,21 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> createHousekeeperMsgConsumer() {
|
||||
return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>>builder()
|
||||
public TbQueueConsumer<TbProtoQueueMsg<ToHousekeeperServiceMsg>> createHousekeeperMsgConsumer() {
|
||||
return TbKafkaConsumerTemplate.<TbProtoQueueMsg<ToHousekeeperServiceMsg>>builder()
|
||||
.settings(kafkaSettings)
|
||||
.topic(topicService.buildTopicName(coreSettings.getHousekeeperTopic()))
|
||||
.clientId("tb-core-housekeeper-consumer-" + serviceInfoProvider.getServiceId())
|
||||
.groupId(topicService.buildTopicName("tb-core-housekeeper-consumer"))
|
||||
.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToHousekeeperServiceMsg.parseFrom(msg.getData()), msg.getHeaders()))
|
||||
.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToHousekeeperServiceMsg.parseFrom(msg.getData()), msg.getHeaders()))
|
||||
.admin(housekeeperAdmin)
|
||||
.statsService(consumerStatsService)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> createHousekeeperReprocessingMsgProducer() {
|
||||
return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>>builder()
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToHousekeeperServiceMsg>> createHousekeeperReprocessingMsgProducer() {
|
||||
return TbKafkaProducerTemplate.<TbProtoQueueMsg<ToHousekeeperServiceMsg>>builder()
|
||||
.settings(kafkaSettings)
|
||||
.clientId("tb-core-housekeeper-reprocessing-producer-" + serviceInfoProvider.getServiceId())
|
||||
.defaultTopic(topicService.buildTopicName(coreSettings.getHousekeeperReprocessingTopic()))
|
||||
@ -342,13 +342,13 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> createHousekeeperReprocessingMsgConsumer() {
|
||||
return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>>builder()
|
||||
public TbQueueConsumer<TbProtoQueueMsg<ToHousekeeperServiceMsg>> createHousekeeperReprocessingMsgConsumer() {
|
||||
return TbKafkaConsumerTemplate.<TbProtoQueueMsg<ToHousekeeperServiceMsg>>builder()
|
||||
.settings(kafkaSettings)
|
||||
.topic(topicService.buildTopicName(coreSettings.getHousekeeperReprocessingTopic()))
|
||||
.clientId("tb-core-housekeeper-reprocessing-consumer-" + serviceInfoProvider.getServiceId())
|
||||
.groupId(topicService.buildTopicName("tb-core-housekeeper-reprocessing-consumer"))
|
||||
.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToHousekeeperServiceMsg.parseFrom(msg.getData()), msg.getHeaders()))
|
||||
.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToHousekeeperServiceMsg.parseFrom(msg.getData()), msg.getHeaders()))
|
||||
.admin(housekeeperReprocessingAdmin)
|
||||
.statsService(consumerStatsService)
|
||||
.build();
|
||||
|
||||
@ -25,6 +25,7 @@ import org.thingsboard.server.gen.js.JsInvokeProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
|
||||
@ -233,8 +234,8 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> createHousekeeperMsgProducer() {
|
||||
return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>>builder()
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToHousekeeperServiceMsg>> createHousekeeperMsgProducer() {
|
||||
return TbKafkaProducerTemplate.<TbProtoQueueMsg<ToHousekeeperServiceMsg>>builder()
|
||||
.settings(kafkaSettings)
|
||||
.clientId("tb-rule-engine-housekeeper-producer-" + serviceInfoProvider.getServiceId())
|
||||
.defaultTopic(topicService.buildTopicName(coreSettings.getHousekeeperTopic()))
|
||||
|
||||
@ -21,6 +21,7 @@ import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
|
||||
@ -176,8 +177,8 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> createHousekeeperMsgProducer() {
|
||||
return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>>builder()
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToHousekeeperServiceMsg>> createHousekeeperMsgProducer() {
|
||||
return TbKafkaProducerTemplate.<TbProtoQueueMsg<ToHousekeeperServiceMsg>>builder()
|
||||
.settings(kafkaSettings)
|
||||
.clientId("tb-transport-housekeeper-producer-" + serviceInfoProvider.getServiceId())
|
||||
.defaultTopic(topicService.buildTopicName(coreSettings.getHousekeeperTopic()))
|
||||
|
||||
@ -19,6 +19,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
|
||||
import org.thingsboard.server.queue.TbQueueAdmin;
|
||||
@ -109,8 +110,8 @@ public class KafkaTbVersionControlQueueFactory implements TbVersionControlQueueF
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> createHousekeeperMsgProducer() {
|
||||
return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>>builder()
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToHousekeeperServiceMsg>> createHousekeeperMsgProducer() {
|
||||
return TbKafkaProducerTemplate.<TbProtoQueueMsg<ToHousekeeperServiceMsg>>builder()
|
||||
.settings(kafkaSettings)
|
||||
.clientId("tb-vc-housekeeper-producer-" + serviceInfoProvider.getServiceId())
|
||||
.defaultTopic(topicService.buildTopicName(coreSettings.getHousekeeperTopic()))
|
||||
|
||||
@ -25,6 +25,8 @@ import org.thingsboard.server.gen.js.JsInvokeProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateServiceMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
|
||||
@ -157,12 +159,12 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToOtaPackageStateServiceMsg>> createToOtaPackageStateServiceMsgProducer() {
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> createToOtaPackageStateServiceMsgProducer() {
|
||||
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getOtaPackageTopic()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> createHousekeeperMsgProducer() {
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToHousekeeperServiceMsg>> createHousekeeperMsgProducer() {
|
||||
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getHousekeeperTopic()));
|
||||
}
|
||||
|
||||
|
||||
@ -21,6 +21,7 @@ import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
|
||||
@ -132,7 +133,7 @@ public class PubSubTransportQueueFactory implements TbTransportQueueFactory {
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> createHousekeeperMsgProducer() {
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToHousekeeperServiceMsg>> createHousekeeperMsgProducer() {
|
||||
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getHousekeeperTopic()));
|
||||
}
|
||||
|
||||
|
||||
@ -18,6 +18,8 @@ package org.thingsboard.server.dao.housekeeper;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Propagation;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.transaction.event.TransactionalEventListener;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.User;
|
||||
@ -36,15 +38,21 @@ public class CleanUpService {
|
||||
private final HousekeeperClient housekeeperClient;
|
||||
private final RelationService relationService;
|
||||
|
||||
@TransactionalEventListener(fallbackExecution = true)
|
||||
@TransactionalEventListener(fallbackExecution = true) // after transaction commit
|
||||
@Transactional(propagation = Propagation.NOT_SUPPORTED)
|
||||
public void handleEntityDeletionEvent(DeleteEntityEvent<?> event) {
|
||||
TenantId tenantId = event.getTenantId();
|
||||
EntityId entityId = event.getEntityId();
|
||||
log.trace("[{}][{}][{}] Handling entity deletion event", tenantId, entityId.getEntityType(), entityId.getId());
|
||||
EntityType entityType = entityId.getEntityType();
|
||||
try {
|
||||
log.trace("[{}][{}][{}] Handling entity deletion event", tenantId, entityType, entityId.getId());
|
||||
cleanUpRelatedData(tenantId, entityId);
|
||||
if (entityId.getEntityType() == EntityType.USER) {
|
||||
if (entityType == EntityType.USER) {
|
||||
housekeeperClient.submitTask(HousekeeperTask.unassignAlarms((User) event.getEntity()));
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
log.error("[{}][{}][{}] Failed to handle entity deletion event", tenantId, entityType, entityId.getId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
public void cleanUpRelatedData(TenantId tenantId, EntityId entityId) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user