Housekeeper tasks encoded to json

This commit is contained in:
ViacheslavKlimov 2024-02-21 18:49:15 +02:00
parent 69ead8cc47
commit b72af4ead9
11 changed files with 73 additions and 32 deletions

View File

@ -15,11 +15,11 @@
*/
package org.thingsboard.server.service.housekeeper;
import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.housekeeper.HousekeeperService;
@ -40,7 +40,6 @@ import org.thingsboard.server.service.housekeeper.processor.HousekeeperTaskProce
import javax.annotation.PreDestroy;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@ -95,8 +94,11 @@ public class DefaultHousekeeperService implements HousekeeperService {
}
for (TbProtoQueueMsg<ToHousekeeperServiceMsg> msg : msgs) {
log.trace("Processing task: {}", msg);
try {
processTask(msg);
} catch (InterruptedException e) {
return;
} catch (Throwable e) {
log.error("Unexpected error during message processing [{}]", msg, e);
reprocessingService.submitForReprocessing(msg, e);
@ -121,16 +123,20 @@ public class DefaultHousekeeperService implements HousekeeperService {
@SuppressWarnings("unchecked")
protected <T extends HousekeeperTask> void processTask(TbProtoQueueMsg<ToHousekeeperServiceMsg> queueMsg) throws Exception {
ToHousekeeperServiceMsg msg = queueMsg.getValue();
HousekeeperTask task = dataDecodingEncodingService.<HousekeeperTask>decode(msg.getTask().getValue().toByteArray()).get();
HousekeeperTaskProcessor<T> taskProcessor = getTaskProcessor(task.getTaskType());
HousekeeperTask task = JacksonUtil.fromString(msg.getTask().getValue(), HousekeeperTask.class);
HousekeeperTaskProcessor<T> taskProcessor = (HousekeeperTaskProcessor<T>) taskProcessors.get(task.getTaskType());
if (taskProcessor == null) {
throw new IllegalArgumentException("Unsupported task type " + task.getTaskType());
}
log.info("[{}][{}][{}] Processing task: {}", task.getTenantId(), task.getEntityId().getEntityType(), task.getEntityId(), task);
try {
Future<Object> future = executor.submit(() -> {
taskProcessor.process((T) task);
return null;
});
future.get(taskProcessingTimeout, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw e;
} catch (Throwable e) {
Throwable error = e;
if (e instanceof ExecutionException) {
@ -152,21 +158,15 @@ public class DefaultHousekeeperService implements HousekeeperService {
@Override
public void submitTask(UUID key, HousekeeperTask task) {
log.trace("[{}][{}][{}] Submitting task: {}", task.getTenantId(), task.getEntityId().getEntityType(), task.getEntityId(), task.getTaskType());
TopicPartitionInfo tpi = TopicPartitionInfo.builder().topic(producer.getDefaultTopic()).build();
producer.send(tpi, new TbProtoQueueMsg<>(key, ToHousekeeperServiceMsg.newBuilder()
.setTask(HousekeeperTaskProto.newBuilder()
.setValue(ByteString.copyFrom(dataDecodingEncodingService.encode(task)))
.setValue(JacksonUtil.toString(task))
.setTs(task.getTs())
.setAttempt(0)
.build())
.build()), null);
log.trace("[{}][{}][{}] Submitted task: {}", task.getTenantId(), task.getEntityId().getEntityType(), task.getEntityId(), task.getTaskType());
}
@SuppressWarnings("unchecked")
private <T extends HousekeeperTask> HousekeeperTaskProcessor<T> getTaskProcessor(HousekeeperTaskType taskType) {
return Optional.ofNullable((HousekeeperTaskProcessor<T>) taskProcessors.get(taskType))
.orElseThrow(() -> new IllegalArgumentException("Unsupported task type " + taskType));
}
@PreDestroy

View File

@ -28,7 +28,8 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos.HousekeeperTaskProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
@ -36,7 +37,9 @@ import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.util.TbCoreComponent;
import javax.annotation.PreDestroy;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -90,11 +93,15 @@ public class HousekeeperReprocessingService {
}
for (TbProtoQueueMsg<ToHousekeeperServiceMsg> msg : msgs) {
log.trace("Reprocessing task: {}", msg);
try {
housekeeperService.processTask(msg);
} catch (Exception e) {
} catch (InterruptedException e) {
return;
} catch (Throwable e) {
log.error("Unexpected error during message reprocessing [{}]", msg, e);
submitForReprocessing(msg, e);
// fixme: msgs are duplicated
}
}
consumer.commit();
@ -118,12 +125,15 @@ public class HousekeeperReprocessingService {
public void submitForReprocessing(TbProtoQueueMsg<ToHousekeeperServiceMsg> queueMsg, Throwable error) {
ToHousekeeperServiceMsg msg = queueMsg.getValue();
HousekeeperTaskProto task = msg.getTask();
int attempt = task.getAttempt() + 1;
Set<String> errors = new LinkedHashSet<>(task.getErrorsList());
errors.add(StringUtils.truncate(ExceptionUtils.getStackTrace(error), 1024));
msg = msg.toBuilder()
.setTask(task.toBuilder()
.setAttempt(attempt)
.addErrors(StringUtils.truncate(ExceptionUtils.getStackTrace(error), 1024))
.setTs(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(reprocessingDelay))
.clearErrors().addAllErrors(errors)
.setTs(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis((long) (reprocessingDelay * 0.8)))
.build())
.build();

View File

@ -1403,6 +1403,9 @@ queue:
tb_housekeeper:
- key: max.poll.records
value: "1"
tb_housekeeper.reprocessing:
- key: max.poll.records
value: "1"
other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000"
other: # DEPRECATED. In this section, you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
# - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
@ -1425,6 +1428,7 @@ queue:
# Kafka properties for Version Control topic
version-control: "${TB_QUEUE_KAFKA_VC_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
housekeeper: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1"
housekeeper-reprocessing: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1"
consumer-stats:
# Prints lag between consumer group offset and last messages offset in Kafka topics
enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"

View File

@ -31,4 +31,5 @@ public enum NotificationType {
RATE_LIMITS,
EDGE_CONNECTION,
EDGE_COMMUNICATION_FAILURE
}

View File

@ -1424,7 +1424,7 @@ message ToHousekeeperServiceMsg {
}
message HousekeeperTaskProto {
bytes value = 1;
string value = 1;
int64 ts = 2;
// reprocessing

View File

@ -45,6 +45,8 @@ public class TbKafkaTopicConfigs {
private String vcProperties;
@Value("${queue.kafka.topic-properties.housekeeper:}")
private String housekeeperProperties;
@Value("${queue.kafka.topic-properties.housekeeper-reprocessing:}")
private String housekeeperReprocessingProperties;
@Getter
private Map<String, String> coreConfigs;
@ -66,6 +68,8 @@ public class TbKafkaTopicConfigs {
private Map<String, String> vcConfigs;
@Getter
private Map<String, String> housekeeperConfigs;
@Getter
private Map<String, String> housekeeperReprocessingConfigs;
@PostConstruct
private void init() {
@ -81,6 +85,7 @@ public class TbKafkaTopicConfigs {
fwUpdatesConfigs = PropertyUtils.getProps(fwUpdatesProperties);
vcConfigs = PropertyUtils.getProps(vcProperties);
housekeeperConfigs = PropertyUtils.getProps(housekeeperProperties);
housekeeperReprocessingConfigs = PropertyUtils.getProps(housekeeperReprocessingProperties);
}
}

View File

@ -84,6 +84,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
private final TbQueueAdmin fwUpdatesAdmin;
private final TbQueueAdmin vcAdmin;
private final TbQueueAdmin housekeeperAdmin;
private final TbQueueAdmin housekeeperReprocessingAdmin;
private final AtomicLong consumerCount = new AtomicLong();
@ -118,6 +119,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs());
this.vcAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getVcConfigs());
this.housekeeperAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperConfigs());
this.housekeeperReprocessingAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperReprocessingConfigs());
}
@Override
@ -378,7 +380,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
.settings(kafkaSettings)
.clientId("monolith-housekeeper-reprocessing-producer-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(coreSettings.getHousekeeperReprocessingTopic()))
.admin(housekeeperAdmin)
.admin(housekeeperReprocessingAdmin)
.build();
}
@ -390,7 +392,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
.clientId("monolith-housekeeper-reprocessing-consumer-" + serviceInfoProvider.getServiceId())
.groupId(topicService.buildTopicName("monolith-housekeeper-reprocessing-consumer"))
.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToHousekeeperServiceMsg.parseFrom(msg.getData()), msg.getHeaders()))
.admin(housekeeperAdmin)
.admin(housekeeperReprocessingAdmin)
.statsService(consumerStatsService)
.build();
}

View File

@ -82,6 +82,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
private final TbQueueAdmin fwUpdatesAdmin;
private final TbQueueAdmin vcAdmin;
private final TbQueueAdmin housekeeperAdmin;
private final TbQueueAdmin housekeeperReprocessingAdmin;
public KafkaTbCoreQueueFactory(TopicService topicService,
TbKafkaSettings kafkaSettings,
@ -115,6 +116,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs());
this.vcAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getVcConfigs());
this.housekeeperAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperConfigs());
this.housekeeperReprocessingAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperReprocessingConfigs());
}
@Override
@ -335,7 +337,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
.settings(kafkaSettings)
.clientId("tb-core-housekeeper-reprocessing-producer-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(coreSettings.getHousekeeperReprocessingTopic()))
.admin(housekeeperAdmin)
.admin(housekeeperReprocessingAdmin)
.build();
}
@ -347,7 +349,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
.clientId("tb-core-housekeeper-reprocessing-consumer-" + serviceInfoProvider.getServiceId())
.groupId(topicService.buildTopicName("tb-core-housekeeper-reprocessing-consumer"))
.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToHousekeeperServiceMsg.parseFrom(msg.getData()), msg.getHeaders()))
.admin(housekeeperAdmin)
.admin(housekeeperReprocessingAdmin)
.statsService(consumerStatsService)
.build();
}

