diff --git a/application/src/main/java/org/thingsboard/server/service/housekeeper/DefaultHousekeeperService.java b/application/src/main/java/org/thingsboard/server/service/housekeeper/DefaultHousekeeperService.java index 644e3cc2c7..76bf40fc36 100644 --- a/application/src/main/java/org/thingsboard/server/service/housekeeper/DefaultHousekeeperService.java +++ b/application/src/main/java/org/thingsboard/server/service/housekeeper/DefaultHousekeeperService.java @@ -15,11 +15,11 @@ */ package org.thingsboard.server.service.housekeeper; -import com.google.protobuf.ByteString; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.dao.housekeeper.HousekeeperService; @@ -40,7 +40,6 @@ import org.thingsboard.server.service.housekeeper.processor.HousekeeperTaskProce import javax.annotation.PreDestroy; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -95,8 +94,11 @@ public class DefaultHousekeeperService implements HousekeeperService { } for (TbProtoQueueMsg msg : msgs) { + log.trace("Processing task: {}", msg); try { processTask(msg); + } catch (InterruptedException e) { + return; } catch (Throwable e) { log.error("Unexpected error during message processing [{}]", msg, e); reprocessingService.submitForReprocessing(msg, e); @@ -121,16 +123,20 @@ public class DefaultHousekeeperService implements HousekeeperService { @SuppressWarnings("unchecked") protected void processTask(TbProtoQueueMsg queueMsg) throws Exception { ToHousekeeperServiceMsg msg = queueMsg.getValue(); - HousekeeperTask task = dataDecodingEncodingService.decode(msg.getTask().getValue().toByteArray()).get(); - HousekeeperTaskProcessor taskProcessor = getTaskProcessor(task.getTaskType()); + HousekeeperTask task = JacksonUtil.fromString(msg.getTask().getValue(), HousekeeperTask.class); + HousekeeperTaskProcessor taskProcessor = (HousekeeperTaskProcessor) taskProcessors.get(task.getTaskType()); + if (taskProcessor == null) { + throw new IllegalArgumentException("Unsupported task type " + task.getTaskType()); + } - log.info("[{}][{}][{}] Processing task: {}", task.getTenantId(), task.getEntityId().getEntityType(), task.getEntityId(), task); try { Future future = executor.submit(() -> { taskProcessor.process((T) task); return null; }); future.get(taskProcessingTimeout, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw e; } catch (Throwable e) { Throwable error = e; if (e instanceof ExecutionException) { @@ -152,21 +158,15 @@ public class DefaultHousekeeperService implements HousekeeperService { @Override public void submitTask(UUID key, HousekeeperTask task) { + log.trace("[{}][{}][{}] Submitting task: {}", task.getTenantId(), task.getEntityId().getEntityType(), task.getEntityId(), task.getTaskType()); TopicPartitionInfo tpi = TopicPartitionInfo.builder().topic(producer.getDefaultTopic()).build(); producer.send(tpi, new TbProtoQueueMsg<>(key, ToHousekeeperServiceMsg.newBuilder() .setTask(HousekeeperTaskProto.newBuilder() - .setValue(ByteString.copyFrom(dataDecodingEncodingService.encode(task))) + .setValue(JacksonUtil.toString(task)) .setTs(task.getTs()) .setAttempt(0) .build()) .build()), null); - log.trace("[{}][{}][{}] Submitted task: {}", task.getTenantId(), task.getEntityId().getEntityType(), task.getEntityId(), task.getTaskType()); - } - - @SuppressWarnings("unchecked") - private HousekeeperTaskProcessor getTaskProcessor(HousekeeperTaskType taskType) { - return Optional.ofNullable((HousekeeperTaskProcessor) taskProcessors.get(taskType)) - .orElseThrow(() -> new IllegalArgumentException("Unsupported task type " + taskType)); } @PreDestroy diff --git a/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperReprocessingService.java b/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperReprocessingService.java index 2d2c3242f9..44b2728089 100644 --- a/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperReprocessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperReprocessingService.java @@ -28,7 +28,8 @@ import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos.HousekeeperTaskProto; import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg; -import org.thingsboard.server.queue.TbQueueConsumer; +import org.thingsboard.server.queue.TbQueueCallback; +import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.provider.TbCoreQueueFactory; @@ -36,7 +37,9 @@ import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.util.TbCoreComponent; import javax.annotation.PreDestroy; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -90,11 +93,15 @@ public class HousekeeperReprocessingService { } for (TbProtoQueueMsg msg : msgs) { + log.trace("Reprocessing task: {}", msg); try { housekeeperService.processTask(msg); - } catch (Exception e) { + } catch (InterruptedException e) { + return; + } catch (Throwable e) { log.error("Unexpected error during message reprocessing [{}]", msg, e); submitForReprocessing(msg, e); + // fixme: msgs are duplicated } } consumer.commit(); @@ -118,12 +125,15 @@ public class HousekeeperReprocessingService { public void submitForReprocessing(TbProtoQueueMsg queueMsg, Throwable error) { ToHousekeeperServiceMsg msg = queueMsg.getValue(); HousekeeperTaskProto task = msg.getTask(); + int attempt = task.getAttempt() + 1; + Set errors = new LinkedHashSet<>(task.getErrorsList()); + errors.add(StringUtils.truncate(ExceptionUtils.getStackTrace(error), 1024)); msg = msg.toBuilder() .setTask(task.toBuilder() .setAttempt(attempt) - .addErrors(StringUtils.truncate(ExceptionUtils.getStackTrace(error), 1024)) - .setTs(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(reprocessingDelay)) + .clearErrors().addAllErrors(errors) + .setTs(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis((long) (reprocessingDelay * 0.8))) .build()) .build(); diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 7af241d41e..f59e3c86b7 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1403,6 +1403,9 @@ queue: tb_housekeeper: - key: max.poll.records value: "1" + tb_housekeeper.reprocessing: + - key: max.poll.records + value: "1" other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000" other: # DEPRECATED. In this section, you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside # - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms @@ -1425,6 +1428,7 @@ queue: # Kafka properties for Version Control topic version-control: "${TB_QUEUE_KAFKA_VC_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" housekeeper: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1" + housekeeper-reprocessing: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1" consumer-stats: # Prints lag between consumer group offset and last messages offset in Kafka topics enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}" diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/notification/NotificationType.java b/common/data/src/main/java/org/thingsboard/server/common/data/notification/NotificationType.java index cfc91d8fca..5a8fd87933 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/notification/NotificationType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/notification/NotificationType.java @@ -31,4 +31,5 @@ public enum NotificationType { RATE_LIMITS, EDGE_CONNECTION, EDGE_COMMUNICATION_FAILURE + } diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 73cea54a47..8e757a3b14 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -1424,7 +1424,7 @@ message ToHousekeeperServiceMsg { } message HousekeeperTaskProto { - bytes value = 1; + string value = 1; int64 ts = 2; // reprocessing diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java index d3a5eb821d..915f61c8b4 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java @@ -45,6 +45,8 @@ public class TbKafkaTopicConfigs { private String vcProperties; @Value("${queue.kafka.topic-properties.housekeeper:}") private String housekeeperProperties; + @Value("${queue.kafka.topic-properties.housekeeper-reprocessing:}") + private String housekeeperReprocessingProperties; @Getter private Map coreConfigs; @@ -66,6 +68,8 @@ public class TbKafkaTopicConfigs { private Map vcConfigs; @Getter private Map housekeeperConfigs; + @Getter + private Map housekeeperReprocessingConfigs; @PostConstruct private void init() { @@ -81,6 +85,7 @@ public class TbKafkaTopicConfigs { fwUpdatesConfigs = PropertyUtils.getProps(fwUpdatesProperties); vcConfigs = PropertyUtils.getProps(vcProperties); housekeeperConfigs = PropertyUtils.getProps(housekeeperProperties); + housekeeperReprocessingConfigs = PropertyUtils.getProps(housekeeperReprocessingProperties); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index c8a11e1f32..95bf2d1195 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -84,6 +84,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi private final TbQueueAdmin fwUpdatesAdmin; private final TbQueueAdmin vcAdmin; private final TbQueueAdmin housekeeperAdmin; + private final TbQueueAdmin housekeeperReprocessingAdmin; private final AtomicLong consumerCount = new AtomicLong(); @@ -118,6 +119,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs()); this.vcAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getVcConfigs()); this.housekeeperAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperConfigs()); + this.housekeeperReprocessingAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperReprocessingConfigs()); } @Override @@ -378,7 +380,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi .settings(kafkaSettings) .clientId("monolith-housekeeper-reprocessing-producer-" + serviceInfoProvider.getServiceId()) .defaultTopic(topicService.buildTopicName(coreSettings.getHousekeeperReprocessingTopic())) - .admin(housekeeperAdmin) + .admin(housekeeperReprocessingAdmin) .build(); } @@ -390,7 +392,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi .clientId("monolith-housekeeper-reprocessing-consumer-" + serviceInfoProvider.getServiceId()) .groupId(topicService.buildTopicName("monolith-housekeeper-reprocessing-consumer")) .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToHousekeeperServiceMsg.parseFrom(msg.getData()), msg.getHeaders())) - .admin(housekeeperAdmin) + .admin(housekeeperReprocessingAdmin) .statsService(consumerStatsService) .build(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java index f2812564e5..478963d300 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java @@ -82,6 +82,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { private final TbQueueAdmin fwUpdatesAdmin; private final TbQueueAdmin vcAdmin; private final TbQueueAdmin housekeeperAdmin; + private final TbQueueAdmin housekeeperReprocessingAdmin; public KafkaTbCoreQueueFactory(TopicService topicService, TbKafkaSettings kafkaSettings, @@ -115,6 +116,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs()); this.vcAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getVcConfigs()); this.housekeeperAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperConfigs()); + this.housekeeperReprocessingAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperReprocessingConfigs()); } @Override @@ -335,7 +337,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { .settings(kafkaSettings) .clientId("tb-core-housekeeper-reprocessing-producer-" + serviceInfoProvider.getServiceId()) .defaultTopic(topicService.buildTopicName(coreSettings.getHousekeeperReprocessingTopic())) - .admin(housekeeperAdmin) + .admin(housekeeperReprocessingAdmin) .build(); } @@ -347,7 +349,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { .clientId("tb-core-housekeeper-reprocessing-consumer-" + serviceInfoProvider.getServiceId()) .groupId(topicService.buildTopicName("tb-core-housekeeper-reprocessing-consumer")) .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToHousekeeperServiceMsg.parseFrom(msg.getData()), msg.getHeaders())) - .admin(housekeeperAdmin) + .admin(housekeeperReprocessingAdmin) .statsService(consumerStatsService) .build(); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/housekeeper/data/AlarmsUnassignHousekeeperTask.java b/dao/src/main/java/org/thingsboard/server/dao/housekeeper/data/AlarmsUnassignHousekeeperTask.java index aa1858f59f..aea09c093c 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/housekeeper/data/AlarmsUnassignHousekeeperTask.java +++ b/dao/src/main/java/org/thingsboard/server/dao/housekeeper/data/AlarmsUnassignHousekeeperTask.java @@ -15,13 +15,15 @@ */ package org.thingsboard.server.dao.housekeeper.data; -import lombok.Getter; +import lombok.Data; +import lombok.EqualsAndHashCode; import org.thingsboard.server.common.data.User; -@Getter +@Data +@EqualsAndHashCode(callSuper = true) public class AlarmsUnassignHousekeeperTask extends HousekeeperTask { - private final String userTitle; + private String userTitle; protected AlarmsUnassignHousekeeperTask(User user) { super(user.getTenantId(), user.getId(), HousekeeperTaskType.UNASSIGN_ALARMS); diff --git a/dao/src/main/java/org/thingsboard/server/dao/housekeeper/data/EntitiesDeletionHousekeeperTask.java b/dao/src/main/java/org/thingsboard/server/dao/housekeeper/data/EntitiesDeletionHousekeeperTask.java index 8f4a8d38b0..7846feb229 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/housekeeper/data/EntitiesDeletionHousekeeperTask.java +++ b/dao/src/main/java/org/thingsboard/server/dao/housekeeper/data/EntitiesDeletionHousekeeperTask.java @@ -15,14 +15,16 @@ */ package org.thingsboard.server.dao.housekeeper.data; -import lombok.Getter; +import lombok.Data; +import lombok.EqualsAndHashCode; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.TenantId; -@Getter +@Data +@EqualsAndHashCode(callSuper = true) public class EntitiesDeletionHousekeeperTask extends HousekeeperTask { - private final EntityType entityType; + private EntityType entityType; protected EntitiesDeletionHousekeeperTask(TenantId tenantId, EntityType entityType) { super(tenantId, tenantId, HousekeeperTaskType.DELETE_ENTITIES); diff --git a/dao/src/main/java/org/thingsboard/server/dao/housekeeper/data/HousekeeperTask.java b/dao/src/main/java/org/thingsboard/server/dao/housekeeper/data/HousekeeperTask.java index f9389e4566..6b63d43d43 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/housekeeper/data/HousekeeperTask.java +++ b/dao/src/main/java/org/thingsboard/server/dao/housekeeper/data/HousekeeperTask.java @@ -15,7 +15,13 @@ */ package org.thingsboard.server.dao.housekeeper.data; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonSubTypes.Type; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import lombok.AccessLevel; import lombok.Data; +import lombok.NoArgsConstructor; import lombok.NonNull; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.User; @@ -24,13 +30,20 @@ import org.thingsboard.server.common.data.id.TenantId; import java.io.Serializable; +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "taskType", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY, defaultImpl = HousekeeperTask.class) +@JsonSubTypes({ + @Type(name = "DELETE_ENTITIES", value = EntitiesDeletionHousekeeperTask.class), + @Type(name = "UNASSIGN_ALARMS", value = AlarmsUnassignHousekeeperTask.class) +}) @Data +@NoArgsConstructor(access = AccessLevel.PROTECTED) public class HousekeeperTask implements Serializable { - private final TenantId tenantId; - private final EntityId entityId; - private final HousekeeperTaskType taskType; - private final long ts; + private TenantId tenantId; + private EntityId entityId; + private HousekeeperTaskType taskType; + private long ts; protected HousekeeperTask(@NonNull TenantId tenantId, @NonNull EntityId entityId, @NonNull HousekeeperTaskType taskType) { this.tenantId = tenantId;