implemented logic to fetch telemetry if states were not fetched

This commit is contained in:
IrynaMatveieva 2024-12-20 16:16:58 +02:00
parent 5c5dc474cb
commit 2a41c8b451
3 changed files with 121 additions and 79 deletions

View File

@ -81,12 +81,13 @@ import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.ScriptCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.ScriptCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.SimpleCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.SimpleCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.TsRollingArgumentEntry;
import org.thingsboard.server.service.partition.AbstractPartitionBasedService; import org.thingsboard.server.service.partition.AbstractPartitionBasedService;
import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -97,6 +98,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -200,11 +202,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
var future = calculatedFieldExecutor.submit(() -> { var future = calculatedFieldExecutor.submit(() -> {
try { try {
for (CalculatedField cf : partition) { for (CalculatedField cf : partition) {
if (EntityType.ASSET_PROFILE.equals(cf.getEntityId().getEntityType()) || EntityType.DEVICE_PROFILE.equals(cf.getEntityId().getEntityType())) { EntityId cfEntityId = cf.getEntityId();
getOrFetchFromDBProfileEntities(cf.getTenantId(), cf.getEntityId()) if (isProfileEntity(cfEntityId)) {
getOrFetchFromDBProfileEntities(cf.getTenantId(), cfEntityId)
.forEach(entityId -> restoreState(cf, entityId)); .forEach(entityId -> restoreState(cf, entityId));
} else { } else {
restoreState(cf, cf.getEntityId()); restoreState(cf, cfEntityId);
} }
} }
} catch (Throwable t) { } catch (Throwable t) {
@ -272,9 +275,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
} }
case ASSET_PROFILE, DEVICE_PROFILE -> { case ASSET_PROFILE, DEVICE_PROFILE -> {
log.info("Initializing state for all entities in profile: tenantId=[{}], profileId=[{}]", tenantId, entityId); log.info("Initializing state for all entities in profile: tenantId=[{}], profileId=[{}]", tenantId, entityId);
fetchCommonArguments(calculatedFieldCtx, callback, commonArguments -> { Map<String, Argument> commonArguments = calculatedFieldCtx.getArguments().entrySet().stream()
.filter(entry -> !isProfileEntity(entry.getValue().getEntityId()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
fetchArguments(tenantId, entityId, commonArguments, commonArgs -> {
getOrFetchFromDBProfileEntities(tenantId, entityId).forEach(targetEntityId -> { getOrFetchFromDBProfileEntities(tenantId, entityId).forEach(targetEntityId -> {
initializeStateForEntity(calculatedFieldCtx, targetEntityId, commonArguments, callback); initializeStateForEntity(calculatedFieldCtx, targetEntityId, commonArgs, callback);
}); });
}); });
} }
@ -473,7 +479,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
if (calculatedFieldState != null) { if (calculatedFieldState != null) {
msgBuilder.setState(JacksonUtil.writeValueAsString(calculatedFieldState)); msgBuilder.setState(JacksonUtil.writeValueAsString(calculatedFieldState));
} }
clusterService.pushMsgToCore(tenantId, entityId, TransportProtos.ToCoreMsg.newBuilder().setCalculatedFieldStateMsg(msgBuilder).build(), null); clusterService.pushMsgToCore(tenantId, calculatedFieldId, TransportProtos.ToCoreMsg.newBuilder().setCalculatedFieldStateMsg(msgBuilder).build(), null);
} }
private boolean onCalculatedFieldUpdate(CalculatedField updatedCalculatedField, TbCallback callback) { private boolean onCalculatedFieldUpdate(CalculatedField updatedCalculatedField, TbCallback callback) {
@ -556,35 +562,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
.forEach(cfCtx -> initializeStateForEntity(cfCtx, entityId, callback)); .forEach(cfCtx -> initializeStateForEntity(cfCtx, entityId, callback));
} }
private void fetchCommonArguments(CalculatedFieldCtx calculatedFieldCtx, TbCallback callback, Consumer<Map<String, ArgumentEntry>> onComplete) {
Map<String, ArgumentEntry> argumentValues = new HashMap<>();
List<ListenableFuture<ArgumentEntry>> futures = new ArrayList<>();
calculatedFieldCtx.getArguments().forEach((key, argument) -> {
if (!EntityType.DEVICE_PROFILE.equals(argument.getEntityId().getEntityType()) &&
!EntityType.ASSET_PROFILE.equals(argument.getEntityId().getEntityType())) {
futures.add(Futures.transform(fetchKvEntry(calculatedFieldCtx.getTenantId(), argument.getEntityId(), argument),
result -> {
argumentValues.put(key, result);
return result;
}, calculatedFieldCallbackExecutor));
}
});
Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() {
@Override
public void onSuccess(List<ArgumentEntry> results) {
onComplete.accept(argumentValues);
}
@Override
public void onFailure(Throwable t) {
log.error("Failed to fetch common arguments", t);
callback.onFailure(t);
}
}, calculatedFieldCallbackExecutor);
}
private void initializeStateForEntity(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, TbCallback callback) { private void initializeStateForEntity(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, TbCallback callback) {
initializeStateForEntity(calculatedFieldCtx, entityId, new HashMap<>(), callback); initializeStateForEntity(calculatedFieldCtx, entityId, new HashMap<>(), callback);
} }
@ -595,7 +572,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
calculatedFieldCtx.getArguments().forEach((key, argument) -> { calculatedFieldCtx.getArguments().forEach((key, argument) -> {
if (!commonArguments.containsKey(key)) { if (!commonArguments.containsKey(key)) {
futures.add(Futures.transform(fetchArgumentValue(calculatedFieldCtx, entityId, argument), futures.add(Futures.transform(fetchArgumentValue(calculatedFieldCtx.getTenantId(), entityId, argument),
result -> { result -> {
argumentValues.put(key, result); argumentValues.put(key, result);
return result; return result;
@ -618,10 +595,25 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}, calculatedFieldCallbackExecutor); }, calculatedFieldCallbackExecutor);
} }
private ListenableFuture<ArgumentEntry> fetchArgumentValue(CalculatedFieldCtx calculatedFieldCtx, EntityId targetEntityId, Argument argument) { private ListenableFuture<Void> fetchArguments(TenantId tenantId, EntityId entityId, Map<String, Argument> necessaryArguments, Consumer<Map<String, ArgumentEntry>> onComplete) {
TenantId tenantId = calculatedFieldCtx.getTenantId(); Map<String, ArgumentEntry> argumentValues = new HashMap<>();
List<ListenableFuture<ArgumentEntry>> futures = new ArrayList<>();
necessaryArguments.forEach((key, argument) -> {
futures.add(Futures.transform(fetchArgumentValue(tenantId, entityId, argument),
result -> {
argumentValues.put(key, result);
return result;
}, calculatedFieldCallbackExecutor));
});
return Futures.transform(Futures.allAsList(futures), results -> {
onComplete.accept(argumentValues);
return null;
}, calculatedFieldCallbackExecutor);
}
private ListenableFuture<ArgumentEntry> fetchArgumentValue(TenantId tenantId, EntityId targetEntityId, Argument argument) {
EntityId argumentEntityId = argument.getEntityId(); EntityId argumentEntityId = argument.getEntityId();
EntityId entityId = EntityType.DEVICE_PROFILE.equals(argumentEntityId.getEntityType()) || EntityType.ASSET_PROFILE.equals(argumentEntityId.getEntityType()) EntityId entityId = isProfileEntity(argumentEntityId)
? targetEntityId ? targetEntityId
: argumentEntityId; : argumentEntityId;
return fetchKvEntry(tenantId, entityId, argument); return fetchKvEntry(tenantId, entityId, argument);
@ -654,11 +646,17 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
ReadTsKvQuery query = new BaseReadTsKvQuery(argument.getKey(), startTs, currentTime, 0, limit, Aggregation.NONE); ReadTsKvQuery query = new BaseReadTsKvQuery(argument.getKey(), startTs, currentTime, 0, limit, Aggregation.NONE);
ListenableFuture<List<TsKvEntry>> tsRollingFuture = timeseriesService.findAll(tenantId, entityId, List.of(query)); ListenableFuture<List<TsKvEntry>> tsRollingFuture = timeseriesService.findAll(tenantId, entityId, List.of(query));
return Futures.transform(tsRollingFuture, tsRolling -> tsRolling == null ? ArgumentEntry.createTsRollingArgument(Collections.emptyList()) : ArgumentEntry.createTsRollingArgument(tsRolling), calculatedFieldCallbackExecutor); return Futures.transform(tsRollingFuture, tsRolling -> tsRolling == null ? TsRollingArgumentEntry.EMPTY : ArgumentEntry.createTsRollingArgument(tsRolling), calculatedFieldCallbackExecutor);
} }
private ListenableFuture<ArgumentEntry> transformSingleValueArgument(ListenableFuture<Optional<? extends KvEntry>> kvEntryFuture) { private ListenableFuture<ArgumentEntry> transformSingleValueArgument(ListenableFuture<Optional<? extends KvEntry>> kvEntryFuture) {
return Futures.transform(kvEntryFuture, kvEntry -> ArgumentEntry.createSingleValueArgument(kvEntry.orElse(null)), calculatedFieldCallbackExecutor); return Futures.transform(kvEntryFuture, kvEntry -> {
if (kvEntry.isPresent() && kvEntry.get().getValue() != null) {
return ArgumentEntry.createSingleValueArgument(kvEntry.get());
} else {
return SingleValueArgumentEntry.EMPTY;
}
}, calculatedFieldCallbackExecutor);
} }
private KvEntry createDefaultKvEntry(Argument argument) { private KvEntry createDefaultKvEntry(Argument argument) {
@ -676,48 +674,67 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
private void updateOrInitializeState(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, Map<String, ArgumentEntry> argumentValues) { private void updateOrInitializeState(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, Map<String, ArgumentEntry> argumentValues) {
CalculatedFieldId cfId = calculatedFieldCtx.getCfId(); CalculatedFieldId cfId = calculatedFieldCtx.getCfId();
CalculatedFieldEntityCtxId entityCtxId = new CalculatedFieldEntityCtxId(cfId.getId(), entityId.getId()); CalculatedFieldEntityCtxId entityCtxId = new CalculatedFieldEntityCtxId(cfId.getId(), entityId.getId());
CalculatedFieldEntityCtx calculatedFieldEntityCtx = states.computeIfAbsent(entityCtxId, this::fetchCalculatedFieldEntityState); CalculatedFieldEntityCtx calculatedFieldEntityCtx = states.computeIfAbsent(entityCtxId, ctxId -> fetchCalculatedFieldEntityState(ctxId, calculatedFieldCtx.getCfType()));
Predicate<Map<String, ArgumentEntry>> allArgsPresent = (args) ->
args.keySet().containsAll(calculatedFieldCtx.getArguments().keySet()) &&
!args.containsValue(SingleValueArgumentEntry.EMPTY) && !args.containsValue(TsRollingArgumentEntry.EMPTY);
Consumer<CalculatedFieldState> performUpdateState = (state) -> {
if (state.updateState(argumentValues)) {
calculatedFieldEntityCtx.setState(state);
TenantId tenantId = calculatedFieldCtx.getTenantId();
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, cfId);
if (tpi.isMyPartition()) {
states.put(entityCtxId, calculatedFieldEntityCtx);
rocksDBService.put(JacksonUtil.writeValueAsString(entityCtxId), JacksonUtil.writeValueAsString(calculatedFieldEntityCtx));
} else {
sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, state);
}
if (allArgsPresent.test(state.getArguments())) {
performCalculation(calculatedFieldCtx, state, entityId);
}
}
};
CalculatedFieldState state = calculatedFieldEntityCtx.getState(); CalculatedFieldState state = calculatedFieldEntityCtx.getState();
boolean allKeysPresent = argumentValues.keySet().containsAll(calculatedFieldCtx.getArguments().keySet());
if (!allKeysPresent) {
if (state == null) { Map<String, Argument> missingArguments = calculatedFieldCtx.getArguments().entrySet().stream()
state = createStateByType(calculatedFieldCtx.getCfType()); .filter(entry -> !argumentValues.containsKey(entry.getKey()))
} .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (state.updateState(argumentValues)) {
calculatedFieldEntityCtx.setState(state); fetchArguments(calculatedFieldCtx.getTenantId(), entityId, missingArguments, argumentValues::putAll)
TenantId tenantId = calculatedFieldCtx.getTenantId(); .addListener(() -> performUpdateState.accept(state),
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, calculatedFieldCtx.getCfId()); calculatedFieldCallbackExecutor);
if (tpi.isMyPartition()) { return;
states.put(entityCtxId, calculatedFieldEntityCtx);
rocksDBService.put(JacksonUtil.writeValueAsString(entityCtxId), JacksonUtil.writeValueAsString(calculatedFieldEntityCtx));
} else {
sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, state);
}
boolean allArgsPresent = calculatedFieldCtx.getArguments().keySet().containsAll(state.getArguments().keySet());
if (allArgsPresent) {
ListenableFuture<CalculatedFieldResult> resultFuture = state.performCalculation(calculatedFieldCtx);
Futures.addCallback(resultFuture, new FutureCallback<>() {
@Override
public void onSuccess(CalculatedFieldResult result) {
if (result != null) {
pushMsgToRuleEngine(tenantId, entityId, result);
}
}
@Override
public void onFailure(Throwable t) {
log.warn("[{}] Failed to perform calculation. entityId: [{}]", calculatedFieldCtx.getCfId(), entityId, t);
}
}, MoreExecutors.directExecutor());
}
} }
performUpdateState.accept(state);
} }
private CalculatedFieldEntityCtx fetchCalculatedFieldEntityState(CalculatedFieldEntityCtxId entityCtxId) { private void performCalculation(CalculatedFieldCtx calculatedFieldCtx, CalculatedFieldState state, EntityId entityId) {
ListenableFuture<CalculatedFieldResult> resultFuture = state.performCalculation(calculatedFieldCtx);
Futures.addCallback(resultFuture, new FutureCallback<>() {
@Override
public void onSuccess(CalculatedFieldResult result) {
if (result != null) {
pushMsgToRuleEngine(calculatedFieldCtx.getTenantId(), entityId, result);
}
}
@Override
public void onFailure(Throwable t) {
log.warn("[{}] Failed to perform calculation. entityId: [{}]", calculatedFieldCtx.getCfId(), entityId, t);
}
}, MoreExecutors.directExecutor());
}
private CalculatedFieldEntityCtx fetchCalculatedFieldEntityState(CalculatedFieldEntityCtxId entityCtxId, CalculatedFieldType cfType) {
String stateStr = rocksDBService.get(JacksonUtil.writeValueAsString(entityCtxId)); String stateStr = rocksDBService.get(JacksonUtil.writeValueAsString(entityCtxId));
if (stateStr == null) { if (stateStr == null) {
return new CalculatedFieldEntityCtx(entityCtxId, null); return new CalculatedFieldEntityCtx(entityCtxId, createStateByType(cfType));
} }
return JacksonUtil.fromString(stateStr, CalculatedFieldEntityCtx.class); return JacksonUtil.fromString(stateStr, CalculatedFieldEntityCtx.class);
} }
@ -725,8 +742,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
private void pushMsgToRuleEngine(TenantId tenantId, EntityId originatorId, CalculatedFieldResult calculatedFieldResult) { private void pushMsgToRuleEngine(TenantId tenantId, EntityId originatorId, CalculatedFieldResult calculatedFieldResult) {
try { try {
String type = calculatedFieldResult.getType(); String type = calculatedFieldResult.getType();
TbMsgType msgType = "ATTRIBUTE".equals(type) ? TbMsgType.POST_ATTRIBUTES_REQUEST : TbMsgType.POST_TELEMETRY_REQUEST; TbMsgType msgType = "ATTRIBUTES".equals(type) ? TbMsgType.POST_ATTRIBUTES_REQUEST : TbMsgType.POST_TELEMETRY_REQUEST;
TbMsgMetaData md = "ATTRIBUTE".equals(type) ? new TbMsgMetaData(Map.of(SCOPE, calculatedFieldResult.getScope().name())) : TbMsgMetaData.EMPTY; TbMsgMetaData md = "ATTRIBUTES".equals(type) ? new TbMsgMetaData(Map.of(SCOPE, calculatedFieldResult.getScope().name())) : TbMsgMetaData.EMPTY;
ObjectNode payload = createJsonPayload(calculatedFieldResult); ObjectNode payload = createJsonPayload(calculatedFieldResult);
TbMsg msg = TbMsg.newMsg(msgType, originatorId, md, JacksonUtil.writeValueAsString(payload)); TbMsg msg = TbMsg.newMsg(msgType, originatorId, md, JacksonUtil.writeValueAsString(payload));
clusterService.pushMsgToRuleEngine(tenantId, originatorId, msg, null); clusterService.pushMsgToRuleEngine(tenantId, originatorId, msg, null);
@ -749,4 +766,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}; };
} }
private boolean isProfileEntity(EntityId entityId) {
return EntityType.DEVICE_PROFILE.equals(entityId.getEntityType()) || EntityType.ASSET_PROFILE.equals(entityId.getEntityType());
}
} }

View File

@ -27,6 +27,8 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
@AllArgsConstructor @AllArgsConstructor
public class SingleValueArgumentEntry implements ArgumentEntry { public class SingleValueArgumentEntry implements ArgumentEntry {
public static final ArgumentEntry EMPTY = new SingleValueArgumentEntry(0);
private long ts; private long ts;
private Object value; private Object value;
@ -39,6 +41,14 @@ public class SingleValueArgumentEntry implements ArgumentEntry {
this.value = entry.getValue(); this.value = entry.getValue();
} }
/**
* Internal constructor to create immutable SingleValueArgumentEntry.EMPTY
* */
private SingleValueArgumentEntry(int ignored) {
this.ts = System.currentTimeMillis();
this.value = null;
}
@Override @Override
public ArgumentType getType() { public ArgumentType getType() {
return ArgumentType.SINGLE_VALUE; return ArgumentType.SINGLE_VALUE;

View File

@ -29,6 +29,8 @@ import java.util.TreeMap;
@Slf4j @Slf4j
public class TsRollingArgumentEntry implements ArgumentEntry { public class TsRollingArgumentEntry implements ArgumentEntry {
public static final ArgumentEntry EMPTY = new TsRollingArgumentEntry(0);
private static final int MAX_ROLLING_ARGUMENT_ENTRY_SIZE = 1000; private static final int MAX_ROLLING_ARGUMENT_ENTRY_SIZE = 1000;
private TreeMap<Long, Object> tsRecords = new TreeMap<>(); private TreeMap<Long, Object> tsRecords = new TreeMap<>();
@ -37,6 +39,13 @@ public class TsRollingArgumentEntry implements ArgumentEntry {
addAllTsRecords(tsRecords); addAllTsRecords(tsRecords);
} }
/**
* Internal constructor to create immutable TsRollingArgumentEntry.EMPTY
*/
private TsRollingArgumentEntry(int ignored) {
this.tsRecords = new TreeMap<>();
}
@Override @Override
public ArgumentType getType() { public ArgumentType getType() {
return ArgumentType.TS_ROLLING; return ArgumentType.TS_ROLLING;
@ -50,7 +59,9 @@ public class TsRollingArgumentEntry implements ArgumentEntry {
@Override @Override
public boolean hasUpdatedValue(ArgumentEntry entry) { public boolean hasUpdatedValue(ArgumentEntry entry) {
return !tsRecords.containsKey(((SingleValueArgumentEntry) entry).getTs()); return entry instanceof SingleValueArgumentEntry ?
!tsRecords.containsKey(((SingleValueArgumentEntry) entry).getTs()) :
!tsRecords.keySet().containsAll(((TsRollingArgumentEntry) entry).getTsRecords().keySet());
} }
@Override @Override