View File

@ -15,13 +15,15 @@
*/
package org.thingsboard.server.dao.housekeeper.data;
import lombok.Getter;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.thingsboard.server.common.data.User;
@Getter
@Data
@EqualsAndHashCode(callSuper = true)
public class AlarmsUnassignHousekeeperTask extends HousekeeperTask {
private final String userTitle;
private String userTitle;
protected AlarmsUnassignHousekeeperTask(User user) {
super(user.getTenantId(), user.getId(), HousekeeperTaskType.UNASSIGN_ALARMS);

View File

@ -15,14 +15,16 @@
*/
package org.thingsboard.server.dao.housekeeper.data;
import lombok.Getter;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.TenantId;
@Getter
@Data
@EqualsAndHashCode(callSuper = true)
public class EntitiesDeletionHousekeeperTask extends HousekeeperTask {
private final EntityType entityType;
private EntityType entityType;
protected EntitiesDeletionHousekeeperTask(TenantId tenantId, EntityType entityType) {
super(tenantId, tenantId, HousekeeperTaskType.DELETE_ENTITIES);

View File

@ -15,7 +15,13 @@
*/
package org.thingsboard.server.dao.housekeeper.data;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.AccessLevel;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.User;
@ -24,13 +30,20 @@ import org.thingsboard.server.common.data.id.TenantId;
import java.io.Serializable;
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "taskType", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY, defaultImpl = HousekeeperTask.class)
@JsonSubTypes({
@Type(name = "DELETE_ENTITIES", value = EntitiesDeletionHousekeeperTask.class),
@Type(name = "UNASSIGN_ALARMS", value = AlarmsUnassignHousekeeperTask.class)
})
@Data
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class HousekeeperTask implements Serializable {
private final TenantId tenantId;
private final EntityId entityId;
private final HousekeeperTaskType taskType;
private final long ts;
private TenantId tenantId;
private EntityId entityId;
private HousekeeperTaskType taskType;
private long ts;
protected HousekeeperTask(@NonNull TenantId tenantId, @NonNull EntityId entityId, @NonNull HousekeeperTaskType taskType) {
this.tenantId = tenantId;