onAddedPartitions() impl

This commit is contained in:
IrynaMatveieva 2024-12-18 13:12:39 +02:00
parent 481753b8f0
commit f929de4209

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.service.cf;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -63,6 +64,7 @@ import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.asset.AssetService;
@ -173,13 +175,66 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
@Override
protected Map<TopicPartitionInfo, List<ListenableFuture<?>>> onAddedPartitions(Set<TopicPartitionInfo> addedPartitions) {
// TODO: implementation for cluster mode
return Map.of();
var result = new HashMap<TopicPartitionInfo, List<ListenableFuture<?>>>();
PageDataIterable<CalculatedField> cfs = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFields, initFetchPackSize);
Map<TopicPartitionInfo, List<CalculatedField>> tpiCalculatedFieldMap = new HashMap<>();
for (CalculatedField cf : cfs) {
TopicPartitionInfo tpi;
try {
tpi = partitionService.resolve(ServiceType.TB_CORE, cf.getTenantId(), cf.getId());
} catch (Exception e) {
log.warn("Failed to resolve partition for CalculatedField [{}], tenant [{}]. Reason: {}",
cf.getId(), cf.getTenantId(), e.getMessage());
continue;
}
if (addedPartitions.contains(tpi) && states.keySet().stream().noneMatch(ctxId -> ctxId.cfId().equals(cf.getId().getId()))) {
tpiCalculatedFieldMap.computeIfAbsent(tpi, k -> new ArrayList<>()).add(cf);
}
}
for (var entry : tpiCalculatedFieldMap.entrySet()) {
for (List<CalculatedField> partition : Lists.partition(entry.getValue(), 1000)) {
log.info("[{}] Submit task for CalculatedFields: {}", entry.getKey(), partition.size());
var future = calculatedFieldExecutor.submit(() -> {
try {
for (CalculatedField cf : partition) {
if (EntityType.ASSET_PROFILE.equals(cf.getEntityId().getEntityType()) || EntityType.DEVICE_PROFILE.equals(cf.getEntityId().getEntityType())) {
getOrFetchFromDBProfileEntities(cf.getTenantId(), cf.getEntityId())
.forEach(entityId -> restoreState(cf, entityId));
} else {
restoreState(cf, cf.getEntityId());
}
}
} catch (Throwable t) {
log.error("Unexpected exception while restoring CalculatedField states", t);
throw t;
}
});
result.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(future);
}
}
return result;
}
private void restoreState(CalculatedField cf, EntityId entityId) {
CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(cf.getId().getId(), entityId.getId());
String storedState = rocksDBService.get(JacksonUtil.writeValueAsString(ctxId));
if (storedState != null) {
CalculatedFieldEntityCtx restoredCtx = JacksonUtil.fromString(storedState, CalculatedFieldEntityCtx.class);
calculatedFieldsCtx.putIfAbsent(cf.getId(), new CalculatedFieldCtx(cf, tbelInvokeService));
states.put(ctxId, restoredCtx);
log.info("Restored state for CalculatedField [{}]", cf.getId());
} else {
log.warn("No state found for CalculatedField [{}], entity [{}].", cf.getId(), entityId);
}
}
@Override
protected void cleanupEntityOnPartitionRemoval(CalculatedFieldId entityId) {
// TODO: implementation for cluster mode
calculatedFields.remove(entityId);
states.keySet().removeIf(ctxId -> ctxId.cfId().equals(entityId.getId()));
}
@Override
@ -213,8 +268,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
case ASSET_PROFILE, DEVICE_PROFILE -> {
log.info("Initializing state for all entities in profile: tenantId=[{}], profileId=[{}]", tenantId, entityId);
fetchCommonArguments(calculatedFieldCtx, callback, commonArguments -> {
getOrFetchFromDBProfileEntities(tenantId, entityId).forEach(assetId -> {
initializeStateForEntity(calculatedFieldCtx, assetId, commonArguments, callback);
getOrFetchFromDBProfileEntities(tenantId, entityId).forEach(targetEntityId -> {
initializeStateForEntity(calculatedFieldCtx, targetEntityId, commonArguments, callback);
});
});
}