diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 61f7abb297..c45c4c2ef0 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -96,8 +96,13 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } public void process(CalculatedFieldStateRestoreMsg msg) { - log.info("[{}] [{}] Processing CF state restore msg.", msg.getId().entityId(), msg.getId().cfId()); - states.put(msg.getId().cfId(), msg.getState()); + CalculatedFieldId cfId = msg.getId().cfId(); + log.info("[{}] [{}] Processing CF state restore msg.", msg.getId().entityId(), cfId); + if (msg.getState() != null) { + states.put(cfId, msg.getState()); + } else { + states.remove(cfId); + } } public void process(EntityInitCalculatedFieldMsg msg) { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java new file mode 100644 index 0000000000..7df20bc139 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java @@ -0,0 +1,173 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.cf; + +import org.springframework.beans.factory.annotation.Autowired; +import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.calculatedField.CalculatedFieldStateRestoreMsg; +import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.cf.CalculatedFieldType; +import org.thingsboard.server.common.data.id.CalculatedFieldId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityIdFactory; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.BasicKvEntry; +import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.common.util.KvProtoUtil; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateMsgProto; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; +import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; +import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; +import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; +import org.thingsboard.server.service.cf.ctx.state.ScriptCalculatedFieldState; +import org.thingsboard.server.service.cf.ctx.state.SimpleCalculatedFieldState; +import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; +import org.thingsboard.server.service.cf.ctx.state.TsRollingArgumentEntry; + +import java.util.Optional; +import java.util.TreeMap; +import java.util.UUID; + +public abstract class AbstractCalculatedFieldStateService implements CalculatedFieldStateService { + + @Autowired + private ActorSystemContext actorSystemContext; + + @Override + public final void persistState(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) { + CalculatedFieldStateMsgProto stateMsg = toProto(stateId, state); + long maxStateSizeInKBytes = ctx.getMaxStateSizeInKBytes(); + if (maxStateSizeInKBytes <= 0 || stateMsg.getSerializedSize() <= maxStateSizeInKBytes) { + doPersist(stateId, stateMsg, callback); + } + } + + protected abstract void doPersist(CalculatedFieldEntityCtxId stateId, CalculatedFieldStateMsgProto stateMsgProto, TbCallback callback); + + @Override + public final void removeState(CalculatedFieldEntityCtxId stateId, TbCallback callback) { + doRemove(stateId, callback); + } + + protected abstract void doRemove(CalculatedFieldEntityCtxId stateId, TbCallback callback); + + protected void processRestoredState(CalculatedFieldStateMsgProto stateMsg) { + CalculatedFieldEntityCtxId stateId = fromProto(stateMsg.getId()); + CalculatedFieldState state = stateMsg.hasState() ? fromProto(stateMsg.getState()) : null; + actorSystemContext.tell(new CalculatedFieldStateRestoreMsg(stateId, state)); + } + + protected CalculatedFieldEntityCtxIdProto toProto(CalculatedFieldEntityCtxId ctxId) { + return CalculatedFieldEntityCtxIdProto.newBuilder() + .setTenantIdMSB(ctxId.tenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(ctxId.tenantId().getId().getLeastSignificantBits()) + .setCalculatedFieldIdMSB(ctxId.cfId().getId().getMostSignificantBits()) + .setCalculatedFieldIdLSB(ctxId.cfId().getId().getLeastSignificantBits()) + .setEntityType(ctxId.entityId().getEntityType().name()) + .setEntityIdMSB(ctxId.entityId().getId().getMostSignificantBits()) + .setEntityIdLSB(ctxId.entityId().getId().getLeastSignificantBits()) + .build(); + } + + protected CalculatedFieldEntityCtxId fromProto(CalculatedFieldEntityCtxIdProto ctxIdProto) { + TenantId tenantId = TenantId.fromUUID(new UUID(ctxIdProto.getTenantIdMSB(), ctxIdProto.getTenantIdLSB())); + EntityId entityId = EntityIdFactory.getByTypeAndUuid(ctxIdProto.getEntityType(), new UUID(ctxIdProto.getEntityIdMSB(), ctxIdProto.getEntityIdLSB())); + CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(ctxIdProto.getCalculatedFieldIdMSB(), ctxIdProto.getCalculatedFieldIdLSB())); + return new CalculatedFieldEntityCtxId(tenantId, calculatedFieldId, entityId); + } + + protected CalculatedFieldStateMsgProto toProto(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state) { + var stateProto = CalculatedFieldStateProto.newBuilder() + .setType(state.getType().name()); + state.getArguments().forEach((argName, argEntry) -> { + if (argEntry instanceof SingleValueArgumentEntry singleValueArgumentEntry) { + stateProto.addSingleValueArguments(toSingleValueArgumentProto(argName, singleValueArgumentEntry)); + } else if (argEntry instanceof TsRollingArgumentEntry rollingArgumentEntry) { + stateProto.addRollingValueArguments(toRollingArgumentProto(argName, rollingArgumentEntry)); + } + }); + return CalculatedFieldStateMsgProto.newBuilder() + .setId(toProto(stateId)) + .setState(stateProto) + .build(); + } + + protected TransportProtos.SingleValueArgumentProto toSingleValueArgumentProto(String argName, SingleValueArgumentEntry entry) { + TransportProtos.SingleValueArgumentProto.Builder builder = TransportProtos.SingleValueArgumentProto.newBuilder() + .setArgName(argName); + if (entry != SingleValueArgumentEntry.EMPTY) { + builder.setValue(KvProtoUtil.toTsValueProto(entry.getTs(), entry.getKvEntryValue())); + } + Optional.ofNullable(entry.getVersion()).ifPresent(builder::setVersion); + return builder.build(); + } + + protected TransportProtos.TsValueListProto toRollingArgumentProto(String argName, TsRollingArgumentEntry entry) { + TransportProtos.TsValueListProto.Builder builder = TransportProtos.TsValueListProto.newBuilder().setKey(argName); + if (entry != TsRollingArgumentEntry.EMPTY) { + entry.getTsRecords().forEach((ts, value) -> builder.addTsValue(KvProtoUtil.toTsValueProto(ts, value))); + } + return builder.build(); + } + + protected CalculatedFieldState fromProto(CalculatedFieldStateProto proto) { + if (StringUtils.isEmpty(proto.getType())) { + return null; + } + + CalculatedFieldType type = CalculatedFieldType.valueOf(proto.getType()); + + CalculatedFieldState state = switch (type) { + case SIMPLE -> new SimpleCalculatedFieldState(); + case SCRIPT -> new ScriptCalculatedFieldState(); + }; + + proto.getSingleValueArgumentsList().forEach(argProto -> + state.getArguments().put(argProto.getArgName(), fromSingleValueArgumentProto(argProto))); + + if (CalculatedFieldType.SCRIPT.equals(type)) { + proto.getRollingValueArgumentsList().forEach(argProto -> + state.getArguments().put(argProto.getKey(), fromRollingArgumentProto(argProto))); + } + + return state; + } + + protected SingleValueArgumentEntry fromSingleValueArgumentProto(TransportProtos.SingleValueArgumentProto proto) { + if (!proto.hasValue()) { + return (SingleValueArgumentEntry) SingleValueArgumentEntry.EMPTY; + } + TransportProtos.TsValueProto tsValueProto = proto.getValue(); + long ts = tsValueProto.getTs(); + BasicKvEntry kvEntry = (BasicKvEntry) KvProtoUtil.fromTsValueProto(proto.getArgName(), tsValueProto); + return new SingleValueArgumentEntry(ts, kvEntry, proto.getVersion()); + } + + protected TsRollingArgumentEntry fromRollingArgumentProto(TransportProtos.TsValueListProto proto) { + if (proto.getTsValueCount() <= 0) { + return (TsRollingArgumentEntry) TsRollingArgumentEntry.EMPTY; + } + TreeMap tsRecords = new TreeMap<>(); + proto.getTsValueList().forEach(tsValueProto -> { + BasicKvEntry kvEntry = (BasicKvEntry) KvProtoUtil.fromTsValueProto(proto.getKey(), tsValueProto); + tsRecords.put(tsValueProto.getTs(), kvEntry); + }); + return new TsRollingArgumentEntry(tsRecords); + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldStateService.java index 37211c66c5..05f9dab36b 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldStateService.java @@ -16,14 +16,19 @@ package org.thingsboard.server.service.cf; import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; +import java.util.Set; + public interface CalculatedFieldStateService { void persistState(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback); void removeState(CalculatedFieldEntityCtxId stateId, TbCallback callback); + void restore(Set partitions); + } diff --git a/application/src/main/java/org/thingsboard/server/utils/RocksDBConfig.java b/application/src/main/java/org/thingsboard/server/service/cf/CfRocksDb.java similarity index 58% rename from application/src/main/java/org/thingsboard/server/utils/RocksDBConfig.java rename to application/src/main/java/org/thingsboard/server/service/cf/CfRocksDb.java index 9c3c02f472..cc63d757a3 100644 --- a/application/src/main/java/org/thingsboard/server/utils/RocksDBConfig.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CfRocksDb.java @@ -13,40 +13,26 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.utils; +package org.thingsboard.server.service.cf; import jakarta.annotation.PreDestroy; import org.rocksdb.Options; -import org.rocksdb.RocksDB; -import org.rocksdb.RocksDBException; +import org.rocksdb.WriteOptions; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import org.thingsboard.server.utils.TbRocksDb; @Component -public class RocksDBConfig { +public class CfRocksDb extends TbRocksDb { - @Value("${rocksdb.db_path:${java.io.tmpdir}/rocksdb}") - private String dbPath; - private RocksDB db; - - static { - RocksDB.loadLibrary(); - } - - public RocksDB getDb() throws RocksDBException { - if (db == null) { - Options options = new Options().setCreateIfMissing(true); - db = RocksDB.open(options, dbPath); - } - return db; + public CfRocksDb(@Value("${queue.calculated_fields.rocks_db_path:${user.home}/.rocksdb/cf_states}") String path) throws Exception { + super(path, new Options().setCreateIfMissing(true), new WriteOptions().setSync(true)); } @PreDestroy + @Override public void close() { - if (db != null) { - db.close(); - db = null; - } + super.close(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java index 6b47053f2d..f2c6976cbf 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java @@ -70,6 +70,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNot import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.util.TbRuleEngineComponent; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; @@ -91,7 +92,6 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import static org.thingsboard.server.common.data.DataConstants.SCOPE; -import static org.thingsboard.server.queue.discovery.HashPartitionService.CALCULATED_FIELD_QUEUE_KEY; @TbRuleEngineComponent @Service @@ -188,7 +188,7 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP if (broadcast) { broadcasts.add(link); } else { - TopicPartitionInfo tpi = partitionService.resolve(CALCULATED_FIELD_QUEUE_KEY, link.entityId()); + TopicPartitionInfo tpi = partitionService.resolve(QueueKey.CF, link.entityId()); unicasts.computeIfAbsent(tpi, k -> new ArrayList<>()).add(link); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/RocksDBService.java b/application/src/main/java/org/thingsboard/server/service/cf/RocksDBService.java deleted file mode 100644 index 7181cc43ed..0000000000 --- a/application/src/main/java/org/thingsboard/server/service/cf/RocksDBService.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Copyright © 2016-2024 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.service.cf; - -import lombok.extern.slf4j.Slf4j; -import org.rocksdb.RocksDB; -import org.rocksdb.RocksDBException; -import org.rocksdb.RocksIterator; -import org.rocksdb.WriteOptions; -import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Service; -import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto; -import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; -import org.thingsboard.server.utils.RocksDBConfig; - -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.Map; - -@Service -@Slf4j -@ConditionalOnProperty(prefix = "zk", value = "enabled", havingValue = "false", matchIfMissing = true) -public class RocksDBService { - - private final RocksDB db; - private final WriteOptions writeOptions; - - public RocksDBService(RocksDBConfig config) throws RocksDBException { - this.db = config.getDb(); - this.writeOptions = new WriteOptions().setSync(true); - } - - public void put(String key, String value) { - try { - db.put(writeOptions, key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8)); - } catch (RocksDBException e) { - log.error("Failed to store data to RocksDB", e); - } - } - - public void put(CalculatedFieldEntityCtxIdProto key, CalculatedFieldStateProto value) { - try { - db.put(writeOptions, key.toByteArray(), value.toByteArray()); - } catch (RocksDBException e) { - log.error("Failed to store data to RocksDB", e); - } - } - - public void delete(CalculatedFieldEntityCtxIdProto key) { - try { - db.delete(writeOptions, key.toByteArray()); - } catch (RocksDBException e) { - log.error("Failed to delete data from RocksDB", e); - } - } - - public String get(String key) { - try { - byte[] value = db.get(key.getBytes(StandardCharsets.UTF_8)); - return value != null ? new String(value, StandardCharsets.UTF_8) : null; - } catch (RocksDBException e) { - log.error("Failed to retrieve data from RocksDB", e); - return null; - } - } - - public Map getAll() { - Map results = new HashMap<>(); - try (RocksIterator iterator = db.newIterator()) { - for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) { - try { - CalculatedFieldEntityCtxIdProto key = CalculatedFieldEntityCtxIdProto.parseFrom(iterator.key()); - CalculatedFieldStateProto value = CalculatedFieldStateProto.parseFrom(iterator.value()); - results.put(key, value); - } catch (Exception e) { - log.error("Failed to retrieve data from RocksDB", e); - } - } - } - return results; - } - -} diff --git a/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java b/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java index 9a56d35ac4..b2bb2287fa 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java @@ -21,8 +21,8 @@ import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; -import org.thingsboard.server.queue.discovery.HashPartitionService; import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.TbApplicationEventListener; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.util.TbRuleEngineComponent; @@ -30,7 +30,6 @@ import org.thingsboard.server.queue.util.TbRuleEngineComponent; import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; @@ -49,7 +48,7 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent @Override protected void onTbApplicationEvent(PartitionChangeEvent event) { - myPartitions = event.getCalculatedFieldsPartitions().stream() + myPartitions = event.getCfPartitions().stream() .filter(TopicPartitionInfo::isMyPartition) .map(tpi -> tpi.getPartition().orElse(UNKNOWN)).collect(Collectors.toList()); //Naive approach that need to be improved. @@ -58,7 +57,7 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent @Override public void add(TenantId tenantId, EntityId profileId, EntityId entityId) { - var tpi = partitionService.resolve(HashPartitionService.CALCULATED_FIELD_QUEUE_KEY, entityId); + var tpi = partitionService.resolve(QueueKey.CF, entityId); var partition = tpi.getPartition().orElse(UNKNOWN); tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()) .add(profileId, entityId, partition, tpi.isMyPartition()); @@ -66,7 +65,7 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent @Override public void update(TenantId tenantId, EntityId oldProfileId, EntityId newProfileId, EntityId entityId) { - var tpi = partitionService.resolve(HashPartitionService.CALCULATED_FIELD_QUEUE_KEY, entityId); + var tpi = partitionService.resolve(QueueKey.CF, entityId); var partition = tpi.getPartition().orElse(UNKNOWN); var cache = tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()); //TODO: make this method atomic; @@ -87,7 +86,7 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent @Override public int getEntityIdPartition(TenantId tenantId, EntityId entityId) { - var tpi = partitionService.resolve(HashPartitionService.CALCULATED_FIELD_QUEUE_KEY, entityId); + var tpi = partitionService.resolve(QueueKey.CF, entityId); return tpi.getPartition().orElse(UNKNOWN); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldEntityCtxId.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldEntityCtxId.java index 6ce0a11ade..a09a5cd035 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldEntityCtxId.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldEntityCtxId.java @@ -20,4 +20,9 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; public record CalculatedFieldEntityCtxId(TenantId tenantId, CalculatedFieldId cfId, EntityId entityId) { + + public String toKey() { + return cfId + "_" + entityId; + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java index a1999d438c..152b1affd9 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java @@ -15,31 +15,140 @@ */ package org.thingsboard.server.service.cf.ctx.state; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.ThingsBoardExecutors; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.msg.queue.TbCallback; -import org.thingsboard.server.queue.util.AfterStartUp; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateMsgProto; +import org.thingsboard.server.queue.TbQueueCallback; +import org.thingsboard.server.queue.TbQueueMsgMetadata; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.QueueKey; +import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate; +import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory; +import org.thingsboard.server.service.cf.AbstractCalculatedFieldStateService; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; -import org.thingsboard.server.service.cf.CalculatedFieldStateService; +import org.thingsboard.server.service.queue.DefaultTbCalculatedFieldConsumerService.CalculatedFieldQueueConfig; +import org.thingsboard.server.service.queue.consumer.MainQueueConsumerManager; + +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; @Service @RequiredArgsConstructor -@ConditionalOnExpression("'${zk.enabled:false}'=='true' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-rule-engine')") -public class KafkaCalculatedFieldStateService implements CalculatedFieldStateService { +@Slf4j +@ConditionalOnExpression("'${queue.type:null}'=='kafka'") +public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldStateService { - @AfterStartUp(order = AfterStartUp.CF_STATE_RESTORE_SERVICE) - public void initCalculatedFieldStates() { + private final TbRuleEngineQueueFactory queueFactory; + private final PartitionService partitionService; + + @Value("${queue.calculated_fields.poll_interval:25}") + private long pollInterval; + @Value("${queue.calculated_fields.consumer_per_partition:true}") + private boolean consumerPerPartition; + + private MainQueueConsumerManager, CalculatedFieldQueueConfig> stateConsumer; + private TbKafkaProducerTemplate> stateProducer; + + protected ExecutorService consumersExecutor; + protected ExecutorService mgmtExecutor; + protected ScheduledExecutorService scheduler; + + private final AtomicInteger counter = new AtomicInteger(); + + @PostConstruct + private void init() { + this.consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("cf-state-consumer")); + this.mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(Math.max(Runtime.getRuntime().availableProcessors(), 4), "cf-state-mgmt"); + this.scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("cf-state-consumer-scheduler"); + + this.stateConsumer = MainQueueConsumerManager., CalculatedFieldQueueConfig>builder() + .queueKey(QueueKey.CF_STATES) + .config(CalculatedFieldQueueConfig.of(consumerPerPartition, (int) pollInterval)) + .msgPackProcessor((msgs, consumer, config) -> { + for (TbProtoQueueMsg msg : msgs) { + try { + processRestoredState(msg.getValue()); + } catch (Throwable t) { + log.error("Failed to process state message: {}", msg, t); + } + + int processedMsgCount = counter.incrementAndGet(); + if (processedMsgCount % 10000 == 0) { + log.info("Processed {} calculated field state msgs", processedMsgCount); + } + } + }) + .consumerCreator((config, partitionId) -> queueFactory.createCalculatedFieldStateConsumer()) + .consumerExecutor(consumersExecutor) + .scheduler(scheduler) + .taskExecutor(mgmtExecutor) + .build(); + this.stateProducer = (TbKafkaProducerTemplate>) queueFactory.createCalculatedFieldStateProducer(); } @Override - public void persistState(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) { - callback.onSuccess(); + protected void doPersist(CalculatedFieldEntityCtxId stateId, CalculatedFieldStateMsgProto stateMsgProto, TbCallback callback) { + TopicPartitionInfo tpi = partitionService.resolve(QueueKey.CF_STATES, stateId.entityId()); + stateProducer.send(tpi, stateId.toKey(), new TbProtoQueueMsg<>(stateId.entityId().getId(), stateMsgProto), new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + if (callback != null) { + callback.onSuccess(); + } + } + + @Override + public void onFailure(Throwable t) { + if (callback != null) { + callback.onFailure(t); + } + } + }); } @Override - public void removeState(CalculatedFieldEntityCtxId ctxId, TbCallback callback) { - callback.onSuccess(); + protected void doRemove(CalculatedFieldEntityCtxId stateId, TbCallback callback) { + doPersist(stateId, CalculatedFieldStateMsgProto.newBuilder() + .setId(toProto(stateId)) + .build(), callback); + } + + @Override + public void restore(Set partitions) { + partitions = partitions.stream().map(tpi -> tpi.newByTopic(partitionService.getTopic(QueueKey.CF_STATES))).collect(Collectors.toSet()); + log.info("Restoring calculated field states for partitions: {}", partitions.stream().map(TopicPartitionInfo::getFullTopicName).toList()); + long startTs = System.currentTimeMillis(); + counter.set(0); + + stateConsumer.doUpdate(partitions); // calling blocking doUpdate instead of update + stateConsumer.awaitStop(0);// consumers should stop on their own because stopWhenRead is true, we just need to wait + + log.info("Restored {} calculated field states in {} ms", counter.get(), System.currentTimeMillis() - startTs); + } + + @PreDestroy + private void preDestroy() { + stateConsumer.stop(); + stateConsumer.awaitStop(); + stateProducer.stop(); + + consumersExecutor.shutdownNow(); + mgmtExecutor.shutdownNow(); + scheduler.shutdownNow(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java index 3374eb2660..eb7f4818d1 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java @@ -15,174 +15,57 @@ */ package org.thingsboard.server.service.cf.ctx.state; +import com.google.protobuf.InvalidProtocolBufferException; import lombok.RequiredArgsConstructor; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; -import org.thingsboard.server.actors.ActorSystemContext; -import org.thingsboard.server.actors.calculatedField.CalculatedFieldStateRestoreMsg; -import org.thingsboard.server.common.data.StringUtils; -import org.thingsboard.server.common.data.cf.CalculatedFieldType; -import org.thingsboard.server.common.data.id.CalculatedFieldId; -import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.EntityIdFactory; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.kv.BasicKvEntry; import org.thingsboard.server.common.msg.queue.TbCallback; -import org.thingsboard.server.common.util.KvProtoUtil; -import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto; -import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; -import org.thingsboard.server.gen.transport.TransportProtos.SingleValueArgumentProto; -import org.thingsboard.server.gen.transport.TransportProtos.TsValueListProto; -import org.thingsboard.server.gen.transport.TransportProtos.TsValueProto; -import org.thingsboard.server.queue.util.AfterStartUp; -import org.thingsboard.server.service.cf.RocksDBService; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateMsgProto; +import org.thingsboard.server.service.cf.AbstractCalculatedFieldStateService; +import org.thingsboard.server.service.cf.CfRocksDb; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; -import org.thingsboard.server.service.cf.CalculatedFieldStateService; -import java.util.Map; -import java.util.Optional; -import java.util.TreeMap; -import java.util.UUID; -import java.util.stream.Collectors; +import java.util.Set; @Service @RequiredArgsConstructor -@ConditionalOnProperty(prefix = "zk", value = "enabled", havingValue = "false", matchIfMissing = true) // Queue type in mem or Kafka; -public class RocksDBCalculatedFieldStateService implements CalculatedFieldStateService { +@Slf4j +@ConditionalOnExpression("'${queue.type:null}'=='in-memory'") +public class RocksDBCalculatedFieldStateService extends AbstractCalculatedFieldStateService { - private final ActorSystemContext actorSystemContext; - private final RocksDBService rocksDBService; + private final CfRocksDb cfRocksDb; - public Map restoreStates() { - return rocksDBService.getAll().entrySet().stream() - .collect(Collectors.toMap( - entry -> fromProto(entry.getKey()), - entry -> fromProto(entry.getValue()) - )); - } + private Set partitions; - @AfterStartUp(order = AfterStartUp.CF_STATE_RESTORE_SERVICE) - public void initCalculatedFieldStates() { - restoreStates().forEach((k, v) -> actorSystemContext.tell(new CalculatedFieldStateRestoreMsg(k, v))); + @Override + protected void doPersist(CalculatedFieldEntityCtxId stateId, CalculatedFieldStateMsgProto stateMsgProto, TbCallback callback) { + cfRocksDb.put(stateId.toKey(), stateMsgProto.toByteArray()); + callback.onSuccess(); } @Override - public void persistState(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) { - CalculatedFieldStateProto stateProto = toProto(stateId, state); - long maxStateSizeInKBytes = ctx.getMaxStateSizeInKBytes(); - if (maxStateSizeInKBytes <= 0 || stateProto.getSerializedSize() <= maxStateSizeInKBytes) { - rocksDBService.put(toProto(stateId), stateProto); + protected void doRemove(CalculatedFieldEntityCtxId stateId, TbCallback callback) { + cfRocksDb.delete(stateId.toKey()); + callback.onSuccess(); + } + + @Override + public void restore(Set partitions) { + if (this.partitions == null) { + this.partitions = partitions; + } else { + return; } - callback.onSuccess(); - } - @Override - public void removeState(CalculatedFieldEntityCtxId ctxId, TbCallback callback) { - rocksDBService.delete(toProto(ctxId)); - callback.onSuccess(); - } - - private CalculatedFieldEntityCtxIdProto toProto(CalculatedFieldEntityCtxId ctxId) { - return CalculatedFieldEntityCtxIdProto.newBuilder() - .setTenantIdMSB(ctxId.tenantId().getId().getMostSignificantBits()) - .setTenantIdLSB(ctxId.tenantId().getId().getLeastSignificantBits()) - .setCalculatedFieldIdMSB(ctxId.cfId().getId().getMostSignificantBits()) - .setCalculatedFieldIdLSB(ctxId.cfId().getId().getLeastSignificantBits()) - .setEntityType(ctxId.entityId().getEntityType().name()) - .setEntityIdMSB(ctxId.entityId().getId().getMostSignificantBits()) - .setEntityIdLSB(ctxId.entityId().getId().getLeastSignificantBits()) - .build(); - } - - private CalculatedFieldEntityCtxId fromProto(CalculatedFieldEntityCtxIdProto ctxIdProto) { - TenantId tenantId = TenantId.fromUUID(new UUID(ctxIdProto.getTenantIdMSB(), ctxIdProto.getTenantIdLSB())); - EntityId entityId = EntityIdFactory.getByTypeAndUuid(ctxIdProto.getEntityType(), new UUID(ctxIdProto.getEntityIdMSB(), ctxIdProto.getEntityIdLSB())); - CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(ctxIdProto.getCalculatedFieldIdMSB(), ctxIdProto.getCalculatedFieldIdLSB())); - return new CalculatedFieldEntityCtxId(tenantId, calculatedFieldId, entityId); - } - - private CalculatedFieldStateProto toProto(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state) { - CalculatedFieldStateProto.Builder builder = CalculatedFieldStateProto.newBuilder() - .setId(toProto(stateId)) - .setType(state.getType().name()); - - state.getArguments().forEach((argName, argEntry) -> { - if (argEntry instanceof SingleValueArgumentEntry singleValueArgumentEntry) { - builder.addSingleValueArguments(toSingleValueArgumentProto(argName, singleValueArgumentEntry)); - } else if (argEntry instanceof TsRollingArgumentEntry rollingArgumentEntry) { - builder.addRollingValueArguments(toRollingArgumentProto(argName, rollingArgumentEntry)); + cfRocksDb.forEach((key, value) -> { + try { + processRestoredState(CalculatedFieldStateMsgProto.parseFrom(value)); + } catch (InvalidProtocolBufferException e) { + log.error("[{}] Failed to process restored state", key, e); } }); - - return builder.build(); - } - - private SingleValueArgumentProto toSingleValueArgumentProto(String argName, SingleValueArgumentEntry entry) { - SingleValueArgumentProto.Builder builder = SingleValueArgumentProto.newBuilder() - .setArgName(argName); - - if (entry != SingleValueArgumentEntry.EMPTY) { - builder.setValue(KvProtoUtil.toTsValueProto(entry.getTs(), entry.getKvEntryValue())); - } - - Optional.ofNullable(entry.getVersion()).ifPresent(builder::setVersion); - - return builder.build(); - } - - private TsValueListProto toRollingArgumentProto(String argName, TsRollingArgumentEntry entry) { - TsValueListProto.Builder builder = TsValueListProto.newBuilder().setKey(argName); - - if (entry != TsRollingArgumentEntry.EMPTY) { - entry.getTsRecords().forEach((ts, value) -> builder.addTsValue(KvProtoUtil.toTsValueProto(ts, value))); - } - - return builder.build(); - } - - private CalculatedFieldState fromProto(CalculatedFieldStateProto proto) { - if (StringUtils.isEmpty(proto.getType())) { - return null; - } - - CalculatedFieldType type = CalculatedFieldType.valueOf(proto.getType()); - - CalculatedFieldState state = switch (type) { - case SIMPLE -> new SimpleCalculatedFieldState(); - case SCRIPT -> new ScriptCalculatedFieldState(); - }; - - proto.getSingleValueArgumentsList().forEach(argProto -> - state.getArguments().put(argProto.getArgName(), fromSingleValueArgumentProto(argProto))); - - if (CalculatedFieldType.SCRIPT.equals(type)) { - proto.getRollingValueArgumentsList().forEach(argProto -> - state.getArguments().put(argProto.getKey(), fromRollingArgumentProto(argProto))); - } - - return state; - } - - private SingleValueArgumentEntry fromSingleValueArgumentProto(SingleValueArgumentProto proto) { - if (!proto.hasValue()) { - return (SingleValueArgumentEntry) SingleValueArgumentEntry.EMPTY; - } - TsValueProto tsValueProto = proto.getValue(); - long ts = tsValueProto.getTs(); - BasicKvEntry kvEntry = (BasicKvEntry) KvProtoUtil.fromTsValueProto(proto.getArgName(), tsValueProto); - return new SingleValueArgumentEntry(ts, kvEntry, proto.getVersion()); - } - - private TsRollingArgumentEntry fromRollingArgumentProto(TsValueListProto proto) { - if (proto.getTsValueCount() <= 0) { - return (TsRollingArgumentEntry) TsRollingArgumentEntry.EMPTY; - } - TreeMap tsRecords = new TreeMap<>(); - proto.getTsValueList().forEach(tsValueProto -> { - BasicKvEntry kvEntry = (BasicKvEntry) KvProtoUtil.fromTsValueProto(proto.getKey(), tsValueProto); - tsRecords.put(tsValueProto.getTs(), kvEntry); - }); - return new TsRollingArgumentEntry(tsRecords); } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index f3e976084b..82ee4de7dc 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -21,11 +21,11 @@ import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.Data; import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ThingsBoardExecutors; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.calculatedField.CalculatedFieldLinkedTelemetryMsg; import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg; @@ -52,6 +52,7 @@ import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory; import org.thingsboard.server.queue.util.TbRuleEngineComponent; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.cf.CalculatedFieldCache; +import org.thingsboard.server.service.cf.CalculatedFieldStateService; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.queue.consumer.MainQueueConsumerManager; @@ -65,6 +66,8 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -86,10 +89,12 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer private int poolSize; private final TbRuleEngineQueueFactory queueFactory; + private final CalculatedFieldStateService stateService; private MainQueueConsumerManager, CalculatedFieldQueueConfig> mainConsumer; - private volatile ListeningExecutorService calculatedFieldsExecutor; + private ListeningExecutorService calculatedFieldsExecutor; + private ExecutorService repartitionExecutor; public DefaultTbCalculatedFieldConsumerService(TbRuleEngineQueueFactory tbQueueFactory, ActorSystemContext actorContext, @@ -100,19 +105,22 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer PartitionService partitionService, ApplicationEventPublisher eventPublisher, JwtSettingsService jwtSettingsService, - CalculatedFieldCache calculatedFieldCache) { + CalculatedFieldCache calculatedFieldCache, + CalculatedFieldStateService stateService) { super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService, eventPublisher, jwtSettingsService); this.queueFactory = tbQueueFactory; + this.stateService = stateService; } @PostConstruct public void init() { super.init("tb-cf"); this.calculatedFieldsExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(poolSize, "tb-cf-executor")); // TODO: multiple threads. + this.repartitionExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-cf-repartition")); this.mainConsumer = MainQueueConsumerManager., CalculatedFieldQueueConfig>builder() - .queueKey(new QueueKey(ServiceType.TB_RULE_ENGINE)) + .queueKey(QueueKey.CF) .config(CalculatedFieldQueueConfig.of(consumerPerPartition, (int) pollInterval)) .msgPackProcessor(this::processMsgs) .consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer()) @@ -137,20 +145,23 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer @Override protected void onTbApplicationEvent(PartitionChangeEvent event) { - var partitions = event.getCalculatedFieldsPartitions(); - log.info("Subscribing to partitions: {}", partitions); - // TODO: @vklimov - before update of the main consumer, we should read the state topics and use - // CalculatedFieldStateService (KafkaCalculatedFieldStateService) to restore the states for entities that belong to new partitions. - // Cleanup entities that do not belong to current partition; - mainConsumer.update(event.getCalculatedFieldsPartitions()); - // Cleanup old entities after corresponding consumers are stopped. - // Any periodic tasks need to check that the entity is still managed by the current server before processing. - actorContext.tell(new CalculatedFieldPartitionChangeMsg(partitionsToBooleanIndexArray(partitions))); + var partitions = event.getCfPartitions(); + repartitionExecutor.submit(() -> { + try { + stateService.restore(partitions); + mainConsumer.update(partitions); + // Cleanup old entities after corresponding consumers are stopped. + // Any periodic tasks need to check that the entity is still managed by the current server before processing. + actorContext.tell(new CalculatedFieldPartitionChangeMsg(partitionsToBooleanIndexArray(partitions))); + } catch (Throwable t) { + log.error("Failed to process partition change event: {}", event, t); + } + }); } private boolean[] partitionsToBooleanIndexArray(Set partitions) { boolean[] myPartitions = new boolean[partitionService.getTotalCalculatedFieldPartitions()]; - for(var tpi : partitions) { + for (var tpi : partitions) { tpi.getPartition().ifPresent(partition -> myPartitions[partition] = true); } return myPartitions; @@ -193,9 +204,10 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer packSubmitFuture.cancel(true); log.info("Timeout to process message: {}", pendingMsgHolder.getMsg()); } - if (log.isDebugEnabled()) { - ctx.getAckMap().forEach((id, msg) -> log.debug("[{}] Timeout to process message: {}", id, msg.getValue())); - } +// if (log.isDebugEnabled()) { +// ctx.getAckMap().forEach((id, msg) -> log.debug("[{}] Timeout to process message: {}", id, msg.getValue())); +// } + ctx.getAckMap().forEach((id, msg) -> log.warn("[{}] Timeout to process message: {}", id, msg.getValue())); // TODO: replace with commented above after testing ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process message: {}", id, msg.getValue())); } consumer.commit(); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 48d26895b9..920a7563dc 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -94,6 +94,7 @@ import org.thingsboard.server.queue.common.MultipleTbQueueCallbackWrapper; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbRuleEngineProducerService; import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.service.cf.CalculatedFieldProcessingService; @@ -111,7 +112,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static org.thingsboard.server.common.util.ProtoUtils.toProto; -import static org.thingsboard.server.queue.discovery.HashPartitionService.CALCULATED_FIELD_QUEUE_KEY; @Service @Slf4j @@ -358,7 +358,7 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void pushMsgToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldMsg msg, TbQueueCallback callback) { - TopicPartitionInfo tpi = partitionService.resolve(CALCULATED_FIELD_QUEUE_KEY, entityId); + TopicPartitionInfo tpi = partitionService.resolve(QueueKey.CF, entityId); pushMsgToCalculatedFields(tpi, UUID.randomUUID(), msg, callback); } @@ -371,7 +371,7 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void pushNotificationToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldNotificationMsg msg, TbQueueCallback callback) { - TopicPartitionInfo tpi = partitionService.resolve(CALCULATED_FIELD_QUEUE_KEY, entityId); + TopicPartitionInfo tpi = partitionService.resolve(QueueKey.CF, entityId); producerProvider.getCalculatedFieldsNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), callback); toRuleEngineNfs.incrementAndGet(); } @@ -792,8 +792,7 @@ public class DefaultTbClusterService implements TbClusterService { private void pushDeviceUpdateMessage(TenantId tenantId, EdgeId edgeId, EntityId entityId, EdgeEventActionType action) { log.trace("{} Going to send edge update notification for device actor, device id {}, edge id {}", tenantId, entityId, edgeId); switch (action) { - case ASSIGNED_TO_EDGE -> - pushMsgToCore(new DeviceEdgeUpdateMsg(tenantId, new DeviceId(entityId.getId()), edgeId), null); + case ASSIGNED_TO_EDGE -> pushMsgToCore(new DeviceEdgeUpdateMsg(tenantId, new DeviceId(entityId.getId()), edgeId), null); case UNASSIGNED_FROM_EDGE -> { EdgeId relatedEdgeId = findRelatedEdgeIdIfAny(tenantId, entityId); pushMsgToCore(new DeviceEdgeUpdateMsg(tenantId, new DeviceId(entityId.getId()), relatedEdgeId), null); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index 5e07e33a9e..c42f7c490b 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.data.rpc.RpcError; +import org.thingsboard.server.common.data.util.CollectionsUtil; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; @@ -63,8 +64,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; -import static org.thingsboard.server.queue.discovery.HashPartitionService.CALCULATED_FIELD_QUEUE_KEY; - @Service @TbRuleEngineComponent @Slf4j @@ -109,7 +108,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< @Override protected void onTbApplicationEvent(PartitionChangeEvent event) { event.getPartitionsMap().forEach((queueKey, partitions) -> { - if (CALCULATED_FIELD_QUEUE_KEY.equals(queueKey)) { + if (CollectionsUtil.isOneOf(queueKey, QueueKey.CF, QueueKey.CF_STATES)) { return; } if (partitionService.isManagedByCurrentService(queueKey.getTenantId())) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/consumer/MainQueueConsumerManager.java b/application/src/main/java/org/thingsboard/server/service/queue/consumer/MainQueueConsumerManager.java index 6eb5c94c9b..5fa69695d2 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/consumer/MainQueueConsumerManager.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/consumer/MainQueueConsumerManager.java @@ -182,7 +182,7 @@ public class MainQueueConsumerManager partitions) { + public void doUpdate(Set partitions) { this.partitions = partitions; consumerWrapper.updatePartitions(partitions); } @@ -226,7 +226,9 @@ public class MainQueueConsumerManager msgs, TbQueueConsumer consumer, C config) throws Exception { + log.trace("Processing {} messages", msgs.size()); msgPackProcessor.process(msgs, consumer, config); + log.trace("Processed {} messages", msgs.size()); } public void stop() { @@ -236,8 +238,12 @@ public class MainQueueConsumerManager consumerTask.awaitCompletion(timeoutSec)); log.debug("[{}] Unsubscribed and stopped consumers", queueKey); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java index 5e672eb5c6..0b4e7c02d7 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java @@ -70,13 +70,21 @@ public class TbQueueConsumerTask { } public void awaitCompletion() { + awaitCompletion(30); + } + + public void awaitCompletion(long timeoutSec) { log.trace("[{}] Awaiting finish", key); if (isRunning()) { try { - task.get(30, TimeUnit.SECONDS); + if (timeoutSec > 0) { + task.get(timeoutSec, TimeUnit.SECONDS); + } else { + task.get(); + } log.trace("[{}] Awaited finish", key); } catch (Exception e) { - log.warn("[{}] Failed to await for consumer to stop", key, e); + log.warn("[{}] Failed to await for consumer to stop (timeout {} sec)", key, timeoutSec, e); } task = null; } diff --git a/application/src/main/java/org/thingsboard/server/service/security/auth/rest/RestAuthenticationProvider.java b/application/src/main/java/org/thingsboard/server/service/security/auth/rest/RestAuthenticationProvider.java index b9fe54deec..56215b82ac 100644 --- a/application/src/main/java/org/thingsboard/server/service/security/auth/rest/RestAuthenticationProvider.java +++ b/application/src/main/java/org/thingsboard/server/service/security/auth/rest/RestAuthenticationProvider.java @@ -106,6 +106,7 @@ public class RestAuthenticationProvider implements AuthenticationProvider { if (twoFactorAuthService.isTwoFaEnabled(securityUser.getTenantId(), securityUser.getId())) { return new MfaAuthenticationToken(securityUser); } else { + systemSecurityService.logLoginAction(securityUser, authentication.getDetails(), ActionType.LOGIN, null); } } else { diff --git a/application/src/main/java/org/thingsboard/server/utils/TbRocksDb.java b/application/src/main/java/org/thingsboard/server/utils/TbRocksDb.java new file mode 100644 index 0000000000..fa56e0ec1d --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/utils/TbRocksDb.java @@ -0,0 +1,71 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.utils; + +import lombok.SneakyThrows; +import org.rocksdb.Options; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteOptions; + +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.function.BiConsumer; + +public class TbRocksDb { + + protected final String path; + private final WriteOptions writeOptions; + protected final RocksDB db; + + static { + RocksDB.loadLibrary(); + } + + public TbRocksDb(String path, Options dbOptions, WriteOptions writeOptions) throws Exception { + this.path = path; + this.writeOptions = writeOptions; + Files.createDirectories(Path.of(path).getParent()); + this.db = RocksDB.open(dbOptions, path); + } + + @SneakyThrows + public void put(String key, byte[] value) { + db.put(writeOptions, key.getBytes(StandardCharsets.UTF_8), value); + } + + public void forEach(BiConsumer processor) { + try (RocksIterator iterator = db.newIterator()) { + for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) { + String key = new String(iterator.key(), StandardCharsets.UTF_8); + processor.accept(key, iterator.value()); + } + } + } + + @SneakyThrows + public void delete(String key) { + db.delete(writeOptions, key.getBytes(StandardCharsets.UTF_8)); + } + + public void close() { + if (db != null) { + db.close(); + } + } + +} \ No newline at end of file diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 197f05a5d5..e2570e9137 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -426,10 +426,6 @@ sql: pool_size: "${SQL_RELATIONS_POOL_SIZE:4}" # This value has to be reasonably small to prevent the relation query from blocking all other DB calls query_timeout: "${SQL_RELATIONS_QUERY_TIMEOUT_SEC:20}" # This value has to be reasonably small to prevent the relation query from blocking all other DB calls -rocksdb: - # Rocksdb path - db_path: "${ROCKS_DB_PATH:${java.io.tmpdir}/rocksdb}" - # Actor system parameters actors: system: @@ -1629,6 +1625,8 @@ queue: edge-event: "${TB_QUEUE_KAFKA_EDGE_EVENT_TOPIC_PROPERTIES:retention.ms:2592000000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" # Kafka properties for Calculated Field topics calculated-field: "${TB_QUEUE_KAFKA_CF_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" + # Kafka properties for Calculated Field State topics + calculated-field-state: "${TB_QUEUE_KAFKA_CF_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:104857600000;partitions:1;min.insync.replicas:1;cleanup.policy:compact}" consumer-stats: # Prints lag between consumer group offset and last messages offset in Kafka topics enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}" @@ -1763,6 +1761,8 @@ queue: consumer_per_partition: "${TB_QUEUE_CF_CONSUMER_PER_PARTITION:true}" # Thread pool size for processing of the incoming messages pool_size: "${TB_QUEUE_CF_POOL_SIZE:8}" + # RocksDB path for storing CF states + rocks_db_path: "${TB_QUEUE_CF_ROCKS_DB_PATH:${user.home}/.rocksdb/cf_states}" transport: # For high-priority notifications that require minimum latency and processing time notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}" diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index 77a7c4a781..c84cae40dd 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -146,5 +146,6 @@ public class DataConstants { public static final String EDGE_EVENT_QUEUE_NAME = "EdgeEvent"; public static final String CF_QUEUE_NAME = "CalculatedFields"; + public static final String CF_STATES_QUEUE_NAME = "CalculatedFieldStates"; } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/util/CollectionsUtil.java b/common/data/src/main/java/org/thingsboard/server/common/data/util/CollectionsUtil.java index e89f3c1ad8..7f9a611c98 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/util/CollectionsUtil.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/util/CollectionsUtil.java @@ -75,4 +75,16 @@ public class CollectionsUtil { return isEmpty(collection) || collection.contains(element); } + public static boolean isOneOf(V value, V... others) { + if (value == null) { + return false; + } + for (V other : others) { + if (value.equals(other)) { + return true; + } + } + return false; + } + } diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 4a2f3710ae..992c6ac42f 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -814,12 +814,15 @@ message SingleValueArgumentProto { int64 version = 3; } -message CalculatedFieldStateProto { +message CalculatedFieldStateMsgProto { CalculatedFieldEntityCtxIdProto id = 1; - // int32 version = 2; - string type = 3; - repeated SingleValueArgumentProto singleValueArguments = 4; - repeated TsValueListProto rollingValueArguments = 5; + CalculatedFieldStateProto state = 2; +} + +message CalculatedFieldStateProto { + string type = 1; + repeated SingleValueArgumentProto singleValueArguments = 2; + repeated TsValueListProto rollingValueArguments = 3; } //Used to report session state to tb-Service and persist this state in the cache on the tb-Service level. diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java index 9513565ca1..6a5b86cf54 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java @@ -120,9 +120,9 @@ public abstract class AbstractTbQueueConsumerTemplate i if (record != null) { result.add(decode(record)); } - } catch (IOException e) { - log.error("Failed decode record: [{}]", record); - throw new RuntimeException("Failed to decode record: ", e); + } catch (Exception e) { + log.error("Failed to decode record {}", record, e); + throw new RuntimeException("Failed to decode record " + record, e); } }); return result; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index f2f1ccd19c..249dfa859f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -51,7 +51,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; -import static org.thingsboard.server.common.data.DataConstants.*; +import static org.thingsboard.server.common.data.DataConstants.EDGE_QUEUE_NAME; +import static org.thingsboard.server.common.data.DataConstants.MAIN_QUEUE_NAME; @Service @Slf4j @@ -78,8 +79,6 @@ public class HashPartitionService implements PartitionService { @Value("${queue.partitions.hash_function_name:murmur3_128}") private String hashFunctionName; - public static final QueueKey CALCULATED_FIELD_QUEUE_KEY = new QueueKey(ServiceType.TB_RULE_ENGINE).withQueueName(CF_QUEUE_NAME); - private final ApplicationEventPublisher applicationEventPublisher; private final TbServiceInfoProvider serviceInfoProvider; private final TenantRoutingInfoService tenantRoutingInfoService; @@ -120,8 +119,10 @@ public class HashPartitionService implements PartitionService { partitionSizesMap.put(coreKey, corePartitions); partitionTopicsMap.put(coreKey, coreTopic); - partitionSizesMap.put(CALCULATED_FIELD_QUEUE_KEY, cfPartitions); - partitionTopicsMap.put(CALCULATED_FIELD_QUEUE_KEY, cfEventTopic); + partitionSizesMap.put(QueueKey.CF, cfPartitions); + partitionTopicsMap.put(QueueKey.CF, cfEventTopic); + partitionSizesMap.put(QueueKey.CF_STATES, cfPartitions); + partitionTopicsMap.put(QueueKey.CF_STATES, cfStateTopic); QueueKey vcKey = new QueueKey(ServiceType.TB_VC_EXECUTOR); partitionSizesMap.put(vcKey, vcPartitions); @@ -148,6 +149,11 @@ public class HashPartitionService implements PartitionService { return myPartitions.get(queueKey); } + @Override + public String getTopic(QueueKey queueKey) { + return partitionTopicsMap.get(queueKey); + } + private void doInitRuleEnginePartitions() { List queueRoutingInfoList = getQueueRoutingInfos(); queueRoutingInfoList.forEach(queue -> { @@ -222,7 +228,7 @@ public class HashPartitionService implements PartitionService { }); if (serviceInfoProvider.isService(ServiceType.TB_RULE_ENGINE)) { publishPartitionChangeEvent(ServiceType.TB_RULE_ENGINE, queueKeys.stream() - .collect(Collectors.toMap(k -> k, k -> Collections.emptySet()))); + .collect(Collectors.toMap(k -> k, k -> Collections.emptySet())), Collections.emptyMap()); } } @@ -402,6 +408,7 @@ public class HashPartitionService implements PartitionService { myPartitions = newPartitions; Map> changedPartitionsMap = new HashMap<>(); + Map> oldPartitionsMap = new HashMap<>(); Set removed = new HashSet<>(); oldPartitions.forEach((queueKey, partitions) -> { @@ -422,19 +429,16 @@ public class HashPartitionService implements PartitionService { myPartitions.forEach((queueKey, partitions) -> { if (!partitions.equals(oldPartitions.get(queueKey))) { - Set tpiList = partitions.stream() - .map(partition -> buildTopicPartitionInfo(queueKey, partition)) - .collect(Collectors.toSet()); - changedPartitionsMap.put(queueKey, tpiList); + changedPartitionsMap.put(queueKey, toTpiList(queueKey, partitions)); + oldPartitionsMap.put(queueKey, toTpiList(queueKey, oldPartitions.get(queueKey))); } }); if (!changedPartitionsMap.isEmpty()) { - Map>> partitionsByServiceType = new HashMap<>(); - changedPartitionsMap.forEach((queueKey, partitions) -> { - partitionsByServiceType.computeIfAbsent(queueKey.getType(), serviceType -> new HashMap<>()) - .put(queueKey, partitions); - }); - partitionsByServiceType.forEach(this::publishPartitionChangeEvent); + changedPartitionsMap.entrySet().stream() + .collect(Collectors.groupingBy(entry -> entry.getKey().getType(), Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) + .forEach((serviceType, partitionsMap) -> { + publishPartitionChangeEvent(serviceType, partitionsMap, oldPartitionsMap); + }); } if (currentOtherServices == null) { @@ -466,13 +470,15 @@ public class HashPartitionService implements PartitionService { applicationEventPublisher.publishEvent(new ServiceListChangedEvent(otherServices, currentService)); } - private void publishPartitionChangeEvent(ServiceType serviceType, Map> partitionsMap) { - log.info("Partitions changed: {}", System.lineSeparator() + partitionsMap.entrySet().stream() + private void publishPartitionChangeEvent(ServiceType serviceType, + Map> newPartitions, + Map> oldPartitions) { + log.info("Partitions changed: {}", System.lineSeparator() + newPartitions.entrySet().stream() .map(entry -> "[" + entry.getKey() + "] - [" + entry.getValue().stream() .map(tpi -> tpi.getPartition().orElse(-1).toString()).sorted() .collect(Collectors.joining(", ")) + "]") .collect(Collectors.joining(System.lineSeparator()))); - PartitionChangeEvent event = new PartitionChangeEvent(this, serviceType, partitionsMap); + PartitionChangeEvent event = new PartitionChangeEvent(this, serviceType, newPartitions); try { applicationEventPublisher.publishEvent(event); } catch (Exception e) { @@ -480,6 +486,15 @@ public class HashPartitionService implements PartitionService { } } + private Set toTpiList(QueueKey queueKey, List partitions) { + if (partitions == null) { + return Collections.emptySet(); + } + return partitions.stream() + .map(partition -> buildTopicPartitionInfo(queueKey, partition)) + .collect(Collectors.toSet()); + } + @Override public Set getAllServiceIds(ServiceType serviceType) { return getAllServices(serviceType).stream().map(ServiceInfo::getServiceId).collect(Collectors.toSet()); @@ -508,7 +523,6 @@ public class HashPartitionService implements PartitionService { return result; } - @Override public int resolvePartitionIndex(UUID entityId, int partitions) { int hash = hash(entityId); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java index b0e1229f97..5bbff7663c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java @@ -45,6 +45,8 @@ public interface PartitionService { List getMyPartitions(QueueKey queueKey); + String getTopic(QueueKey queueKey); + /** * Received from the Discovery service when network topology is changed. * @param currentService - current service information {@link org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueKey.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueKey.java index b991e4614c..3f8926ad18 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueKey.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueKey.java @@ -23,6 +23,9 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.msg.queue.ServiceType; +import static org.thingsboard.server.common.data.DataConstants.CF_QUEUE_NAME; +import static org.thingsboard.server.common.data.DataConstants.CF_STATES_QUEUE_NAME; + @Data @AllArgsConstructor public class QueueKey { @@ -32,6 +35,9 @@ public class QueueKey { private final String queueName; private final TenantId tenantId; + public static final QueueKey CF = new QueueKey(ServiceType.TB_RULE_ENGINE).withQueueName(CF_QUEUE_NAME); + public static final QueueKey CF_STATES = new QueueKey(ServiceType.TB_RULE_ENGINE).withQueueName(CF_STATES_QUEUE_NAME); + public QueueKey(ServiceType type, Queue queue) { this.type = type; this.queueName = queue.getName(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/PartitionChangeEvent.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/PartitionChangeEvent.java index 57a4941981..a16d33c884 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/PartitionChangeEvent.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/PartitionChangeEvent.java @@ -28,8 +28,6 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import static org.thingsboard.server.queue.discovery.HashPartitionService.CALCULATED_FIELD_QUEUE_KEY; - @ToString(callSuper = true) public class PartitionChangeEvent extends TbApplicationEvent { @@ -55,8 +53,8 @@ public class PartitionChangeEvent extends TbApplicationEvent { return getPartitionsByServiceTypeAndQueueName(ServiceType.TB_CORE, DataConstants.EDGE_QUEUE_NAME); } - public Set getCalculatedFieldsPartitions() { - return partitionsMap.getOrDefault(CALCULATED_FIELD_QUEUE_KEY, Collections.emptySet()); + public Set getCfPartitions() { + return partitionsMap.getOrDefault(QueueKey.CF, Collections.emptySet()); } private Set getPartitionsByServiceTypeAndQueueName(ServiceType serviceType, String queueName) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/KafkaTbQueueMsg.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/KafkaTbQueueMsg.java index a2edc35d94..fe2c8b8a90 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/KafkaTbQueueMsg.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/KafkaTbQueueMsg.java @@ -23,12 +23,19 @@ import org.thingsboard.server.queue.common.DefaultTbQueueMsgHeaders; import java.util.UUID; public class KafkaTbQueueMsg implements TbQueueMsg { + + private static final int UUID_LENGTH = 36; + private final UUID key; private final TbQueueMsgHeaders headers; private final byte[] data; public KafkaTbQueueMsg(ConsumerRecord record) { - this.key = UUID.fromString(record.key()); + if (record.key().length() == UUID_LENGTH) { + this.key = UUID.fromString(record.key()); + } else { + this.key = UUID.randomUUID(); + } TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders(); record.headers().forEach(header -> { headers.put(header.key(), header.value()); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java index ef79834735..1d39a3bd0c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java @@ -18,9 +18,11 @@ package org.thingsboard.server.queue.kafka; import lombok.Builder; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; import org.springframework.util.StopWatch; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueMsg; @@ -29,9 +31,12 @@ import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate; import java.io.IOException; import java.time.Duration; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Properties; +import java.util.stream.Collectors; /** * Created by ashvayka on 24.09.18. @@ -46,10 +51,15 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue private final TbKafkaConsumerStatsService statsService; private final String groupId; + private final boolean readFromBeginning; // reset offset to beginning + private final boolean stopWhenRead; // stop consuming when reached initial end offsets + private Map endOffsets; // needed if stopWhenRead is true + @Builder private TbKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder decoder, String clientId, String groupId, String topic, - TbQueueAdmin admin, TbKafkaConsumerStatsService statsService) { + TbQueueAdmin admin, TbKafkaConsumerStatsService statsService, + boolean readFromBeginning, boolean stopWhenRead) { super(topic); Properties props = settings.toConsumerProps(topic); props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); @@ -67,13 +77,35 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue this.admin = admin; this.consumer = new KafkaConsumer<>(props); this.decoder = decoder; + this.readFromBeginning = readFromBeginning; + this.stopWhenRead = stopWhenRead; } @Override protected void doSubscribe(List topicNames) { if (!topicNames.isEmpty()) { topicNames.forEach(admin::createTopicIfNotExists); - consumer.subscribe(topicNames); + if (readFromBeginning || stopWhenRead) { + consumer.subscribe(topicNames, new ConsumerRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection partitions) {} + + @Override + public void onPartitionsAssigned(Collection partitions) { + log.debug("Handling onPartitionsAssigned {}", partitions); + if (readFromBeginning) { + consumer.seekToBeginning(partitions); + } + if (stopWhenRead) { + endOffsets = consumer.endOffsets(partitions).entrySet().stream() + .filter(entry -> entry.getValue() > 0) + .collect(Collectors.toMap(entry -> entry.getKey().partition(), Map.Entry::getValue)); + } + } + }); + } else { + consumer.subscribe(topicNames); + } } else { log.info("unsubscribe due to empty topic list"); consumer.unsubscribe(); @@ -92,13 +124,32 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue stopWatch.stop(); log.trace("poll topic {} took {}ms", getTopic(), stopWatch.getTotalTimeMillis()); + List> recordList; if (records.isEmpty()) { - return Collections.emptyList(); + recordList = Collections.emptyList(); } else { - List> recordList = new ArrayList<>(256); - records.forEach(recordList::add); - return recordList; + recordList = new ArrayList<>(256); + records.forEach(record -> { + recordList.add(record); + if (stopWhenRead && endOffsets != null) { + int partition = record.partition(); + Long endOffset = endOffsets.get(partition); + if (endOffset == null) { + log.warn("End offset not found for {} [{}]", record.topic(), partition); + return; + } + log.trace("[{}-{}] Got record offset {}, expected end offset: {}", record.topic(), partition, record.offset(), endOffset - 1); + if (record.offset() >= endOffset - 1) { + endOffsets.remove(partition); + } + } + }); } + if (stopWhenRead && endOffsets != null && endOffsets.isEmpty()) { + log.info("Reached end offset for {}, stopping consumer", consumer.assignment()); + stop(); + } + return recordList; } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java index 3c9b85e925..de4fc7049d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java @@ -97,9 +97,12 @@ public class TbKafkaProducerTemplate implements TbQueuePro @Override public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) { + send(tpi, msg.getKey().toString(), msg, callback); + } + + public void send(TopicPartitionInfo tpi, String key, T msg, TbQueueCallback callback) { try { createTopicIfNotExist(tpi); - String key = msg.getKey().toString(); byte[] data = msg.getData(); ProducerRecord record; List
headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList()); @@ -116,7 +119,7 @@ public class TbKafkaProducerTemplate implements TbQueuePro if (callback != null) { callback.onFailure(exception); } else { - log.warn("Producer template failure: {}", exception.getMessage(), exception); + log.warn("Producer template failure", exception); } } }); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java index cdd0add38b..7213cf34b8 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java @@ -54,6 +54,8 @@ public class TbKafkaTopicConfigs { private String housekeeperReprocessingProperties; @Value("${queue.kafka.topic-properties.calculated-field:}") private String calculatedFieldProperties; + @Value("${queue.kafka.topic-properties.calculated-field-state:}") + private String calculatedFieldStateProperties; @Getter private Map coreConfigs; @@ -83,6 +85,8 @@ public class TbKafkaTopicConfigs { private Map edgeEventConfigs; @Getter private Map calculatedFieldConfigs; + @Getter + private Map calculatedFieldStateConfigs; @PostConstruct private void init() { @@ -102,6 +106,7 @@ public class TbKafkaTopicConfigs { edgeConfigs = PropertyUtils.getProps(edgeProperties); edgeEventConfigs = PropertyUtils.getProps(edgeEventProperties); calculatedFieldConfigs = PropertyUtils.getProps(calculatedFieldProperties); + calculatedFieldStateConfigs = PropertyUtils.getProps(calculatedFieldStateProperties); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java index c26e2d15c9..8e36ae4962 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java @@ -23,6 +23,7 @@ import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateMsgProto; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; @@ -159,12 +160,12 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE } @Override - public TbQueueConsumer> createCalculatedFieldStateConsumer() { + public TbQueueConsumer> createCalculatedFieldStateConsumer() { return new InMemoryTbQueueConsumer<>(storage, topicService.buildTopicName(calculatedFieldSettings.getStateTopic())); } @Override - public TbQueueProducer> createCalculatedFieldStateProducer() { + public TbQueueProducer> createCalculatedFieldStateProducer() { return new InMemoryTbQueueProducer<>(storage, topicService.buildTopicName(calculatedFieldSettings.getStateTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index 01e174023c..99a50b2f5a 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -25,7 +25,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; -import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; @@ -100,6 +100,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi private final TbQueueAdmin edgeAdmin; private final TbQueueAdmin edgeEventAdmin; private final TbQueueAdmin cfAdmin; + private final TbQueueAdmin cfStateAdmin; private final AtomicLong consumerCount = new AtomicLong(); @@ -142,6 +143,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi this.edgeAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs()); this.edgeEventAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs()); this.cfAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCalculatedFieldConfigs()); + this.cfStateAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCalculatedFieldStateConfigs()); } @Override @@ -546,26 +548,28 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi } @Override - public TbQueueConsumer> createCalculatedFieldStateConsumer() { - TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); - consumerBuilder.settings(kafkaSettings); - consumerBuilder.topic(topicService.buildTopicName(calculatedFieldSettings.getStateTopic())); - consumerBuilder.clientId("monolith-calculated-field-state-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); - consumerBuilder.groupId(topicService.buildTopicName("monolith-calculated-field-state-consumer")); - consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), CalculatedFieldStateProto.parseFrom(msg.getData()), msg.getHeaders())); - consumerBuilder.admin(cfAdmin); - consumerBuilder.statsService(consumerStatsService); - return consumerBuilder.build(); + public TbQueueConsumer> createCalculatedFieldStateConsumer() { + return TbKafkaConsumerTemplate.>builder() + .settings(kafkaSettings) + .topic(topicService.buildTopicName(calculatedFieldSettings.getStateTopic())) + .readFromBeginning(true) + .stopWhenRead(true) + .clientId("monolith-calculated-field-state-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()) + .groupId(topicService.buildTopicName("monolith-calculated-field-state-consumer")) + .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), CalculatedFieldStateMsgProto.parseFrom(msg.getData()), msg.getHeaders())) + .admin(cfStateAdmin) + .statsService(consumerStatsService) + .build(); } @Override - public TbQueueProducer> createCalculatedFieldStateProducer() { - TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); - requestBuilder.settings(kafkaSettings); - requestBuilder.clientId("monolith-calculated-field-state-" + serviceInfoProvider.getServiceId()); - requestBuilder.defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getStateTopic())); - requestBuilder.admin(cfAdmin); - return requestBuilder.build(); + public TbQueueProducer> createCalculatedFieldStateProducer() { + return TbKafkaProducerTemplate.>builder() + .settings(kafkaSettings) + .clientId("monolith-calculated-field-state-" + serviceInfoProvider.getServiceId()) + .defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic())) + .admin(cfStateAdmin) + .build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index 46b35f9acd..4fbf5c3e45 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -23,7 +23,7 @@ import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; -import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; @@ -87,6 +87,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { private final TbQueueAdmin edgeAdmin; private final TbQueueAdmin edgeEventAdmin; private final TbQueueAdmin cfAdmin; + private final TbQueueAdmin cfStateAdmin; private final AtomicLong consumerCount = new AtomicLong(); public KafkaTbRuleEngineQueueFactory(TopicService topicService, TbKafkaSettings kafkaSettings, @@ -119,6 +120,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { this.edgeAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs()); this.edgeEventAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs()); this.cfAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCalculatedFieldConfigs()); + this.cfStateAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCalculatedFieldStateConfigs()); } @Override @@ -338,26 +340,28 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { } @Override - public TbQueueConsumer> createCalculatedFieldStateConsumer() { - TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); - consumerBuilder.settings(kafkaSettings); - consumerBuilder.topic(topicService.buildTopicName(calculatedFieldSettings.getStateTopic())); - consumerBuilder.clientId("tb-rule-engine-calculated-field-state-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); - consumerBuilder.groupId(topicService.buildTopicName("tb-rule-engine-calculated-field-state-consumer")); - consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), CalculatedFieldStateProto.parseFrom(msg.getData()), msg.getHeaders())); - consumerBuilder.admin(cfAdmin); - consumerBuilder.statsService(consumerStatsService); - return consumerBuilder.build(); + public TbQueueConsumer> createCalculatedFieldStateConsumer() { + return TbKafkaConsumerTemplate.>builder() + .settings(kafkaSettings) + .topic(topicService.buildTopicName(calculatedFieldSettings.getStateTopic())) + .readFromBeginning(true) + .stopWhenRead(true) + .clientId("tb-rule-engine-calculated-field-state-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()) + .groupId(topicService.buildTopicName("tb-rule-engine-calculated-field-state-consumer")) + .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), CalculatedFieldStateMsgProto.parseFrom(msg.getData()), msg.getHeaders())) + .admin(cfStateAdmin) + .statsService(consumerStatsService) + .build(); } @Override - public TbQueueProducer> createCalculatedFieldStateProducer() { - TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); - requestBuilder.settings(kafkaSettings); - requestBuilder.clientId("tb-rule-engine-to-calculated-field-state-" + serviceInfoProvider.getServiceId()); - requestBuilder.defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic())); - requestBuilder.admin(cfAdmin); - return requestBuilder.build(); + public TbQueueProducer> createCalculatedFieldStateProducer() { + return TbKafkaProducerTemplate.>builder() + .settings(kafkaSettings) + .clientId("tb-rule-engine-to-calculated-field-state-" + serviceInfoProvider.getServiceId()) + .defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic())) + .admin(cfStateAdmin) + .build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java index d3c1d09399..5f0af72955 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java @@ -17,7 +17,7 @@ package org.thingsboard.server.queue.provider; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.gen.js.JsInvokeProtos; -import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; @@ -126,8 +126,8 @@ public interface TbRuleEngineQueueFactory extends TbUsageStatsClientQueueFactory TbQueueConsumer> createToCalculatedFieldNotificationsMsgConsumer(); - TbQueueConsumer> createCalculatedFieldStateConsumer(); + TbQueueConsumer> createCalculatedFieldStateConsumer(); - TbQueueProducer> createCalculatedFieldStateProducer(); + TbQueueProducer> createCalculatedFieldStateProducer(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/util/AfterStartUp.java b/common/queue/src/main/java/org/thingsboard/server/queue/util/AfterStartUp.java index dea6d9ed9e..a24f0663b5 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/util/AfterStartUp.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/util/AfterStartUp.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2024 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -40,7 +40,6 @@ public @interface AfterStartUp { int CF_READ_PROFILE_ENTITIES_SERVICE = 10; int CF_READ_CF_SERVICE = 11; - int CF_STATE_RESTORE_SERVICE = 12; int BEFORE_TRANSPORT_SERVICE = Integer.MAX_VALUE - 1001; int TRANSPORT_SERVICE = Integer.MAX_VALUE - 1000;