CF states restore from Kafka

This commit is contained in:
ViacheslavKlimov 2025-02-13 13:01:17 +02:00
parent 722c707591
commit 677385e8ba
35 changed files with 683 additions and 409 deletions

View File

@ -96,8 +96,13 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
}
public void process(CalculatedFieldStateRestoreMsg msg) {
log.info("[{}] [{}] Processing CF state restore msg.", msg.getId().entityId(), msg.getId().cfId());
states.put(msg.getId().cfId(), msg.getState());
CalculatedFieldId cfId = msg.getId().cfId();
log.info("[{}] [{}] Processing CF state restore msg.", msg.getId().entityId(), cfId);
if (msg.getState() != null) {
states.put(cfId, msg.getState());
} else {
states.remove(cfId);
}
}
public void process(EntityInitCalculatedFieldMsg msg) {

View File

@ -0,0 +1,173 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.cf;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldStateRestoreMsg;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.BasicKvEntry;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.util.KvProtoUtil;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.ScriptCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.SimpleCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.TsRollingArgumentEntry;
import java.util.Optional;
import java.util.TreeMap;
import java.util.UUID;
public abstract class AbstractCalculatedFieldStateService implements CalculatedFieldStateService {
@Autowired
private ActorSystemContext actorSystemContext;
@Override
public final void persistState(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) {
CalculatedFieldStateMsgProto stateMsg = toProto(stateId, state);
long maxStateSizeInKBytes = ctx.getMaxStateSizeInKBytes();
if (maxStateSizeInKBytes <= 0 || stateMsg.getSerializedSize() <= maxStateSizeInKBytes) {
doPersist(stateId, stateMsg, callback);
}
}
protected abstract void doPersist(CalculatedFieldEntityCtxId stateId, CalculatedFieldStateMsgProto stateMsgProto, TbCallback callback);
@Override
public final void removeState(CalculatedFieldEntityCtxId stateId, TbCallback callback) {
doRemove(stateId, callback);
}
protected abstract void doRemove(CalculatedFieldEntityCtxId stateId, TbCallback callback);
protected void processRestoredState(CalculatedFieldStateMsgProto stateMsg) {
CalculatedFieldEntityCtxId stateId = fromProto(stateMsg.getId());
CalculatedFieldState state = stateMsg.hasState() ? fromProto(stateMsg.getState()) : null;
actorSystemContext.tell(new CalculatedFieldStateRestoreMsg(stateId, state));
}
protected CalculatedFieldEntityCtxIdProto toProto(CalculatedFieldEntityCtxId ctxId) {
return CalculatedFieldEntityCtxIdProto.newBuilder()
.setTenantIdMSB(ctxId.tenantId().getId().getMostSignificantBits())
.setTenantIdLSB(ctxId.tenantId().getId().getLeastSignificantBits())
.setCalculatedFieldIdMSB(ctxId.cfId().getId().getMostSignificantBits())
.setCalculatedFieldIdLSB(ctxId.cfId().getId().getLeastSignificantBits())
.setEntityType(ctxId.entityId().getEntityType().name())
.setEntityIdMSB(ctxId.entityId().getId().getMostSignificantBits())
.setEntityIdLSB(ctxId.entityId().getId().getLeastSignificantBits())
.build();
}
protected CalculatedFieldEntityCtxId fromProto(CalculatedFieldEntityCtxIdProto ctxIdProto) {
TenantId tenantId = TenantId.fromUUID(new UUID(ctxIdProto.getTenantIdMSB(), ctxIdProto.getTenantIdLSB()));
EntityId entityId = EntityIdFactory.getByTypeAndUuid(ctxIdProto.getEntityType(), new UUID(ctxIdProto.getEntityIdMSB(), ctxIdProto.getEntityIdLSB()));
CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(ctxIdProto.getCalculatedFieldIdMSB(), ctxIdProto.getCalculatedFieldIdLSB()));
return new CalculatedFieldEntityCtxId(tenantId, calculatedFieldId, entityId);
}
protected CalculatedFieldStateMsgProto toProto(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state) {
var stateProto = CalculatedFieldStateProto.newBuilder()
.setType(state.getType().name());
state.getArguments().forEach((argName, argEntry) -> {
if (argEntry instanceof SingleValueArgumentEntry singleValueArgumentEntry) {
stateProto.addSingleValueArguments(toSingleValueArgumentProto(argName, singleValueArgumentEntry));
} else if (argEntry instanceof TsRollingArgumentEntry rollingArgumentEntry) {
stateProto.addRollingValueArguments(toRollingArgumentProto(argName, rollingArgumentEntry));
}
});
return CalculatedFieldStateMsgProto.newBuilder()
.setId(toProto(stateId))
.setState(stateProto)
.build();
}
protected TransportProtos.SingleValueArgumentProto toSingleValueArgumentProto(String argName, SingleValueArgumentEntry entry) {
TransportProtos.SingleValueArgumentProto.Builder builder = TransportProtos.SingleValueArgumentProto.newBuilder()
.setArgName(argName);
if (entry != SingleValueArgumentEntry.EMPTY) {
builder.setValue(KvProtoUtil.toTsValueProto(entry.getTs(), entry.getKvEntryValue()));
}
Optional.ofNullable(entry.getVersion()).ifPresent(builder::setVersion);
return builder.build();
}
protected TransportProtos.TsValueListProto toRollingArgumentProto(String argName, TsRollingArgumentEntry entry) {
TransportProtos.TsValueListProto.Builder builder = TransportProtos.TsValueListProto.newBuilder().setKey(argName);
if (entry != TsRollingArgumentEntry.EMPTY) {
entry.getTsRecords().forEach((ts, value) -> builder.addTsValue(KvProtoUtil.toTsValueProto(ts, value)));
}
return builder.build();
}
protected CalculatedFieldState fromProto(CalculatedFieldStateProto proto) {
if (StringUtils.isEmpty(proto.getType())) {
return null;
}
CalculatedFieldType type = CalculatedFieldType.valueOf(proto.getType());
CalculatedFieldState state = switch (type) {
case SIMPLE -> new SimpleCalculatedFieldState();
case SCRIPT -> new ScriptCalculatedFieldState();
};
proto.getSingleValueArgumentsList().forEach(argProto ->
state.getArguments().put(argProto.getArgName(), fromSingleValueArgumentProto(argProto)));
if (CalculatedFieldType.SCRIPT.equals(type)) {
proto.getRollingValueArgumentsList().forEach(argProto ->
state.getArguments().put(argProto.getKey(), fromRollingArgumentProto(argProto)));
}
return state;
}
protected SingleValueArgumentEntry fromSingleValueArgumentProto(TransportProtos.SingleValueArgumentProto proto) {
if (!proto.hasValue()) {
return (SingleValueArgumentEntry) SingleValueArgumentEntry.EMPTY;
}
TransportProtos.TsValueProto tsValueProto = proto.getValue();
long ts = tsValueProto.getTs();
BasicKvEntry kvEntry = (BasicKvEntry) KvProtoUtil.fromTsValueProto(proto.getArgName(), tsValueProto);
return new SingleValueArgumentEntry(ts, kvEntry, proto.getVersion());
}
protected TsRollingArgumentEntry fromRollingArgumentProto(TransportProtos.TsValueListProto proto) {
if (proto.getTsValueCount() <= 0) {
return (TsRollingArgumentEntry) TsRollingArgumentEntry.EMPTY;
}
TreeMap<Long, BasicKvEntry> tsRecords = new TreeMap<>();
proto.getTsValueList().forEach(tsValueProto -> {
BasicKvEntry kvEntry = (BasicKvEntry) KvProtoUtil.fromTsValueProto(proto.getKey(), tsValueProto);
tsRecords.put(tsValueProto.getTs(), kvEntry);
});
return new TsRollingArgumentEntry(tsRecords);
}
}

View File

@ -16,14 +16,19 @@
package org.thingsboard.server.service.cf;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
import java.util.Set;
public interface CalculatedFieldStateService {
void persistState(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback);
void removeState(CalculatedFieldEntityCtxId stateId, TbCallback callback);
void restore(Set<TopicPartitionInfo> partitions);
}

