added implementation for ts deletion
This commit is contained in:
parent
49328d270b
commit
dd0b73cfb3
@ -24,6 +24,7 @@ import org.thingsboard.server.actors.ActorSystemContext;
|
|||||||
import org.thingsboard.server.actors.TbActorCtx;
|
import org.thingsboard.server.actors.TbActorCtx;
|
||||||
import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
|
import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
|
||||||
import org.thingsboard.server.common.data.AttributeScope;
|
import org.thingsboard.server.common.data.AttributeScope;
|
||||||
|
import org.thingsboard.server.common.data.StringUtils;
|
||||||
import org.thingsboard.server.common.data.cf.configuration.Argument;
|
import org.thingsboard.server.common.data.cf.configuration.Argument;
|
||||||
import org.thingsboard.server.common.data.cf.configuration.ArgumentType;
|
import org.thingsboard.server.common.data.cf.configuration.ArgumentType;
|
||||||
import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey;
|
import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey;
|
||||||
@ -58,6 +59,7 @@ import java.util.Map;
|
|||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -166,14 +168,15 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
|
|||||||
if (cfIds.contains(ctx.getCfId())) {
|
if (cfIds.contains(ctx.getCfId())) {
|
||||||
callback.onSuccess(CALLBACKS_PER_CF);
|
callback.onSuccess(CALLBACKS_PER_CF);
|
||||||
} else {
|
} else {
|
||||||
if (proto.getRemovedTsKeysCount() > 0) {
|
if (proto.getTsDataCount() > 0) {
|
||||||
processArgumentValuesUpdate(ctx, cfIds, callback, mapDeletedAttributesToDefault(ctx, msg.getEntityId(), proto.getScope(), proto.getRemovedTsKeysList()), toTbMsgId(proto), toTbMsgType(proto));
|
|
||||||
} else if (proto.getRemovedAttrKeysCount() > 0) {
|
|
||||||
processArgumentValuesUpdate(ctx, cfIds, callback, mapDeletedAttributesToDefault(ctx, msg.getEntityId(), proto.getScope(), proto.getRemovedAttrKeysList()), toTbMsgId(proto), toTbMsgType(proto));
|
|
||||||
} else if (proto.getTsDataCount() > 0) {
|
|
||||||
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto));
|
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto));
|
||||||
} else if (proto.getAttrDataCount() > 0) {
|
} else if (proto.getAttrDataCount() > 0) {
|
||||||
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto));
|
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto));
|
||||||
|
}
|
||||||
|
if (proto.getRemovedTsKeysCount() > 0) {
|
||||||
|
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArgumentsWithFetchedValue(ctx, proto.getRemovedTsKeysList()), toTbMsgId(proto), toTbMsgType(proto));
|
||||||
|
} else if (proto.getRemovedAttrKeysCount() > 0) {
|
||||||
|
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArgumentsWithDefaultValue(ctx, msg.getEntityId(), proto.getScope(), proto.getRemovedAttrKeysList()), toTbMsgId(proto), toTbMsgType(proto));
|
||||||
} else {
|
} else {
|
||||||
callback.onSuccess(CALLBACKS_PER_CF);
|
callback.onSuccess(CALLBACKS_PER_CF);
|
||||||
}
|
}
|
||||||
@ -188,14 +191,15 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
|
|||||||
if (cfIds.contains(ctx.getCfId())) {
|
if (cfIds.contains(ctx.getCfId())) {
|
||||||
callback.onSuccess(CALLBACKS_PER_CF);
|
callback.onSuccess(CALLBACKS_PER_CF);
|
||||||
} else {
|
} else {
|
||||||
|
if (proto.getTsDataCount() > 0) {
|
||||||
|
processTelemetry(ctx, proto, cfIdList, callback);
|
||||||
|
} else if (proto.getAttrDataCount() > 0) {
|
||||||
|
processAttributes(ctx, proto, cfIdList, callback);
|
||||||
|
}
|
||||||
if (proto.getRemovedTsKeysCount() > 0) {
|
if (proto.getRemovedTsKeysCount() > 0) {
|
||||||
processRemovedTelemetry(ctx, proto, cfIdList, callback);
|
processRemovedTelemetry(ctx, proto, cfIdList, callback);
|
||||||
} else if (proto.getRemovedAttrKeysCount() > 0) {
|
} else if (proto.getRemovedAttrKeysCount() > 0) {
|
||||||
processRemovedAttributes(ctx, proto, cfIdList, callback);
|
processRemovedAttributes(ctx, proto, cfIdList, callback);
|
||||||
} else if (proto.getTsDataCount() > 0) {
|
|
||||||
processTelemetry(ctx, proto, cfIdList, callback);
|
|
||||||
} else if (proto.getAttrDataCount() > 0) {
|
|
||||||
processAttributes(ctx, proto, cfIdList, callback);
|
|
||||||
} else {
|
} else {
|
||||||
callback.onSuccess(CALLBACKS_PER_CF);
|
callback.onSuccess(CALLBACKS_PER_CF);
|
||||||
}
|
}
|
||||||
@ -208,26 +212,6 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processRemovedTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) throws CalculatedFieldException {
|
|
||||||
// reinit cf (consider fetching only removed ts)
|
|
||||||
log.info("Force reinitialization of CF: [{}].", ctx.getCfId());
|
|
||||||
states.remove(ctx.getCfId());
|
|
||||||
try {
|
|
||||||
var state = getOrInitState(ctx);
|
|
||||||
if (state.isSizeOk()) {
|
|
||||||
processStateIfReady(ctx, Collections.singletonList(ctx.getCfId()), state, null, null, callback);
|
|
||||||
} else {
|
|
||||||
throw new RuntimeException(ctx.getSizeExceedsLimitMessage());
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void processRemovedAttributes(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) throws CalculatedFieldException {
|
|
||||||
processArgumentValuesUpdate(ctx, cfIdList, callback, mapDeletedAttributesToDefault(ctx, proto.getScope(), proto.getRemovedAttrKeysList()), toTbMsgId(proto), toTbMsgType(proto));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void processTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) throws CalculatedFieldException {
|
private void processTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) throws CalculatedFieldException {
|
||||||
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto));
|
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto));
|
||||||
}
|
}
|
||||||
@ -236,6 +220,14 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
|
|||||||
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto));
|
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void processRemovedTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) throws CalculatedFieldException {
|
||||||
|
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArgumentsWithFetchedValue(ctx, proto.getRemovedTsKeysList()), toTbMsgId(proto), toTbMsgType(proto));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void processRemovedAttributes(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) throws CalculatedFieldException {
|
||||||
|
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArgumentsWithDefaultValue(ctx, proto.getScope(), proto.getRemovedAttrKeysList()), toTbMsgId(proto), toTbMsgType(proto));
|
||||||
|
}
|
||||||
|
|
||||||
private void processArgumentValuesUpdate(CalculatedFieldCtx ctx, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback,
|
private void processArgumentValuesUpdate(CalculatedFieldCtx ctx, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback,
|
||||||
Map<String, ArgumentEntry> newArgValues, UUID tbMsgId, TbMsgType tbMsgType) throws CalculatedFieldException {
|
Map<String, ArgumentEntry> newArgValues, UUID tbMsgId, TbMsgType tbMsgType) throws CalculatedFieldException {
|
||||||
if (newArgValues.isEmpty()) {
|
if (newArgValues.isEmpty()) {
|
||||||
@ -334,7 +326,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
|
|||||||
return mapToArguments(argNames, data);
|
return mapToArguments(argNames, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Map<String, ArgumentEntry> mapToArguments(Map<ReferencedEntityKey, String> argNames, List<TsKvProto> data) {
|
private Map<String, ArgumentEntry> mapToArguments(Map<ReferencedEntityKey, String> argNames, List<TsKvProto> data) {
|
||||||
if (argNames.isEmpty()) {
|
if (argNames.isEmpty()) {
|
||||||
return Collections.emptyMap();
|
return Collections.emptyMap();
|
||||||
}
|
}
|
||||||
@ -366,7 +358,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
|
|||||||
return mapToArguments(argNames, scope, attrDataList);
|
return mapToArguments(argNames, scope, attrDataList);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Map<String, ArgumentEntry> mapToArguments(Map<ReferencedEntityKey, String> argNames, AttributeScopeProto scope, List<AttributeValueProto> attrDataList) {
|
private Map<String, ArgumentEntry> mapToArguments(Map<ReferencedEntityKey, String> argNames, AttributeScopeProto scope, List<AttributeValueProto> attrDataList) {
|
||||||
Map<String, ArgumentEntry> arguments = new HashMap<>();
|
Map<String, ArgumentEntry> arguments = new HashMap<>();
|
||||||
for (AttributeValueProto item : attrDataList) {
|
for (AttributeValueProto item : attrDataList) {
|
||||||
ReferencedEntityKey key = new ReferencedEntityKey(item.getKey(), ArgumentType.ATTRIBUTE, AttributeScope.valueOf(scope.name()));
|
ReferencedEntityKey key = new ReferencedEntityKey(item.getKey(), ArgumentType.ATTRIBUTE, AttributeScope.valueOf(scope.name()));
|
||||||
@ -378,19 +370,19 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
|
|||||||
return arguments;
|
return arguments;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Map<String, ArgumentEntry> mapDeletedAttributesToDefault(CalculatedFieldCtx ctx, EntityId entityId, AttributeScopeProto scope, List<String> removedAttrKeys) {
|
private Map<String, ArgumentEntry> mapToArgumentsWithDefaultValue(CalculatedFieldCtx ctx, EntityId entityId, AttributeScopeProto scope, List<String> removedAttrKeys) {
|
||||||
var argNames = ctx.getLinkedEntityArguments().get(entityId);
|
var argNames = ctx.getLinkedEntityArguments().get(entityId);
|
||||||
if (argNames.isEmpty()) {
|
if (argNames.isEmpty()) {
|
||||||
return Collections.emptyMap();
|
return Collections.emptyMap();
|
||||||
}
|
}
|
||||||
return mapToArgumentsDefaultValue(argNames, ctx.getArguments(), scope, removedAttrKeys);
|
return mapToArgumentsWithDefaultValue(argNames, ctx.getArguments(), scope, removedAttrKeys);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Map<String, ArgumentEntry> mapDeletedAttributesToDefault(CalculatedFieldCtx ctx, AttributeScopeProto scope, List<String> removedAttrKeys) {
|
private Map<String, ArgumentEntry> mapToArgumentsWithDefaultValue(CalculatedFieldCtx ctx, AttributeScopeProto scope, List<String> removedAttrKeys) {
|
||||||
return mapToArgumentsDefaultValue(ctx.getMainEntityArguments(), ctx.getArguments(), scope, removedAttrKeys);
|
return mapToArgumentsWithDefaultValue(ctx.getMainEntityArguments(), ctx.getArguments(), scope, removedAttrKeys);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Map<String, ArgumentEntry> mapToArgumentsDefaultValue(Map<ReferencedEntityKey, String> argNames, Map<String, Argument> configArguments, AttributeScopeProto scope, List<String> removedAttrKeys) {
|
private Map<String, ArgumentEntry> mapToArgumentsWithDefaultValue(Map<ReferencedEntityKey, String> argNames, Map<String, Argument> configArguments, AttributeScopeProto scope, List<String> removedAttrKeys) {
|
||||||
Map<String, ArgumentEntry> arguments = new HashMap<>();
|
Map<String, ArgumentEntry> arguments = new HashMap<>();
|
||||||
for (String removedKey : removedAttrKeys) {
|
for (String removedKey : removedAttrKeys) {
|
||||||
ReferencedEntityKey key = new ReferencedEntityKey(removedKey, ArgumentType.ATTRIBUTE, AttributeScope.valueOf(scope.name()));
|
ReferencedEntityKey key = new ReferencedEntityKey(removedKey, ArgumentType.ATTRIBUTE, AttributeScope.valueOf(scope.name()));
|
||||||
@ -398,7 +390,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
|
|||||||
if (argName != null) {
|
if (argName != null) {
|
||||||
Argument argument = configArguments.get(argName);
|
Argument argument = configArguments.get(argName);
|
||||||
String defaultValue = (argument != null) ? argument.getDefaultValue() : null;
|
String defaultValue = (argument != null) ? argument.getDefaultValue() : null;
|
||||||
arguments.put(argName, (defaultValue != null)
|
arguments.put(argName, StringUtils.isNotEmpty(defaultValue)
|
||||||
? new SingleValueArgumentEntry(System.currentTimeMillis(), new StringDataEntry(removedKey, defaultValue), null)
|
? new SingleValueArgumentEntry(System.currentTimeMillis(), new StringDataEntry(removedKey, defaultValue), null)
|
||||||
: new SingleValueArgumentEntry());
|
: new SingleValueArgumentEntry());
|
||||||
|
|
||||||
@ -407,6 +399,17 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
|
|||||||
return arguments;
|
return arguments;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Map<String, ArgumentEntry> mapToArgumentsWithFetchedValue(CalculatedFieldCtx ctx, List<String> removedTelemetryKeys) {
|
||||||
|
Map<String, Argument> deletedArguments = ctx.getArguments().entrySet().stream()
|
||||||
|
.filter(entry -> removedTelemetryKeys.contains(entry.getKey()))
|
||||||
|
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||||
|
|
||||||
|
Map<String, ArgumentEntry> fetchedArgs = cfService.fetchArgsFromDb(tenantId, entityId, deletedArguments);
|
||||||
|
|
||||||
|
fetchedArgs.values().forEach(arg -> arg.setForceResetPrevious(true));
|
||||||
|
return fetchedArgs;
|
||||||
|
}
|
||||||
|
|
||||||
private static List<CalculatedFieldId> getCalculatedFieldIds(CalculatedFieldTelemetryMsgProto proto) {
|
private static List<CalculatedFieldId> getCalculatedFieldIds(CalculatedFieldTelemetryMsgProto proto) {
|
||||||
List<CalculatedFieldId> cfIds = new LinkedList<>();
|
List<CalculatedFieldId> cfIds = new LinkedList<>();
|
||||||
for (var cfId : proto.getPreviousCalculatedFieldsList()) {
|
for (var cfId : proto.getPreviousCalculatedFieldsList()) {
|
||||||
|
|||||||
@ -17,20 +17,25 @@ package org.thingsboard.server.service.cf;
|
|||||||
|
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg;
|
import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg;
|
||||||
|
import org.thingsboard.server.common.data.cf.configuration.Argument;
|
||||||
import org.thingsboard.server.common.data.id.CalculatedFieldId;
|
import org.thingsboard.server.common.data.id.CalculatedFieldId;
|
||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||||
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
|
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
|
||||||
|
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
|
||||||
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
|
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
|
||||||
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
|
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
public interface CalculatedFieldProcessingService {
|
public interface CalculatedFieldProcessingService {
|
||||||
|
|
||||||
ListenableFuture<CalculatedFieldState> fetchStateFromDb(CalculatedFieldCtx ctx, EntityId entityId);
|
ListenableFuture<CalculatedFieldState> fetchStateFromDb(CalculatedFieldCtx ctx, EntityId entityId);
|
||||||
|
|
||||||
|
Map<String, ArgumentEntry> fetchArgsFromDb(TenantId tenantId, EntityId entityId, Map<String, Argument> arguments);
|
||||||
|
|
||||||
void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult calculationResult, List<CalculatedFieldId> cfIds, TbCallback callback);
|
void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult calculationResult, List<CalculatedFieldId> cfIds, TbCallback callback);
|
||||||
|
|
||||||
void pushMsgToLinks(CalculatedFieldTelemetryMsg msg, List<CalculatedFieldEntityCtxId> linkedCalculatedFields, TbCallback callback);
|
void pushMsgToLinks(CalculatedFieldTelemetryMsg msg, List<CalculatedFieldEntityCtxId> linkedCalculatedFields, TbCallback callback);
|
||||||
|
|||||||
@ -147,6 +147,28 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP
|
|||||||
}, calculatedFieldCallbackExecutor);
|
}, calculatedFieldCallbackExecutor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, ArgumentEntry> fetchArgsFromDb(TenantId tenantId, EntityId entityId, Map<String, Argument> arguments) {
|
||||||
|
Map<String, ListenableFuture<ArgumentEntry>> argFutures = new HashMap<>();
|
||||||
|
for (var entry : arguments.entrySet()) {
|
||||||
|
var argEntityId = entry.getValue().getRefEntityId() != null ? entry.getValue().getRefEntityId() : entityId;
|
||||||
|
var argValueFuture = fetchKvEntry(tenantId, argEntityId, entry.getValue());
|
||||||
|
argFutures.put(entry.getKey(), argValueFuture);
|
||||||
|
}
|
||||||
|
return argFutures.entrySet().stream()
|
||||||
|
.collect(Collectors.toMap(
|
||||||
|
Entry::getKey, // Keep the key as is
|
||||||
|
entry -> {
|
||||||
|
try {
|
||||||
|
// Resolve the future to get the value
|
||||||
|
return entry.getValue().get();
|
||||||
|
} catch (ExecutionException | InterruptedException e) {
|
||||||
|
throw new RuntimeException("Error getting future result for key: " + entry.getKey(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult calculatedFieldResult, List<CalculatedFieldId> cfIds, TbCallback callback) {
|
public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult calculatedFieldResult, List<CalculatedFieldId> cfIds, TbCallback callback) {
|
||||||
try {
|
try {
|
||||||
|
|||||||
@ -54,4 +54,8 @@ public interface ArgumentEntry {
|
|||||||
|
|
||||||
TbelCfArg toTbelCfArg();
|
TbelCfArg toTbelCfArg();
|
||||||
|
|
||||||
|
boolean isForceResetPrevious();
|
||||||
|
|
||||||
|
void setForceResetPrevious(boolean forceResetPrevious);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -55,7 +55,7 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
|
|||||||
ArgumentEntry newEntry = entry.getValue();
|
ArgumentEntry newEntry = entry.getValue();
|
||||||
ArgumentEntry existingEntry = arguments.get(key);
|
ArgumentEntry existingEntry = arguments.get(key);
|
||||||
|
|
||||||
if (existingEntry == null) {
|
if (existingEntry == null || newEntry.isForceResetPrevious()) {
|
||||||
validateNewEntry(newEntry);
|
validateNewEntry(newEntry);
|
||||||
arguments.put(key, newEntry);
|
arguments.put(key, newEntry);
|
||||||
stateUpdated = true;
|
stateUpdated = true;
|
||||||
|
|||||||
@ -151,28 +151,10 @@ public class CalculatedFieldCtx {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean matchesKeys(List<String> keys, AttributeScope scope) {
|
|
||||||
return matchesAttributesKeys(mainEntityArguments, keys, scope);
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean matchesKeys(List<String> keys) {
|
|
||||||
return matchesTimeSeriesKeys(mainEntityArguments, keys);
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean matches(List<AttributeKvEntry> values, AttributeScope scope) {
|
public boolean matches(List<AttributeKvEntry> values, AttributeScope scope) {
|
||||||
return matchesAttributes(mainEntityArguments, values, scope);
|
return matchesAttributes(mainEntityArguments, values, scope);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean linkMatchesAttrKeys(EntityId entityId, List<String> keys, AttributeScope scope) {
|
|
||||||
var map = linkedEntityArguments.get(entityId);
|
|
||||||
return map != null && matchesAttributesKeys(map, keys, scope);
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean linkMatchesTsKeys(EntityId entityId, List<String> keys) {
|
|
||||||
var map = linkedEntityArguments.get(entityId);
|
|
||||||
return map != null && matchesTimeSeriesKeys(map, keys);
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean linkMatches(EntityId entityId, List<AttributeKvEntry> values, AttributeScope scope) {
|
public boolean linkMatches(EntityId entityId, List<AttributeKvEntry> values, AttributeScope scope) {
|
||||||
var map = linkedEntityArguments.get(entityId);
|
var map = linkedEntityArguments.get(entityId);
|
||||||
return map != null && matchesAttributes(map, values, scope);
|
return map != null && matchesAttributes(map, values, scope);
|
||||||
@ -187,7 +169,7 @@ public class CalculatedFieldCtx {
|
|||||||
return map != null && matchesTimeSeries(map, values);
|
return map != null && matchesTimeSeries(map, values);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean matchesAttributes(Map<ReferencedEntityKey, String> argMap, List<AttributeKvEntry> values, AttributeScope scope) {
|
private boolean matchesAttributes(Map<ReferencedEntityKey, String> argMap, List<AttributeKvEntry> values, AttributeScope scope) {
|
||||||
for (AttributeKvEntry attrKv : values) {
|
for (AttributeKvEntry attrKv : values) {
|
||||||
ReferencedEntityKey attrKey = new ReferencedEntityKey(attrKv.getKey(), ArgumentType.ATTRIBUTE, scope);
|
ReferencedEntityKey attrKey = new ReferencedEntityKey(attrKv.getKey(), ArgumentType.ATTRIBUTE, scope);
|
||||||
if (argMap.containsKey(attrKey)) {
|
if (argMap.containsKey(attrKey)) {
|
||||||
@ -197,16 +179,6 @@ public class CalculatedFieldCtx {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean matchesAttributesKeys(Map<ReferencedEntityKey, String> argMap, List<String> keys, AttributeScope scope) {
|
|
||||||
for (String key : keys) {
|
|
||||||
ReferencedEntityKey attrKey = new ReferencedEntityKey(key, ArgumentType.ATTRIBUTE, scope);
|
|
||||||
if (argMap.containsKey(attrKey)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean matchesTimeSeries(Map<ReferencedEntityKey, String> argMap, List<TsKvEntry> values) {
|
private boolean matchesTimeSeries(Map<ReferencedEntityKey, String> argMap, List<TsKvEntry> values) {
|
||||||
for (TsKvEntry tsKv : values) {
|
for (TsKvEntry tsKv : values) {
|
||||||
ReferencedEntityKey latestKey = new ReferencedEntityKey(tsKv.getKey(), ArgumentType.TS_LATEST, null);
|
ReferencedEntityKey latestKey = new ReferencedEntityKey(tsKv.getKey(), ArgumentType.TS_LATEST, null);
|
||||||
@ -221,6 +193,24 @@ public class CalculatedFieldCtx {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean matchesKeys(List<String> keys, AttributeScope scope) {
|
||||||
|
return matchesAttributesKeys(mainEntityArguments, keys, scope);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean matchesKeys(List<String> keys) {
|
||||||
|
return matchesTimeSeriesKeys(mainEntityArguments, keys);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean matchesAttributesKeys(Map<ReferencedEntityKey, String> argMap, List<String> keys, AttributeScope scope) {
|
||||||
|
for (String key : keys) {
|
||||||
|
ReferencedEntityKey attrKey = new ReferencedEntityKey(key, ArgumentType.ATTRIBUTE, scope);
|
||||||
|
if (argMap.containsKey(attrKey)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
private boolean matchesTimeSeriesKeys(Map<ReferencedEntityKey, String> argMap, List<String> keys) {
|
private boolean matchesTimeSeriesKeys(Map<ReferencedEntityKey, String> argMap, List<String> keys) {
|
||||||
for (String key : keys) {
|
for (String key : keys) {
|
||||||
ReferencedEntityKey latestKey = new ReferencedEntityKey(key, ArgumentType.TS_LATEST, null);
|
ReferencedEntityKey latestKey = new ReferencedEntityKey(key, ArgumentType.TS_LATEST, null);
|
||||||
@ -235,22 +225,32 @@ public class CalculatedFieldCtx {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean linkMatchesAttrKeys(EntityId entityId, List<String> keys, AttributeScope scope) {
|
||||||
|
var map = linkedEntityArguments.get(entityId);
|
||||||
|
return map != null && matchesAttributesKeys(map, keys, scope);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean linkMatchesTsKeys(EntityId entityId, List<String> keys) {
|
||||||
|
var map = linkedEntityArguments.get(entityId);
|
||||||
|
return map != null && matchesTimeSeriesKeys(map, keys);
|
||||||
|
}
|
||||||
|
|
||||||
public boolean linkMatches(EntityId entityId, CalculatedFieldTelemetryMsgProto proto) {
|
public boolean linkMatches(EntityId entityId, CalculatedFieldTelemetryMsgProto proto) {
|
||||||
if (!proto.getRemovedTsKeysList().isEmpty()) {
|
if (!proto.getTsDataList().isEmpty()) {
|
||||||
return linkMatchesTsKeys(entityId, proto.getRemovedTsKeysList());
|
|
||||||
} else if (!proto.getRemovedAttrKeysList().isEmpty()) {
|
|
||||||
return linkMatchesAttrKeys(entityId, proto.getRemovedAttrKeysList(), AttributeScope.valueOf(proto.getScope().name()));
|
|
||||||
} else if (!proto.getTsDataList().isEmpty()) {
|
|
||||||
List<TsKvEntry> updatedTelemetry = proto.getTsDataList().stream()
|
List<TsKvEntry> updatedTelemetry = proto.getTsDataList().stream()
|
||||||
.map(ProtoUtils::fromProto)
|
.map(ProtoUtils::fromProto)
|
||||||
.toList();
|
.toList();
|
||||||
return linkMatches(entityId, updatedTelemetry);
|
return linkMatches(entityId, updatedTelemetry);
|
||||||
} else {
|
} else if (!proto.getAttrDataList().isEmpty()) {
|
||||||
AttributeScope scope = AttributeScope.valueOf(proto.getScope().name());
|
AttributeScope scope = AttributeScope.valueOf(proto.getScope().name());
|
||||||
List<AttributeKvEntry> updatedTelemetry = proto.getAttrDataList().stream()
|
List<AttributeKvEntry> updatedTelemetry = proto.getAttrDataList().stream()
|
||||||
.map(ProtoUtils::fromProto)
|
.map(ProtoUtils::fromProto)
|
||||||
.toList();
|
.toList();
|
||||||
return linkMatches(entityId, updatedTelemetry, scope);
|
return linkMatches(entityId, updatedTelemetry, scope);
|
||||||
|
} else if (!proto.getRemovedTsKeysList().isEmpty()) {
|
||||||
|
return linkMatchesTsKeys(entityId, proto.getRemovedTsKeysList());
|
||||||
|
} else {
|
||||||
|
return linkMatchesAttrKeys(entityId, proto.getRemovedAttrKeysList(), AttributeScope.valueOf(proto.getScope().name()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -38,6 +38,8 @@ public class SingleValueArgumentEntry implements ArgumentEntry {
|
|||||||
private BasicKvEntry kvEntryValue;
|
private BasicKvEntry kvEntryValue;
|
||||||
private Long version;
|
private Long version;
|
||||||
|
|
||||||
|
private boolean forceResetPrevious;
|
||||||
|
|
||||||
public SingleValueArgumentEntry(TsKvProto entry) {
|
public SingleValueArgumentEntry(TsKvProto entry) {
|
||||||
this.ts = entry.getTs();
|
this.ts = entry.getTs();
|
||||||
this.version = entry.getVersion();
|
this.version = entry.getVersion();
|
||||||
@ -61,6 +63,12 @@ public class SingleValueArgumentEntry implements ArgumentEntry {
|
|||||||
this.kvEntryValue = ProtoUtils.basicKvEntryFromKvEntry(entry);
|
this.kvEntryValue = ProtoUtils.basicKvEntryFromKvEntry(entry);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public SingleValueArgumentEntry(long ts, BasicKvEntry kvEntryValue, Long version) {
|
||||||
|
this.ts = ts;
|
||||||
|
this.kvEntryValue = kvEntryValue;
|
||||||
|
this.version = version;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ArgumentEntryType getType() {
|
public ArgumentEntryType getType() {
|
||||||
return ArgumentEntryType.SINGLE_VALUE;
|
return ArgumentEntryType.SINGLE_VALUE;
|
||||||
|
|||||||
@ -42,6 +42,8 @@ public class TsRollingArgumentEntry implements ArgumentEntry {
|
|||||||
private Long timeWindow;
|
private Long timeWindow;
|
||||||
private TreeMap<Long, Double> tsRecords = new TreeMap<>();
|
private TreeMap<Long, Double> tsRecords = new TreeMap<>();
|
||||||
|
|
||||||
|
private boolean forceResetPrevious;
|
||||||
|
|
||||||
public TsRollingArgumentEntry(List<TsKvEntry> kvEntries, int limit, long timeWindow) {
|
public TsRollingArgumentEntry(List<TsKvEntry> kvEntries, int limit, long timeWindow) {
|
||||||
this.limit = limit;
|
this.limit = limit;
|
||||||
this.timeWindow = timeWindow;
|
this.timeWindow = timeWindow;
|
||||||
@ -60,6 +62,12 @@ public class TsRollingArgumentEntry implements ArgumentEntry {
|
|||||||
this.timeWindow = timeWindow;
|
this.timeWindow = timeWindow;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public TsRollingArgumentEntry(Integer limit, Long timeWindow, TreeMap<Long, Double> tsRecords) {
|
||||||
|
this.limit = limit;
|
||||||
|
this.timeWindow = timeWindow;
|
||||||
|
this.tsRecords = tsRecords;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ArgumentEntryType getType() {
|
public ArgumentEntryType getType() {
|
||||||
return ArgumentEntryType.TS_ROLLING;
|
return ArgumentEntryType.TS_ROLLING;
|
||||||
|
|||||||
@ -211,10 +211,11 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
|||||||
DonAsynchron.withCallback(deleteFuture, result -> {
|
DonAsynchron.withCallback(deleteFuture, result -> {
|
||||||
calculatedFieldQueueService.pushRequestToQueue(request, request.getKeys(), getCalculatedFieldCallback(request.getCallback(), request.getKeys()));
|
calculatedFieldQueueService.pushRequestToQueue(request, request.getKeys(), getCalculatedFieldCallback(request.getCallback(), request.getKeys()));
|
||||||
}, safeCallback(getCalculatedFieldCallback(request.getCallback(), request.getKeys())), tsCallBackExecutor);
|
}, safeCallback(getCalculatedFieldCallback(request.getCallback(), request.getKeys())), tsCallBackExecutor);
|
||||||
// addMainCallback(deleteFuture, __ -> request.getCallback().onSuccess(request.getKeys()), request.getCallback()::onFailure);
|
|
||||||
} else {
|
} else {
|
||||||
ListenableFuture<List<String>> deleteFuture = tsService.removeAllLatest(request.getTenantId(), request.getEntityId());
|
ListenableFuture<List<String>> deleteFuture = tsService.removeAllLatest(request.getTenantId(), request.getEntityId());
|
||||||
addMainCallback(deleteFuture, request.getCallback()::onSuccess, request.getCallback()::onFailure);
|
DonAsynchron.withCallback(deleteFuture, result -> {
|
||||||
|
calculatedFieldQueueService.pushRequestToQueue(request, request.getKeys(), getCalculatedFieldCallback(request.getCallback(), result));
|
||||||
|
}, safeCallback(getCalculatedFieldCallback(request.getCallback(), request.getKeys())), tsCallBackExecutor);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user