View File

@ -13,40 +13,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.utils;
package org.thingsboard.server.service.cf;
import jakarta.annotation.PreDestroy;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.thingsboard.server.utils.TbRocksDb;
@Component
public class RocksDBConfig {
public class CfRocksDb extends TbRocksDb {
@Value("${rocksdb.db_path:${java.io.tmpdir}/rocksdb}")
private String dbPath;
private RocksDB db;
static {
RocksDB.loadLibrary();
}
public RocksDB getDb() throws RocksDBException {
if (db == null) {
Options options = new Options().setCreateIfMissing(true);
db = RocksDB.open(options, dbPath);
}
return db;
public CfRocksDb(@Value("${queue.calculated_fields.rocks_db_path:${user.home}/.rocksdb/cf_states}") String path) throws Exception {
super(path, new Options().setCreateIfMissing(true), new WriteOptions().setSync(true));
}
@PreDestroy
@Override
public void close() {
if (db != null) {
db.close();
db = null;
}
super.close();
}
}

View File

@ -70,6 +70,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNot
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
@ -91,7 +92,6 @@ import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.DataConstants.SCOPE;
import static org.thingsboard.server.queue.discovery.HashPartitionService.CALCULATED_FIELD_QUEUE_KEY;
@TbRuleEngineComponent
@Service
@ -188,7 +188,7 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP
if (broadcast) {
broadcasts.add(link);
} else {
TopicPartitionInfo tpi = partitionService.resolve(CALCULATED_FIELD_QUEUE_KEY, link.entityId());
TopicPartitionInfo tpi = partitionService.resolve(QueueKey.CF, link.entityId());
unicasts.computeIfAbsent(tpi, k -> new ArrayList<>()).add(link);
}
}

View File

@ -1,97 +0,0 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.cf;
import lombok.extern.slf4j.Slf4j;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteOptions;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
import org.thingsboard.server.utils.RocksDBConfig;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
@Service
@Slf4j
@ConditionalOnProperty(prefix = "zk", value = "enabled", havingValue = "false", matchIfMissing = true)
public class RocksDBService {
private final RocksDB db;
private final WriteOptions writeOptions;
public RocksDBService(RocksDBConfig config) throws RocksDBException {
this.db = config.getDb();
this.writeOptions = new WriteOptions().setSync(true);
}
public void put(String key, String value) {
try {
db.put(writeOptions, key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
} catch (RocksDBException e) {
log.error("Failed to store data to RocksDB", e);
}
}
public void put(CalculatedFieldEntityCtxIdProto key, CalculatedFieldStateProto value) {
try {
db.put(writeOptions, key.toByteArray(), value.toByteArray());
} catch (RocksDBException e) {
log.error("Failed to store data to RocksDB", e);
}
}
public void delete(CalculatedFieldEntityCtxIdProto key) {
try {
db.delete(writeOptions, key.toByteArray());
} catch (RocksDBException e) {
log.error("Failed to delete data from RocksDB", e);
}
}
public String get(String key) {
try {
byte[] value = db.get(key.getBytes(StandardCharsets.UTF_8));
return value != null ? new String(value, StandardCharsets.UTF_8) : null;
} catch (RocksDBException e) {
log.error("Failed to retrieve data from RocksDB", e);
return null;
}
}
public Map<CalculatedFieldEntityCtxIdProto, CalculatedFieldStateProto> getAll() {
Map<CalculatedFieldEntityCtxIdProto, CalculatedFieldStateProto> results = new HashMap<>();
try (RocksIterator iterator = db.newIterator()) {
for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
try {
CalculatedFieldEntityCtxIdProto key = CalculatedFieldEntityCtxIdProto.parseFrom(iterator.key());
CalculatedFieldStateProto value = CalculatedFieldStateProto.parseFrom(iterator.value());
results.put(key, value);
} catch (Exception e) {
log.error("Failed to retrieve data from RocksDB", e);
}
}
}
return results;
}
}

View File

@ -21,8 +21,8 @@ import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.discovery.HashPartitionService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
@ -30,7 +30,6 @@ import org.thingsboard.server.queue.util.TbRuleEngineComponent;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
@ -49,7 +48,7 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent
@Override
protected void onTbApplicationEvent(PartitionChangeEvent event) {
myPartitions = event.getCalculatedFieldsPartitions().stream()
myPartitions = event.getCfPartitions().stream()
.filter(TopicPartitionInfo::isMyPartition)
.map(tpi -> tpi.getPartition().orElse(UNKNOWN)).collect(Collectors.toList());
//Naive approach that need to be improved.
@ -58,7 +57,7 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent
@Override
public void add(TenantId tenantId, EntityId profileId, EntityId entityId) {
var tpi = partitionService.resolve(HashPartitionService.CALCULATED_FIELD_QUEUE_KEY, entityId);
var tpi = partitionService.resolve(QueueKey.CF, entityId);
var partition = tpi.getPartition().orElse(UNKNOWN);
tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache())
.add(profileId, entityId, partition, tpi.isMyPartition());
@ -66,7 +65,7 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent
@Override
public void update(TenantId tenantId, EntityId oldProfileId, EntityId newProfileId, EntityId entityId) {
var tpi = partitionService.resolve(HashPartitionService.CALCULATED_FIELD_QUEUE_KEY, entityId);
var tpi = partitionService.resolve(QueueKey.CF, entityId);
var partition = tpi.getPartition().orElse(UNKNOWN);
var cache = tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache());
//TODO: make this method atomic;
@ -87,7 +86,7 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent
@Override
public int getEntityIdPartition(TenantId tenantId, EntityId entityId) {
var tpi = partitionService.resolve(HashPartitionService.CALCULATED_FIELD_QUEUE_KEY, entityId);
var tpi = partitionService.resolve(QueueKey.CF, entityId);
return tpi.getPartition().orElse(UNKNOWN);
}

View File

@ -20,4 +20,9 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
public record CalculatedFieldEntityCtxId(TenantId tenantId, CalculatedFieldId cfId, EntityId entityId) {
public String toKey() {
return cfId + "_" + entityId;
}
}

View File

@ -15,31 +15,140 @@
*/
package org.thingsboard.server.service.cf.ctx.state;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateMsgProto;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory;
import org.thingsboard.server.service.cf.AbstractCalculatedFieldStateService;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.CalculatedFieldStateService;
import org.thingsboard.server.service.queue.DefaultTbCalculatedFieldConsumerService.CalculatedFieldQueueConfig;
import org.thingsboard.server.service.queue.consumer.MainQueueConsumerManager;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@Service
@RequiredArgsConstructor
@ConditionalOnExpression("'${zk.enabled:false}'=='true' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-rule-engine')")
public class KafkaCalculatedFieldStateService implements CalculatedFieldStateService {
@Slf4j
@ConditionalOnExpression("'${queue.type:null}'=='kafka'")
public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldStateService {
@AfterStartUp(order = AfterStartUp.CF_STATE_RESTORE_SERVICE)
public void initCalculatedFieldStates() {
private final TbRuleEngineQueueFactory queueFactory;
private final PartitionService partitionService;
@Value("${queue.calculated_fields.poll_interval:25}")
private long pollInterval;
@Value("${queue.calculated_fields.consumer_per_partition:true}")
private boolean consumerPerPartition;
private MainQueueConsumerManager<TbProtoQueueMsg<CalculatedFieldStateMsgProto>, CalculatedFieldQueueConfig> stateConsumer;
private TbKafkaProducerTemplate<TbProtoQueueMsg<CalculatedFieldStateMsgProto>> stateProducer;
protected ExecutorService consumersExecutor;
protected ExecutorService mgmtExecutor;
protected ScheduledExecutorService scheduler;
private final AtomicInteger counter = new AtomicInteger();
@PostConstruct
private void init() {
this.consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("cf-state-consumer"));
this.mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(Math.max(Runtime.getRuntime().availableProcessors(), 4), "cf-state-mgmt");
this.scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("cf-state-consumer-scheduler");
this.stateConsumer = MainQueueConsumerManager.<TbProtoQueueMsg<CalculatedFieldStateMsgProto>, CalculatedFieldQueueConfig>builder()
.queueKey(QueueKey.CF_STATES)
.config(CalculatedFieldQueueConfig.of(consumerPerPartition, (int) pollInterval))
.msgPackProcessor((msgs, consumer, config) -> {
for (TbProtoQueueMsg<CalculatedFieldStateMsgProto> msg : msgs) {
try {
processRestoredState(msg.getValue());
} catch (Throwable t) {
log.error("Failed to process state message: {}", msg, t);
}
int processedMsgCount = counter.incrementAndGet();
if (processedMsgCount % 10000 == 0) {
log.info("Processed {} calculated field state msgs", processedMsgCount);
}
}
})
.consumerCreator((config, partitionId) -> queueFactory.createCalculatedFieldStateConsumer())
.consumerExecutor(consumersExecutor)
.scheduler(scheduler)
.taskExecutor(mgmtExecutor)
.build();
this.stateProducer = (TbKafkaProducerTemplate<TbProtoQueueMsg<CalculatedFieldStateMsgProto>>) queueFactory.createCalculatedFieldStateProducer();
}
@Override
public void persistState(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) {
callback.onSuccess();
protected void doPersist(CalculatedFieldEntityCtxId stateId, CalculatedFieldStateMsgProto stateMsgProto, TbCallback callback) {
TopicPartitionInfo tpi = partitionService.resolve(QueueKey.CF_STATES, stateId.entityId());
stateProducer.send(tpi, stateId.toKey(), new TbProtoQueueMsg<>(stateId.entityId().getId(), stateMsgProto), new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
if (callback != null) {
callback.onSuccess();
}
}
@Override
public void onFailure(Throwable t) {
if (callback != null) {
callback.onFailure(t);
}
}
});
}
@Override
public void removeState(CalculatedFieldEntityCtxId ctxId, TbCallback callback) {
callback.onSuccess();
protected void doRemove(CalculatedFieldEntityCtxId stateId, TbCallback callback) {
doPersist(stateId, CalculatedFieldStateMsgProto.newBuilder()
.setId(toProto(stateId))
.build(), callback);
}
@Override
public void restore(Set<TopicPartitionInfo> partitions) {
partitions = partitions.stream().map(tpi -> tpi.newByTopic(partitionService.getTopic(QueueKey.CF_STATES))).collect(Collectors.toSet());
log.info("Restoring calculated field states for partitions: {}", partitions.stream().map(TopicPartitionInfo::getFullTopicName).toList());
long startTs = System.currentTimeMillis();
counter.set(0);
stateConsumer.doUpdate(partitions); // calling blocking doUpdate instead of update
stateConsumer.awaitStop(0);// consumers should stop on their own because stopWhenRead is true, we just need to wait
log.info("Restored {} calculated field states in {} ms", counter.get(), System.currentTimeMillis() - startTs);
}
@PreDestroy
private void preDestroy() {
stateConsumer.stop();
stateConsumer.awaitStop();
stateProducer.stop();
consumersExecutor.shutdownNow();
mgmtExecutor.shutdownNow();
scheduler.shutdownNow();
}
}

View File

@ -15,174 +15,57 @@
*/
package org.thingsboard.server.service.cf.ctx.state;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldStateRestoreMsg;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.BasicKvEntry;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.util.KvProtoUtil;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
import org.thingsboard.server.gen.transport.TransportProtos.SingleValueArgumentProto;
import org.thingsboard.server.gen.transport.TransportProtos.TsValueListProto;
import org.thingsboard.server.gen.transport.TransportProtos.TsValueProto;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.service.cf.RocksDBService;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateMsgProto;
import org.thingsboard.server.service.cf.AbstractCalculatedFieldStateService;
import org.thingsboard.server.service.cf.CfRocksDb;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.CalculatedFieldStateService;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.Set;
@Service
@RequiredArgsConstructor
@ConditionalOnProperty(prefix = "zk", value = "enabled", havingValue = "false", matchIfMissing = true) // Queue type in mem or Kafka;
public class RocksDBCalculatedFieldStateService implements CalculatedFieldStateService {
@Slf4j
@ConditionalOnExpression("'${queue.type:null}'=='in-memory'")
public class RocksDBCalculatedFieldStateService extends AbstractCalculatedFieldStateService {
private final ActorSystemContext actorSystemContext;
private final RocksDBService rocksDBService;
private final CfRocksDb cfRocksDb;
public Map<CalculatedFieldEntityCtxId, CalculatedFieldState> restoreStates() {
return rocksDBService.getAll().entrySet().stream()
.collect(Collectors.toMap(
entry -> fromProto(entry.getKey()),
entry -> fromProto(entry.getValue())
));
}
private Set<TopicPartitionInfo> partitions;
@AfterStartUp(order = AfterStartUp.CF_STATE_RESTORE_SERVICE)
public void initCalculatedFieldStates() {
restoreStates().forEach((k, v) -> actorSystemContext.tell(new CalculatedFieldStateRestoreMsg(k, v)));
@Override
protected void doPersist(CalculatedFieldEntityCtxId stateId, CalculatedFieldStateMsgProto stateMsgProto, TbCallback callback) {
cfRocksDb.put(stateId.toKey(), stateMsgProto.toByteArray());
callback.onSuccess();
}
@Override
public void persistState(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) {
CalculatedFieldStateProto stateProto = toProto(stateId, state);
long maxStateSizeInKBytes = ctx.getMaxStateSizeInKBytes();
if (maxStateSizeInKBytes <= 0 || stateProto.getSerializedSize() <= maxStateSizeInKBytes) {
rocksDBService.put(toProto(stateId), stateProto);
protected void doRemove(CalculatedFieldEntityCtxId stateId, TbCallback callback) {
cfRocksDb.delete(stateId.toKey());
callback.onSuccess();
}
@Override
public void restore(Set<TopicPartitionInfo> partitions) {
if (this.partitions == null) {
this.partitions = partitions;
} else {
return;
}
callback.onSuccess();
}
@Override
public void removeState(CalculatedFieldEntityCtxId ctxId, TbCallback callback) {
rocksDBService.delete(toProto(ctxId));
callback.onSuccess();
}
private CalculatedFieldEntityCtxIdProto toProto(CalculatedFieldEntityCtxId ctxId) {
return CalculatedFieldEntityCtxIdProto.newBuilder()
.setTenantIdMSB(ctxId.tenantId().getId().getMostSignificantBits())
.setTenantIdLSB(ctxId.tenantId().getId().getLeastSignificantBits())
.setCalculatedFieldIdMSB(ctxId.cfId().getId().getMostSignificantBits())
.setCalculatedFieldIdLSB(ctxId.cfId().getId().getLeastSignificantBits())
.setEntityType(ctxId.entityId().getEntityType().name())
.setEntityIdMSB(ctxId.entityId().getId().getMostSignificantBits())
.setEntityIdLSB(ctxId.entityId().getId().getLeastSignificantBits())
.build();
}
private CalculatedFieldEntityCtxId fromProto(CalculatedFieldEntityCtxIdProto ctxIdProto) {
TenantId tenantId = TenantId.fromUUID(new UUID(ctxIdProto.getTenantIdMSB(), ctxIdProto.getTenantIdLSB()));
EntityId entityId = EntityIdFactory.getByTypeAndUuid(ctxIdProto.getEntityType(), new UUID(ctxIdProto.getEntityIdMSB(), ctxIdProto.getEntityIdLSB()));
CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(ctxIdProto.getCalculatedFieldIdMSB(), ctxIdProto.getCalculatedFieldIdLSB()));
return new CalculatedFieldEntityCtxId(tenantId, calculatedFieldId, entityId);
}
private CalculatedFieldStateProto toProto(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state) {
CalculatedFieldStateProto.Builder builder = CalculatedFieldStateProto.newBuilder()
.setId(toProto(stateId))
.setType(state.getType().name());
state.getArguments().forEach((argName, argEntry) -> {
if (argEntry instanceof SingleValueArgumentEntry singleValueArgumentEntry) {
builder.addSingleValueArguments(toSingleValueArgumentProto(argName, singleValueArgumentEntry));
} else if (argEntry instanceof TsRollingArgumentEntry rollingArgumentEntry) {
builder.addRollingValueArguments(toRollingArgumentProto(argName, rollingArgumentEntry));
cfRocksDb.forEach((key, value) -> {
try {
processRestoredState(CalculatedFieldStateMsgProto.parseFrom(value));
} catch (InvalidProtocolBufferException e) {
log.error("[{}] Failed to process restored state", key, e);
}
});
return builder.build();
}
private SingleValueArgumentProto toSingleValueArgumentProto(String argName, SingleValueArgumentEntry entry) {
SingleValueArgumentProto.Builder builder = SingleValueArgumentProto.newBuilder()
.setArgName(argName);
if (entry != SingleValueArgumentEntry.EMPTY) {
builder.setValue(KvProtoUtil.toTsValueProto(entry.getTs(), entry.getKvEntryValue()));
}
Optional.ofNullable(entry.getVersion()).ifPresent(builder::setVersion);
return builder.build();
}
private TsValueListProto toRollingArgumentProto(String argName, TsRollingArgumentEntry entry) {
TsValueListProto.Builder builder = TsValueListProto.newBuilder().setKey(argName);
if (entry != TsRollingArgumentEntry.EMPTY) {
entry.getTsRecords().forEach((ts, value) -> builder.addTsValue(KvProtoUtil.toTsValueProto(ts, value)));
}
return builder.build();
}
private CalculatedFieldState fromProto(CalculatedFieldStateProto proto) {
if (StringUtils.isEmpty(proto.getType())) {
return null;
}
CalculatedFieldType type = CalculatedFieldType.valueOf(proto.getType());
CalculatedFieldState state = switch (type) {
case SIMPLE -> new SimpleCalculatedFieldState();
case SCRIPT -> new ScriptCalculatedFieldState();
};
proto.getSingleValueArgumentsList().forEach(argProto ->
state.getArguments().put(argProto.getArgName(), fromSingleValueArgumentProto(argProto)));
if (CalculatedFieldType.SCRIPT.equals(type)) {
proto.getRollingValueArgumentsList().forEach(argProto ->
state.getArguments().put(argProto.getKey(), fromRollingArgumentProto(argProto)));
}
return state;
}
private SingleValueArgumentEntry fromSingleValueArgumentProto(SingleValueArgumentProto proto) {
if (!proto.hasValue()) {
return (SingleValueArgumentEntry) SingleValueArgumentEntry.EMPTY;
}
TsValueProto tsValueProto = proto.getValue();
long ts = tsValueProto.getTs();
BasicKvEntry kvEntry = (BasicKvEntry) KvProtoUtil.fromTsValueProto(proto.getArgName(), tsValueProto);
return new SingleValueArgumentEntry(ts, kvEntry, proto.getVersion());
}
private TsRollingArgumentEntry fromRollingArgumentProto(TsValueListProto proto) {
if (proto.getTsValueCount() <= 0) {
return (TsRollingArgumentEntry) TsRollingArgumentEntry.EMPTY;
}
TreeMap<Long, BasicKvEntry> tsRecords = new TreeMap<>();
proto.getTsValueList().forEach(tsValueProto -> {
BasicKvEntry kvEntry = (BasicKvEntry) KvProtoUtil.fromTsValueProto(proto.getKey(), tsValueProto);
tsRecords.put(tsValueProto.getTs(), kvEntry);
});
return new TsRollingArgumentEntry(tsRecords);
}
}

View File

@ -21,11 +21,11 @@ import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldLinkedTelemetryMsg;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg;
@ -52,6 +52,7 @@ import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory;
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.cf.CalculatedFieldCache;
import org.thingsboard.server.service.cf.CalculatedFieldStateService;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.queue.consumer.MainQueueConsumerManager;
@ -65,6 +66,8 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -86,10 +89,12 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
private int poolSize;
private final TbRuleEngineQueueFactory queueFactory;
private final CalculatedFieldStateService stateService;
private MainQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>, CalculatedFieldQueueConfig> mainConsumer;
private volatile ListeningExecutorService calculatedFieldsExecutor;
private ListeningExecutorService calculatedFieldsExecutor;
private ExecutorService repartitionExecutor;
public DefaultTbCalculatedFieldConsumerService(TbRuleEngineQueueFactory tbQueueFactory,
ActorSystemContext actorContext,
@ -100,19 +105,22 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
PartitionService partitionService,
ApplicationEventPublisher eventPublisher,
JwtSettingsService jwtSettingsService,
CalculatedFieldCache calculatedFieldCache) {
CalculatedFieldCache calculatedFieldCache,
CalculatedFieldStateService stateService) {
super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService,
eventPublisher, jwtSettingsService);
this.queueFactory = tbQueueFactory;
this.stateService = stateService;
}
@PostConstruct
public void init() {
super.init("tb-cf");
this.calculatedFieldsExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(poolSize, "tb-cf-executor")); // TODO: multiple threads.
this.repartitionExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-cf-repartition"));
this.mainConsumer = MainQueueConsumerManager.<TbProtoQueueMsg<ToCalculatedFieldMsg>, CalculatedFieldQueueConfig>builder()
.queueKey(new QueueKey(ServiceType.TB_RULE_ENGINE))
.queueKey(QueueKey.CF)
.config(CalculatedFieldQueueConfig.of(consumerPerPartition, (int) pollInterval))
.msgPackProcessor(this::processMsgs)
.consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer())
@ -137,20 +145,23 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
@Override
protected void onTbApplicationEvent(PartitionChangeEvent event) {
var partitions = event.getCalculatedFieldsPartitions();
log.info("Subscribing to partitions: {}", partitions);
// TODO: @vklimov - before update of the main consumer, we should read the state topics and use
// CalculatedFieldStateService (KafkaCalculatedFieldStateService) to restore the states for entities that belong to new partitions.
// Cleanup entities that do not belong to current partition;
mainConsumer.update(event.getCalculatedFieldsPartitions());
// Cleanup old entities after corresponding consumers are stopped.
// Any periodic tasks need to check that the entity is still managed by the current server before processing.
actorContext.tell(new CalculatedFieldPartitionChangeMsg(partitionsToBooleanIndexArray(partitions)));
var partitions = event.getCfPartitions();
repartitionExecutor.submit(() -> {
try {
stateService.restore(partitions);
mainConsumer.update(partitions);
// Cleanup old entities after corresponding consumers are stopped.
// Any periodic tasks need to check that the entity is still managed by the current server before processing.
actorContext.tell(new CalculatedFieldPartitionChangeMsg(partitionsToBooleanIndexArray(partitions)));
} catch (Throwable t) {
log.error("Failed to process partition change event: {}", event, t);
}
});
}
private boolean[] partitionsToBooleanIndexArray(Set<TopicPartitionInfo> partitions) {
boolean[] myPartitions = new boolean[partitionService.getTotalCalculatedFieldPartitions()];
for(var tpi : partitions) {
for (var tpi : partitions) {
tpi.getPartition().ifPresent(partition -> myPartitions[partition] = true);
}
return myPartitions;
@ -193,9 +204,10 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
packSubmitFuture.cancel(true);
log.info("Timeout to process message: {}", pendingMsgHolder.getMsg());
}
if (log.isDebugEnabled()) {
ctx.getAckMap().forEach((id, msg) -> log.debug("[{}] Timeout to process message: {}", id, msg.getValue()));
}
// if (log.isDebugEnabled()) {
// ctx.getAckMap().forEach((id, msg) -> log.debug("[{}] Timeout to process message: {}", id, msg.getValue()));
// }
ctx.getAckMap().forEach((id, msg) -> log.warn("[{}] Timeout to process message: {}", id, msg.getValue())); // TODO: replace with commented above after testing
ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process message: {}", id, msg.getValue()));
}
consumer.commit();

View File

@ -94,6 +94,7 @@ import org.thingsboard.server.queue.common.MultipleTbQueueCallbackWrapper;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.TbRuleEngineProducerService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.service.cf.CalculatedFieldProcessingService;
@ -111,7 +112,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.util.ProtoUtils.toProto;
import static org.thingsboard.server.queue.discovery.HashPartitionService.CALCULATED_FIELD_QUEUE_KEY;
@Service
@Slf4j
@ -358,7 +358,7 @@ public class DefaultTbClusterService implements TbClusterService {
@Override
public void pushMsgToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldMsg msg, TbQueueCallback callback) {
TopicPartitionInfo tpi = partitionService.resolve(CALCULATED_FIELD_QUEUE_KEY, entityId);
TopicPartitionInfo tpi = partitionService.resolve(QueueKey.CF, entityId);
pushMsgToCalculatedFields(tpi, UUID.randomUUID(), msg, callback);
}
@ -371,7 +371,7 @@ public class DefaultTbClusterService implements TbClusterService {
@Override
public void pushNotificationToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldNotificationMsg msg, TbQueueCallback callback) {
TopicPartitionInfo tpi = partitionService.resolve(CALCULATED_FIELD_QUEUE_KEY, entityId);
TopicPartitionInfo tpi = partitionService.resolve(QueueKey.CF, entityId);
producerProvider.getCalculatedFieldsNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), callback);
toRuleEngineNfs.incrementAndGet();
}
@ -792,8 +792,7 @@ public class DefaultTbClusterService implements TbClusterService {
private void pushDeviceUpdateMessage(TenantId tenantId, EdgeId edgeId, EntityId entityId, EdgeEventActionType action) {
log.trace("{} Going to send edge update notification for device actor, device id {}, edge id {}", tenantId, entityId, edgeId);
switch (action) {
case ASSIGNED_TO_EDGE ->
pushMsgToCore(new DeviceEdgeUpdateMsg(tenantId, new DeviceId(entityId.getId()), edgeId), null);
case ASSIGNED_TO_EDGE -> pushMsgToCore(new DeviceEdgeUpdateMsg(tenantId, new DeviceId(entityId.getId()), edgeId), null);
case UNASSIGNED_FROM_EDGE -> {
EdgeId relatedEdgeId = findRelatedEdgeIdIfAny(tenantId, entityId);
pushMsgToCore(new DeviceEdgeUpdateMsg(tenantId, new DeviceId(entityId.getId()), relatedEdgeId), null);

View File

@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.data.rpc.RpcError;
import org.thingsboard.server.common.data.util.CollectionsUtil;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
@ -63,8 +64,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import static org.thingsboard.server.queue.discovery.HashPartitionService.CALCULATED_FIELD_QUEUE_KEY;
@Service
@TbRuleEngineComponent
@Slf4j
@ -109,7 +108,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
@Override
protected void onTbApplicationEvent(PartitionChangeEvent event) {
event.getPartitionsMap().forEach((queueKey, partitions) -> {
if (CALCULATED_FIELD_QUEUE_KEY.equals(queueKey)) {
if (CollectionsUtil.isOneOf(queueKey, QueueKey.CF, QueueKey.CF_STATES)) {
return;
}
if (partitionService.isManagedByCurrentService(queueKey.getTenantId())) {

View File

@ -182,7 +182,7 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
}
}
private void doUpdate(Set<TopicPartitionInfo> partitions) {
public void doUpdate(Set<TopicPartitionInfo> partitions) {
this.partitions = partitions;
consumerWrapper.updatePartitions(partitions);
}
@ -226,7 +226,9 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
}
protected void processMsgs(List<M> msgs, TbQueueConsumer<M> consumer, C config) throws Exception {
log.trace("Processing {} messages", msgs.size());
msgPackProcessor.process(msgs, consumer, config);
log.trace("Processed {} messages", msgs.size());
}
public void stop() {
@ -236,8 +238,12 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
}
public void awaitStop() {
awaitStop(30);
}
public void awaitStop(long timeoutSec) {
log.debug("[{}] Waiting for consumers to stop", queueKey);
consumerWrapper.getConsumers().forEach(TbQueueConsumerTask::awaitCompletion);
consumerWrapper.getConsumers().forEach(consumerTask -> consumerTask.awaitCompletion(timeoutSec));
log.debug("[{}] Unsubscribed and stopped consumers", queueKey);
}

View File

@ -70,13 +70,21 @@ public class TbQueueConsumerTask<M extends TbQueueMsg> {
}
public void awaitCompletion() {
awaitCompletion(30);
}
public void awaitCompletion(long timeoutSec) {
log.trace("[{}] Awaiting finish", key);
if (isRunning()) {
try {
task.get(30, TimeUnit.SECONDS);
if (timeoutSec > 0) {
task.get(timeoutSec, TimeUnit.SECONDS);
} else {
task.get();
}
log.trace("[{}] Awaited finish", key);
} catch (Exception e) {
log.warn("[{}] Failed to await for consumer to stop", key, e);
log.warn("[{}] Failed to await for consumer to stop (timeout {} sec)", key, timeoutSec, e);
}
task = null;
}

View File

@ -106,6 +106,7 @@ public class RestAuthenticationProvider implements AuthenticationProvider {
if (twoFactorAuthService.isTwoFaEnabled(securityUser.getTenantId(), securityUser.getId())) {
return new MfaAuthenticationToken(securityUser);
} else {
systemSecurityService.logLoginAction(securityUser, authentication.getDetails(), ActionType.LOGIN, null);
}
} else {

View File

@ -0,0 +1,71 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.utils;
import lombok.SneakyThrows;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteOptions;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.BiConsumer;
public class TbRocksDb {
protected final String path;
private final WriteOptions writeOptions;
protected final RocksDB db;
static {
RocksDB.loadLibrary();
}
public TbRocksDb(String path, Options dbOptions, WriteOptions writeOptions) throws Exception {
this.path = path;
this.writeOptions = writeOptions;
Files.createDirectories(Path.of(path).getParent());
this.db = RocksDB.open(dbOptions, path);
}
@SneakyThrows
public void put(String key, byte[] value) {
db.put(writeOptions, key.getBytes(StandardCharsets.UTF_8), value);
}
public void forEach(BiConsumer<String, byte[]> processor) {
try (RocksIterator iterator = db.newIterator()) {
for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
String key = new String(iterator.key(), StandardCharsets.UTF_8);
processor.accept(key, iterator.value());
}
}
}
@SneakyThrows
public void delete(String key) {
db.delete(writeOptions, key.getBytes(StandardCharsets.UTF_8));
}
public void close() {
if (db != null) {
db.close();
}
}
}

View File

@ -426,10 +426,6 @@ sql:
pool_size: "${SQL_RELATIONS_POOL_SIZE:4}" # This value has to be reasonably small to prevent the relation query from blocking all other DB calls
query_timeout: "${SQL_RELATIONS_QUERY_TIMEOUT_SEC:20}" # This value has to be reasonably small to prevent the relation query from blocking all other DB calls
rocksdb:
# Rocksdb path
db_path: "${ROCKS_DB_PATH:${java.io.tmpdir}/rocksdb}"
# Actor system parameters
actors:
system:
@ -1629,6 +1625,8 @@ queue:
edge-event: "${TB_QUEUE_KAFKA_EDGE_EVENT_TOPIC_PROPERTIES:retention.ms:2592000000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
# Kafka properties for Calculated Field topics
calculated-field: "${TB_QUEUE_KAFKA_CF_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
# Kafka properties for Calculated Field State topics
calculated-field-state: "${TB_QUEUE_KAFKA_CF_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:104857600000;partitions:1;min.insync.replicas:1;cleanup.policy:compact}"
consumer-stats:
# Prints lag between consumer group offset and last messages offset in Kafka topics
enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"
@ -1763,6 +1761,8 @@ queue:
consumer_per_partition: "${TB_QUEUE_CF_CONSUMER_PER_PARTITION:true}"
# Thread pool size for processing of the incoming messages
pool_size: "${TB_QUEUE_CF_POOL_SIZE:8}"
# RocksDB path for storing CF states
rocks_db_path: "${TB_QUEUE_CF_ROCKS_DB_PATH:${user.home}/.rocksdb/cf_states}"
transport:
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}"

View File

@ -146,5 +146,6 @@ public class DataConstants {
public static final String EDGE_EVENT_QUEUE_NAME = "EdgeEvent";
public static final String CF_QUEUE_NAME = "CalculatedFields";
public static final String CF_STATES_QUEUE_NAME = "CalculatedFieldStates";
}

View File

@ -75,4 +75,16 @@ public class CollectionsUtil {
return isEmpty(collection) || collection.contains(element);
}
public static <V> boolean isOneOf(V value, V... others) {
if (value == null) {
return false;
}
for (V other : others) {
if (value.equals(other)) {
return true;
}
}
return false;
}
}

View File

@ -814,12 +814,15 @@ message SingleValueArgumentProto {
int64 version = 3;
}
message CalculatedFieldStateProto {
message CalculatedFieldStateMsgProto {
CalculatedFieldEntityCtxIdProto id = 1;
// int32 version = 2;
string type = 3;
repeated SingleValueArgumentProto singleValueArguments = 4;
repeated TsValueListProto rollingValueArguments = 5;
CalculatedFieldStateProto state = 2;
}
message CalculatedFieldStateProto {
string type = 1;
repeated SingleValueArgumentProto singleValueArguments = 2;
repeated TsValueListProto rollingValueArguments = 3;
}
//Used to report session state to tb-Service and persist this state in the cache on the tb-Service level.

View File

@ -120,9 +120,9 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
if (record != null) {
result.add(decode(record));
}
} catch (IOException e) {
log.error("Failed decode record: [{}]", record);
throw new RuntimeException("Failed to decode record: ", e);
} catch (Exception e) {
log.error("Failed to decode record {}", record, e);
throw new RuntimeException("Failed to decode record " + record, e);
}
});
return result;

View File

@ -51,7 +51,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.DataConstants.*;
import static org.thingsboard.server.common.data.DataConstants.EDGE_QUEUE_NAME;
import static org.thingsboard.server.common.data.DataConstants.MAIN_QUEUE_NAME;
@Service
@Slf4j
@ -78,8 +79,6 @@ public class HashPartitionService implements PartitionService {
@Value("${queue.partitions.hash_function_name:murmur3_128}")
private String hashFunctionName;
public static final QueueKey CALCULATED_FIELD_QUEUE_KEY = new QueueKey(ServiceType.TB_RULE_ENGINE).withQueueName(CF_QUEUE_NAME);
private final ApplicationEventPublisher applicationEventPublisher;
private final TbServiceInfoProvider serviceInfoProvider;
private final TenantRoutingInfoService tenantRoutingInfoService;
@ -120,8 +119,10 @@ public class HashPartitionService implements PartitionService {
partitionSizesMap.put(coreKey, corePartitions);
partitionTopicsMap.put(coreKey, coreTopic);
partitionSizesMap.put(CALCULATED_FIELD_QUEUE_KEY, cfPartitions);
partitionTopicsMap.put(CALCULATED_FIELD_QUEUE_KEY, cfEventTopic);
partitionSizesMap.put(QueueKey.CF, cfPartitions);
partitionTopicsMap.put(QueueKey.CF, cfEventTopic);
partitionSizesMap.put(QueueKey.CF_STATES, cfPartitions);
partitionTopicsMap.put(QueueKey.CF_STATES, cfStateTopic);
QueueKey vcKey = new QueueKey(ServiceType.TB_VC_EXECUTOR);
partitionSizesMap.put(vcKey, vcPartitions);
@ -148,6 +149,11 @@ public class HashPartitionService implements PartitionService {
return myPartitions.get(queueKey);
}
@Override
public String getTopic(QueueKey queueKey) {
return partitionTopicsMap.get(queueKey);
}
private void doInitRuleEnginePartitions() {
List<QueueRoutingInfo> queueRoutingInfoList = getQueueRoutingInfos();
queueRoutingInfoList.forEach(queue -> {
@ -222,7 +228,7 @@ public class HashPartitionService implements PartitionService {
});
if (serviceInfoProvider.isService(ServiceType.TB_RULE_ENGINE)) {
publishPartitionChangeEvent(ServiceType.TB_RULE_ENGINE, queueKeys.stream()
.collect(Collectors.toMap(k -> k, k -> Collections.emptySet())));
.collect(Collectors.toMap(k -> k, k -> Collections.emptySet())), Collections.emptyMap());
}
}
@ -402,6 +408,7 @@ public class HashPartitionService implements PartitionService {
myPartitions = newPartitions;
Map<QueueKey, Set<TopicPartitionInfo>> changedPartitionsMap = new HashMap<>();
Map<QueueKey, Set<TopicPartitionInfo>> oldPartitionsMap = new HashMap<>();
Set<QueueKey> removed = new HashSet<>();
oldPartitions.forEach((queueKey, partitions) -> {
@ -422,19 +429,16 @@ public class HashPartitionService implements PartitionService {
myPartitions.forEach((queueKey, partitions) -> {
if (!partitions.equals(oldPartitions.get(queueKey))) {
Set<TopicPartitionInfo> tpiList = partitions.stream()
.map(partition -> buildTopicPartitionInfo(queueKey, partition))
.collect(Collectors.toSet());
changedPartitionsMap.put(queueKey, tpiList);
changedPartitionsMap.put(queueKey, toTpiList(queueKey, partitions));
oldPartitionsMap.put(queueKey, toTpiList(queueKey, oldPartitions.get(queueKey)));
}
});
if (!changedPartitionsMap.isEmpty()) {
Map<ServiceType, Map<QueueKey, Set<TopicPartitionInfo>>> partitionsByServiceType = new HashMap<>();
changedPartitionsMap.forEach((queueKey, partitions) -> {
partitionsByServiceType.computeIfAbsent(queueKey.getType(), serviceType -> new HashMap<>())
.put(queueKey, partitions);
});
partitionsByServiceType.forEach(this::publishPartitionChangeEvent);
changedPartitionsMap.entrySet().stream()
.collect(Collectors.groupingBy(entry -> entry.getKey().getType(), Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
.forEach((serviceType, partitionsMap) -> {
publishPartitionChangeEvent(serviceType, partitionsMap, oldPartitionsMap);
});
}
if (currentOtherServices == null) {
@ -466,13 +470,15 @@ public class HashPartitionService implements PartitionService {
applicationEventPublisher.publishEvent(new ServiceListChangedEvent(otherServices, currentService));
}
private void publishPartitionChangeEvent(ServiceType serviceType, Map<QueueKey, Set<TopicPartitionInfo>> partitionsMap) {
log.info("Partitions changed: {}", System.lineSeparator() + partitionsMap.entrySet().stream()
private void publishPartitionChangeEvent(ServiceType serviceType,
Map<QueueKey, Set<TopicPartitionInfo>> newPartitions,
Map<QueueKey, Set<TopicPartitionInfo>> oldPartitions) {
log.info("Partitions changed: {}", System.lineSeparator() + newPartitions.entrySet().stream()
.map(entry -> "[" + entry.getKey() + "] - [" + entry.getValue().stream()
.map(tpi -> tpi.getPartition().orElse(-1).toString()).sorted()
.collect(Collectors.joining(", ")) + "]")
.collect(Collectors.joining(System.lineSeparator())));
PartitionChangeEvent event = new PartitionChangeEvent(this, serviceType, partitionsMap);
PartitionChangeEvent event = new PartitionChangeEvent(this, serviceType, newPartitions);
try {
applicationEventPublisher.publishEvent(event);
} catch (Exception e) {
@ -480,6 +486,15 @@ public class HashPartitionService implements PartitionService {
}
}
private Set<TopicPartitionInfo> toTpiList(QueueKey queueKey, List<Integer> partitions) {
if (partitions == null) {
return Collections.emptySet();
}
return partitions.stream()
.map(partition -> buildTopicPartitionInfo(queueKey, partition))
.collect(Collectors.toSet());
}
@Override
public Set<String> getAllServiceIds(ServiceType serviceType) {
return getAllServices(serviceType).stream().map(ServiceInfo::getServiceId).collect(Collectors.toSet());
@ -508,7 +523,6 @@ public class HashPartitionService implements PartitionService {
return result;
}
@Override
public int resolvePartitionIndex(UUID entityId, int partitions) {
int hash = hash(entityId);

View File

@ -45,6 +45,8 @@ public interface PartitionService {
List<Integer> getMyPartitions(QueueKey queueKey);
String getTopic(QueueKey queueKey);
/**
* Received from the Discovery service when network topology is changed.
* @param currentService - current service information {@link org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo}

View File

@ -23,6 +23,9 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.msg.queue.ServiceType;
import static org.thingsboard.server.common.data.DataConstants.CF_QUEUE_NAME;
import static org.thingsboard.server.common.data.DataConstants.CF_STATES_QUEUE_NAME;
@Data
@AllArgsConstructor
public class QueueKey {
@ -32,6 +35,9 @@ public class QueueKey {
private final String queueName;
private final TenantId tenantId;
public static final QueueKey CF = new QueueKey(ServiceType.TB_RULE_ENGINE).withQueueName(CF_QUEUE_NAME);
public static final QueueKey CF_STATES = new QueueKey(ServiceType.TB_RULE_ENGINE).withQueueName(CF_STATES_QUEUE_NAME);
public QueueKey(ServiceType type, Queue queue) {
this.type = type;
this.queueName = queue.getName();

View File

@ -28,8 +28,6 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.thingsboard.server.queue.discovery.HashPartitionService.CALCULATED_FIELD_QUEUE_KEY;
@ToString(callSuper = true)
public class PartitionChangeEvent extends TbApplicationEvent {
@ -55,8 +53,8 @@ public class PartitionChangeEvent extends TbApplicationEvent {
return getPartitionsByServiceTypeAndQueueName(ServiceType.TB_CORE, DataConstants.EDGE_QUEUE_NAME);
}
public Set<TopicPartitionInfo> getCalculatedFieldsPartitions() {
return partitionsMap.getOrDefault(CALCULATED_FIELD_QUEUE_KEY, Collections.emptySet());
public Set<TopicPartitionInfo> getCfPartitions() {
return partitionsMap.getOrDefault(QueueKey.CF, Collections.emptySet());
}
private Set<TopicPartitionInfo> getPartitionsByServiceTypeAndQueueName(ServiceType serviceType, String queueName) {

View File

@ -23,12 +23,19 @@ import org.thingsboard.server.queue.common.DefaultTbQueueMsgHeaders;
import java.util.UUID;
public class KafkaTbQueueMsg implements TbQueueMsg {
private static final int UUID_LENGTH = 36;
private final UUID key;
private final TbQueueMsgHeaders headers;
private final byte[] data;
public KafkaTbQueueMsg(ConsumerRecord<String, byte[]> record) {
this.key = UUID.fromString(record.key());
if (record.key().length() == UUID_LENGTH) {
this.key = UUID.fromString(record.key());
} else {
this.key = UUID.randomUUID();
}
TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders();
record.headers().forEach(header -> {
headers.put(header.key(), header.value());

View File

@ -18,9 +18,11 @@ package org.thingsboard.server.queue.kafka;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.util.StopWatch;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueMsg;
@ -29,9 +31,12 @@ import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
/**
* Created by ashvayka on 24.09.18.
@ -46,10 +51,15 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
private final TbKafkaConsumerStatsService statsService;
private final String groupId;
private final boolean readFromBeginning; // reset offset to beginning
private final boolean stopWhenRead; // stop consuming when reached initial end offsets
private Map<Integer, Long> endOffsets; // needed if stopWhenRead is true
@Builder
private TbKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder,
String clientId, String groupId, String topic,
TbQueueAdmin admin, TbKafkaConsumerStatsService statsService) {
TbQueueAdmin admin, TbKafkaConsumerStatsService statsService,
boolean readFromBeginning, boolean stopWhenRead) {
super(topic);
Properties props = settings.toConsumerProps(topic);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
@ -67,13 +77,35 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
this.admin = admin;
this.consumer = new KafkaConsumer<>(props);
this.decoder = decoder;
this.readFromBeginning = readFromBeginning;
this.stopWhenRead = stopWhenRead;
}
@Override
protected void doSubscribe(List<String> topicNames) {
if (!topicNames.isEmpty()) {
topicNames.forEach(admin::createTopicIfNotExists);
consumer.subscribe(topicNames);
if (readFromBeginning || stopWhenRead) {
consumer.subscribe(topicNames, new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
log.debug("Handling onPartitionsAssigned {}", partitions);
if (readFromBeginning) {
consumer.seekToBeginning(partitions);
}
if (stopWhenRead) {
endOffsets = consumer.endOffsets(partitions).entrySet().stream()
.filter(entry -> entry.getValue() > 0)
.collect(Collectors.toMap(entry -> entry.getKey().partition(), Map.Entry::getValue));
}
}
});
} else {
consumer.subscribe(topicNames);
}
} else {
log.info("unsubscribe due to empty topic list");
consumer.unsubscribe();
@ -92,13 +124,32 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
stopWatch.stop();
log.trace("poll topic {} took {}ms", getTopic(), stopWatch.getTotalTimeMillis());
List<ConsumerRecord<String, byte[]>> recordList;
if (records.isEmpty()) {
return Collections.emptyList();
recordList = Collections.emptyList();
} else {
List<ConsumerRecord<String, byte[]>> recordList = new ArrayList<>(256);
records.forEach(recordList::add);
return recordList;
recordList = new ArrayList<>(256);
records.forEach(record -> {
recordList.add(record);
if (stopWhenRead && endOffsets != null) {
int partition = record.partition();
Long endOffset = endOffsets.get(partition);
if (endOffset == null) {
log.warn("End offset not found for {} [{}]", record.topic(), partition);
return;
}
log.trace("[{}-{}] Got record offset {}, expected end offset: {}", record.topic(), partition, record.offset(), endOffset - 1);
if (record.offset() >= endOffset - 1) {
endOffsets.remove(partition);
}
}
});
}
if (stopWhenRead && endOffsets != null && endOffsets.isEmpty()) {
log.info("Reached end offset for {}, stopping consumer", consumer.assignment());
stop();
}
return recordList;
}
@Override

View File

@ -97,9 +97,12 @@ public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
@Override
public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
send(tpi, msg.getKey().toString(), msg, callback);
}
public void send(TopicPartitionInfo tpi, String key, T msg, TbQueueCallback callback) {
try {
createTopicIfNotExist(tpi);
String key = msg.getKey().toString();
byte[] data = msg.getData();
ProducerRecord<String, byte[]> record;
List<Header> headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList());
@ -116,7 +119,7 @@ public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
if (callback != null) {
callback.onFailure(exception);
} else {
log.warn("Producer template failure: {}", exception.getMessage(), exception);
log.warn("Producer template failure", exception);
}
}
});

View File

@ -54,6 +54,8 @@ public class TbKafkaTopicConfigs {
private String housekeeperReprocessingProperties;
@Value("${queue.kafka.topic-properties.calculated-field:}")
private String calculatedFieldProperties;
@Value("${queue.kafka.topic-properties.calculated-field-state:}")
private String calculatedFieldStateProperties;
@Getter
private Map<String, String> coreConfigs;
@ -83,6 +85,8 @@ public class TbKafkaTopicConfigs {
private Map<String, String> edgeEventConfigs;
@Getter
private Map<String, String> calculatedFieldConfigs;
@Getter
private Map<String, String> calculatedFieldStateConfigs;
@PostConstruct
private void init() {
@ -102,6 +106,7 @@ public class TbKafkaTopicConfigs {
edgeConfigs = PropertyUtils.getProps(edgeProperties);
edgeEventConfigs = PropertyUtils.getProps(edgeEventProperties);
calculatedFieldConfigs = PropertyUtils.getProps(calculatedFieldProperties);
calculatedFieldStateConfigs = PropertyUtils.getProps(calculatedFieldStateProperties);
}
}

View File

@ -23,6 +23,7 @@ import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateMsgProto;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
@ -159,12 +160,12 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.CalculatedFieldStateProto>> createCalculatedFieldStateConsumer() {
public TbQueueConsumer<TbProtoQueueMsg<CalculatedFieldStateMsgProto>> createCalculatedFieldStateConsumer() {
return new InMemoryTbQueueConsumer<>(storage, topicService.buildTopicName(calculatedFieldSettings.getStateTopic()));
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.CalculatedFieldStateProto>> createCalculatedFieldStateProducer() {
public TbQueueProducer<TbProtoQueueMsg<CalculatedFieldStateMsgProto>> createCalculatedFieldStateProducer() {
return new InMemoryTbQueueProducer<>(storage, topicService.buildTopicName(calculatedFieldSettings.getStateTopic()));
}

View File

@ -25,7 +25,7 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
@ -100,6 +100,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
private final TbQueueAdmin edgeAdmin;
private final TbQueueAdmin edgeEventAdmin;
private final TbQueueAdmin cfAdmin;
private final TbQueueAdmin cfStateAdmin;
private final AtomicLong consumerCount = new AtomicLong();
@ -142,6 +143,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
this.edgeAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs());
this.edgeEventAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs());
this.cfAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCalculatedFieldConfigs());
this.cfStateAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCalculatedFieldStateConfigs());
}
@Override
@ -546,26 +548,28 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<CalculatedFieldStateProto>> createCalculatedFieldStateConsumer() {
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<CalculatedFieldStateProto>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(topicService.buildTopicName(calculatedFieldSettings.getStateTopic()));
consumerBuilder.clientId("monolith-calculated-field-state-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
consumerBuilder.groupId(topicService.buildTopicName("monolith-calculated-field-state-consumer"));
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), CalculatedFieldStateProto.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(cfAdmin);
consumerBuilder.statsService(consumerStatsService);
return consumerBuilder.build();
public TbQueueConsumer<TbProtoQueueMsg<CalculatedFieldStateMsgProto>> createCalculatedFieldStateConsumer() {
return TbKafkaConsumerTemplate.<TbProtoQueueMsg<CalculatedFieldStateMsgProto>>builder()
.settings(kafkaSettings)
.topic(topicService.buildTopicName(calculatedFieldSettings.getStateTopic()))
.readFromBeginning(true)
.stopWhenRead(true)
.clientId("monolith-calculated-field-state-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet())
.groupId(topicService.buildTopicName("monolith-calculated-field-state-consumer"))
.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), CalculatedFieldStateMsgProto.parseFrom(msg.getData()), msg.getHeaders()))
.admin(cfStateAdmin)
.statsService(consumerStatsService)
.build();
}
@Override
public TbQueueProducer<TbProtoQueueMsg<CalculatedFieldStateProto>> createCalculatedFieldStateProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<CalculatedFieldStateProto>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("monolith-calculated-field-state-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getStateTopic()));
requestBuilder.admin(cfAdmin);
return requestBuilder.build();
public TbQueueProducer<TbProtoQueueMsg<CalculatedFieldStateMsgProto>> createCalculatedFieldStateProducer() {
return TbKafkaProducerTemplate.<TbProtoQueueMsg<CalculatedFieldStateMsgProto>>builder()
.settings(kafkaSettings)
.clientId("monolith-calculated-field-state-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic()))
.admin(cfStateAdmin)
.build();
}
@PreDestroy

View File

@ -23,7 +23,7 @@ import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
@ -87,6 +87,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
private final TbQueueAdmin edgeAdmin;
private final TbQueueAdmin edgeEventAdmin;
private final TbQueueAdmin cfAdmin;
private final TbQueueAdmin cfStateAdmin;
private final AtomicLong consumerCount = new AtomicLong();
public KafkaTbRuleEngineQueueFactory(TopicService topicService, TbKafkaSettings kafkaSettings,
@ -119,6 +120,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
this.edgeAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs());
this.edgeEventAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs());
this.cfAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCalculatedFieldConfigs());
this.cfStateAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCalculatedFieldStateConfigs());
}
@Override
@ -338,26 +340,28 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<CalculatedFieldStateProto>> createCalculatedFieldStateConsumer() {
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<CalculatedFieldStateProto>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(topicService.buildTopicName(calculatedFieldSettings.getStateTopic()));
consumerBuilder.clientId("tb-rule-engine-calculated-field-state-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
consumerBuilder.groupId(topicService.buildTopicName("tb-rule-engine-calculated-field-state-consumer"));
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), CalculatedFieldStateProto.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(cfAdmin);
consumerBuilder.statsService(consumerStatsService);
return consumerBuilder.build();
public TbQueueConsumer<TbProtoQueueMsg<CalculatedFieldStateMsgProto>> createCalculatedFieldStateConsumer() {
return TbKafkaConsumerTemplate.<TbProtoQueueMsg<CalculatedFieldStateMsgProto>>builder()
.settings(kafkaSettings)
.topic(topicService.buildTopicName(calculatedFieldSettings.getStateTopic()))
.readFromBeginning(true)
.stopWhenRead(true)
.clientId("tb-rule-engine-calculated-field-state-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet())
.groupId(topicService.buildTopicName("tb-rule-engine-calculated-field-state-consumer"))
.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), CalculatedFieldStateMsgProto.parseFrom(msg.getData()), msg.getHeaders()))
.admin(cfStateAdmin)
.statsService(consumerStatsService)
.build();
}
@Override
public TbQueueProducer<TbProtoQueueMsg<CalculatedFieldStateProto>> createCalculatedFieldStateProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<CalculatedFieldStateProto>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-rule-engine-to-calculated-field-state-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic()));
requestBuilder.admin(cfAdmin);
return requestBuilder.build();
public TbQueueProducer<TbProtoQueueMsg<CalculatedFieldStateMsgProto>> createCalculatedFieldStateProducer() {
return TbKafkaProducerTemplate.<TbProtoQueueMsg<CalculatedFieldStateMsgProto>>builder()
.settings(kafkaSettings)
.clientId("tb-rule-engine-to-calculated-field-state-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic()))
.admin(cfStateAdmin)
.build();
}
@PreDestroy

View File

@ -17,7 +17,7 @@ package org.thingsboard.server.queue.provider;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
@ -126,8 +126,8 @@ public interface TbRuleEngineQueueFactory extends TbUsageStatsClientQueueFactory
TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> createToCalculatedFieldNotificationsMsgConsumer();
TbQueueConsumer<TbProtoQueueMsg<CalculatedFieldStateProto>> createCalculatedFieldStateConsumer();
TbQueueConsumer<TbProtoQueueMsg<CalculatedFieldStateMsgProto>> createCalculatedFieldStateConsumer();
TbQueueProducer<TbProtoQueueMsg<CalculatedFieldStateProto>> createCalculatedFieldStateProducer();
TbQueueProducer<TbProtoQueueMsg<CalculatedFieldStateMsgProto>> createCalculatedFieldStateProducer();
}

View File

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -40,7 +40,6 @@ public @interface AfterStartUp {
int CF_READ_PROFILE_ENTITIES_SERVICE = 10;
int CF_READ_CF_SERVICE = 11;
int CF_STATE_RESTORE_SERVICE = 12;
int BEFORE_TRANSPORT_SERVICE = Integer.MAX_VALUE - 1001;
int TRANSPORT_SERVICE = Integer.MAX_VALUE - 1000;