Merge pull request #13510 from irynamatveieva/performance
Calculated fields performance improvements
This commit is contained in:
commit
6314cb7ee5
@ -81,6 +81,7 @@ import org.thingsboard.server.dao.edge.EdgeService;
|
||||
import org.thingsboard.server.dao.entity.EntityService;
|
||||
import org.thingsboard.server.dao.entityview.EntityViewService;
|
||||
import org.thingsboard.server.dao.event.EventService;
|
||||
import org.thingsboard.server.dao.job.JobService;
|
||||
import org.thingsboard.server.dao.mobile.MobileAppBundleService;
|
||||
import org.thingsboard.server.dao.mobile.MobileAppService;
|
||||
import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor;
|
||||
@ -97,7 +98,6 @@ import org.thingsboard.server.dao.relation.RelationService;
|
||||
import org.thingsboard.server.dao.resource.ResourceService;
|
||||
import org.thingsboard.server.dao.rule.RuleChainService;
|
||||
import org.thingsboard.server.dao.rule.RuleNodeStateService;
|
||||
import org.thingsboard.server.dao.job.JobService;
|
||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
||||
import org.thingsboard.server.dao.tenant.TenantProfileService;
|
||||
import org.thingsboard.server.dao.tenant.TenantService;
|
||||
@ -320,7 +320,6 @@ public class ActorSystemContext {
|
||||
@Getter
|
||||
private TbEntityViewService tbEntityViewService;
|
||||
|
||||
@Lazy
|
||||
@Autowired
|
||||
@Getter
|
||||
private TelemetrySubscriptionService tsSubService;
|
||||
@ -453,12 +452,10 @@ public class ActorSystemContext {
|
||||
@Getter
|
||||
private ApiLimitService apiLimitService;
|
||||
|
||||
@Lazy
|
||||
@Autowired(required = false)
|
||||
@Getter
|
||||
private RateLimitService rateLimitService;
|
||||
|
||||
@Lazy
|
||||
@Autowired(required = false)
|
||||
@Getter
|
||||
private DebugModeRateLimitsConfig debugModeRateLimitsConfig;
|
||||
@ -539,17 +536,14 @@ public class ActorSystemContext {
|
||||
@Getter
|
||||
private EntityService entityService;
|
||||
|
||||
@Lazy
|
||||
@Autowired(required = false)
|
||||
@Getter
|
||||
private CalculatedFieldProcessingService calculatedFieldProcessingService;
|
||||
|
||||
@Lazy
|
||||
@Autowired(required = false)
|
||||
@Getter
|
||||
private CalculatedFieldStateService calculatedFieldStateService;
|
||||
|
||||
@Lazy
|
||||
@Autowired(required = false)
|
||||
@Getter
|
||||
private CalculatedFieldQueueService calculatedFieldQueueService;
|
||||
|
||||
@ -19,7 +19,6 @@ import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.common.util.DebugModeUtil;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.server.actors.ActorSystemContext;
|
||||
import org.thingsboard.server.actors.TbActorCtx;
|
||||
import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
|
||||
@ -299,7 +298,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
|
||||
callback.onSuccess();
|
||||
}
|
||||
if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) {
|
||||
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, JacksonUtil.writeValueAsString(calculationResult.getResult()), null);
|
||||
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, calculationResult.getResult().toString(), null);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
||||
@ -16,6 +16,7 @@
|
||||
package org.thingsboard.server.service.cf;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.thingsboard.server.actors.ActorSystemContext;
|
||||
import org.thingsboard.server.actors.calculatedField.CalculatedFieldStateRestoreMsg;
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
@ -39,6 +40,7 @@ import static org.thingsboard.server.utils.CalculatedFieldUtils.toProto;
|
||||
public abstract class AbstractCalculatedFieldStateService implements CalculatedFieldStateService {
|
||||
|
||||
@Autowired
|
||||
@Lazy
|
||||
private ActorSystemContext actorSystemContext;
|
||||
|
||||
protected QueueStateService<TbProtoQueueMsg<ToCalculatedFieldMsg>, TbProtoQueueMsg<CalculatedFieldStateProto>> stateService;
|
||||
|
||||
@ -19,7 +19,9 @@ import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.ConcurrentReferenceHashMap;
|
||||
import org.thingsboard.script.api.tbel.TbelInvokeService;
|
||||
import org.thingsboard.server.actors.ActorSystemContext;
|
||||
import org.thingsboard.server.common.data.cf.CalculatedField;
|
||||
@ -49,14 +51,13 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||
@RequiredArgsConstructor
|
||||
public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
|
||||
|
||||
private static final Integer UNKNOWN_PARTITION = -1;
|
||||
|
||||
private final Lock calculatedFieldFetchLock = new ReentrantLock();
|
||||
private final ConcurrentReferenceHashMap<CalculatedFieldId, Lock> calculatedFieldFetchLocks = new ConcurrentReferenceHashMap<>();
|
||||
|
||||
private final CalculatedFieldService calculatedFieldService;
|
||||
private final TbelInvokeService tbelInvokeService;
|
||||
private final ActorSystemContext actorSystemContext;
|
||||
private final ApiLimitService apiLimitService;
|
||||
@Lazy
|
||||
private final ActorSystemContext actorSystemContext;
|
||||
|
||||
private final ConcurrentMap<CalculatedFieldId, CalculatedField> calculatedFields = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<EntityId, List<CalculatedField>> entityIdCalculatedFields = new ConcurrentHashMap<>();
|
||||
@ -99,19 +100,20 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
|
||||
|
||||
@Override
|
||||
public List<CalculatedField> getCalculatedFieldsByEntityId(EntityId entityId) {
|
||||
return entityIdCalculatedFields.getOrDefault(entityId, new CopyOnWriteArrayList<>());
|
||||
return entityIdCalculatedFields.getOrDefault(entityId, Collections.emptyList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<CalculatedFieldLink> getCalculatedFieldLinksByEntityId(EntityId entityId) {
|
||||
return entityIdCalculatedFieldLinks.getOrDefault(entityId, new CopyOnWriteArrayList<>());
|
||||
return entityIdCalculatedFieldLinks.getOrDefault(entityId, Collections.emptyList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public CalculatedFieldCtx getCalculatedFieldCtx(CalculatedFieldId calculatedFieldId) {
|
||||
CalculatedFieldCtx ctx = calculatedFieldsCtx.get(calculatedFieldId);
|
||||
if (ctx == null) {
|
||||
calculatedFieldFetchLock.lock();
|
||||
Lock lock = getFetchLock(calculatedFieldId);
|
||||
lock.lock();
|
||||
try {
|
||||
ctx = calculatedFieldsCtx.get(calculatedFieldId);
|
||||
if (ctx == null) {
|
||||
@ -123,7 +125,7 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
calculatedFieldFetchLock.unlock();
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
log.trace("[{}] Found calculated field ctx in cache: {}", calculatedFieldId, ctx);
|
||||
@ -142,7 +144,8 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
|
||||
|
||||
@Override
|
||||
public void addCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId) {
|
||||
calculatedFieldFetchLock.lock();
|
||||
Lock lock = getFetchLock(calculatedFieldId);
|
||||
lock.lock();
|
||||
try {
|
||||
CalculatedField calculatedField = calculatedFieldService.findById(tenantId, calculatedFieldId);
|
||||
if (calculatedField == null) {
|
||||
@ -164,7 +167,7 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
|
||||
.add(configuration.buildCalculatedFieldLink(tenantId, referencedEntityId, calculatedFieldId));
|
||||
});
|
||||
} finally {
|
||||
calculatedFieldFetchLock.unlock();
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@ -188,4 +191,8 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
|
||||
log.debug("[{}] evict calculated field links from cached links by entity id: {}", calculatedFieldId, oldCalculatedField);
|
||||
}
|
||||
|
||||
private Lock getFetchLock(CalculatedFieldId id) {
|
||||
return calculatedFieldFetchLocks.computeIfAbsent(id, __ -> new ReentrantLock());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -25,7 +25,6 @@ import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.math.NumberUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.common.util.ThingsBoardExecutors;
|
||||
import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg;
|
||||
import org.thingsboard.server.actors.calculatedField.MultipleTbCallback;
|
||||
@ -66,7 +65,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNot
|
||||
import org.thingsboard.server.queue.TbQueueCallback;
|
||||
import org.thingsboard.server.queue.TbQueueMsgMetadata;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.queue.discovery.QueueKey;
|
||||
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
|
||||
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
|
||||
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
|
||||
@ -171,7 +169,7 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP
|
||||
OutputType type = calculatedFieldResult.getType();
|
||||
TbMsgType msgType = OutputType.ATTRIBUTES.equals(type) ? TbMsgType.POST_ATTRIBUTES_REQUEST : TbMsgType.POST_TELEMETRY_REQUEST;
|
||||
TbMsgMetaData md = OutputType.ATTRIBUTES.equals(type) ? new TbMsgMetaData(Map.of(SCOPE, calculatedFieldResult.getScope().name())) : TbMsgMetaData.EMPTY;
|
||||
TbMsg msg = TbMsg.newMsg().type(msgType).originator(entityId).previousCalculatedFieldIds(cfIds).metaData(md).data(JacksonUtil.writeValueAsString(calculatedFieldResult.getResult())).build();
|
||||
TbMsg msg = TbMsg.newMsg().type(msgType).originator(entityId).previousCalculatedFieldIds(cfIds).metaData(md).data(calculatedFieldResult.getResult().toString()).build();
|
||||
clusterService.pushMsgToRuleEngine(tenantId, entityId, msg, new TbQueueCallback() {
|
||||
@Override
|
||||
public void onSuccess(TbQueueMsgMetadata metadata) {
|
||||
|
||||
@ -87,7 +87,10 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
|
||||
public void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback<Void> callback) {
|
||||
var tenantId = request.getTenantId();
|
||||
var entityId = request.getEntityId();
|
||||
checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries()), cf -> cf.linkMatches(entityId, request.getEntries()),
|
||||
var entries = request.getEntries();
|
||||
checkEntityAndPushToQueue(tenantId, entityId,
|
||||
cf -> cf.matches(entries),
|
||||
cf -> cf.linkMatches(entityId, entries),
|
||||
() -> toCalculatedFieldTelemetryMsgProto(request, result), callback);
|
||||
}
|
||||
|
||||
@ -100,7 +103,11 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
|
||||
public void pushRequestToQueue(AttributesSaveRequest request, AttributesSaveResult result, FutureCallback<Void> callback) {
|
||||
var tenantId = request.getTenantId();
|
||||
var entityId = request.getEntityId();
|
||||
checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries(), request.getScope()), cf -> cf.linkMatches(entityId, request.getEntries(), request.getScope()),
|
||||
var entries = request.getEntries();
|
||||
var scope = request.getScope();
|
||||
checkEntityAndPushToQueue(tenantId, entityId,
|
||||
cf -> cf.matches(entries, scope),
|
||||
cf -> cf.linkMatches(entityId, entries, scope),
|
||||
() -> toCalculatedFieldTelemetryMsgProto(request, result), callback);
|
||||
}
|
||||
|
||||
@ -113,7 +120,10 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
|
||||
public void pushRequestToQueue(AttributesDeleteRequest request, List<String> result, FutureCallback<Void> callback) {
|
||||
var tenantId = request.getTenantId();
|
||||
var entityId = request.getEntityId();
|
||||
checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matchesKeys(result, request.getScope()), cf -> cf.linkMatchesAttrKeys(entityId, result, request.getScope()),
|
||||
var scope = request.getScope();
|
||||
checkEntityAndPushToQueue(tenantId, entityId,
|
||||
cf -> cf.matchesKeys(result, scope),
|
||||
cf -> cf.linkMatchesAttrKeys(entityId, result, scope),
|
||||
() -> toCalculatedFieldTelemetryMsgProto(request, result), callback);
|
||||
}
|
||||
|
||||
@ -121,8 +131,9 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
|
||||
public void pushRequestToQueue(TimeseriesDeleteRequest request, List<String> result, FutureCallback<Void> callback) {
|
||||
var tenantId = request.getTenantId();
|
||||
var entityId = request.getEntityId();
|
||||
|
||||
checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matchesKeys(result), cf -> cf.linkMatchesTsKeys(entityId, result),
|
||||
checkEntityAndPushToQueue(tenantId, entityId,
|
||||
cf -> cf.matchesKeys(result),
|
||||
cf -> cf.linkMatchesTsKeys(entityId, result),
|
||||
() -> toCalculatedFieldTelemetryMsgProto(request, result), callback);
|
||||
}
|
||||
|
||||
@ -134,7 +145,8 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
|
||||
}
|
||||
boolean send = checkEntityForCalculatedFields(tenantId, entityId, mainEntityFilter, linkedEntityFilter);
|
||||
if (send) {
|
||||
clusterService.pushMsgToCalculatedFields(tenantId, entityId, msg.get(), wrap(callback));
|
||||
ToCalculatedFieldMsg calculatedFieldMsg = msg.get();
|
||||
clusterService.pushMsgToCalculatedFields(tenantId, entityId, calculatedFieldMsg, wrap(callback));
|
||||
} else {
|
||||
if (callback != null) {
|
||||
callback.onSuccess(null);
|
||||
@ -143,20 +155,35 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
|
||||
}
|
||||
|
||||
private boolean checkEntityForCalculatedFields(TenantId tenantId, EntityId entityId, Predicate<CalculatedFieldCtx> filter, Predicate<CalculatedFieldCtx> linkedEntityFilter) {
|
||||
boolean send = false;
|
||||
if (supportedReferencedEntities.contains(entityId.getEntityType())) {
|
||||
send = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(entityId).stream().anyMatch(filter);
|
||||
if (!send) {
|
||||
send = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(getProfileId(tenantId, entityId)).stream().anyMatch(filter);
|
||||
}
|
||||
if (!send) {
|
||||
send = calculatedFieldCache.getCalculatedFieldLinksByEntityId(entityId).stream()
|
||||
.map(CalculatedFieldLink::getCalculatedFieldId)
|
||||
.map(calculatedFieldCache::getCalculatedFieldCtx)
|
||||
.anyMatch(linkedEntityFilter);
|
||||
if (!supportedReferencedEntities.contains(entityId.getEntityType())) {
|
||||
return false;
|
||||
}
|
||||
List<CalculatedFieldCtx> entityCfs = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(entityId);
|
||||
for (CalculatedFieldCtx ctx : entityCfs) {
|
||||
if (filter.test(ctx)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return send;
|
||||
|
||||
EntityId profileId = getProfileId(tenantId, entityId);
|
||||
if (profileId != null) {
|
||||
List<CalculatedFieldCtx> profileCfs = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(profileId);
|
||||
for (CalculatedFieldCtx ctx : profileCfs) {
|
||||
if (filter.test(ctx)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
List<CalculatedFieldLink> links = calculatedFieldCache.getCalculatedFieldLinksByEntityId(entityId);
|
||||
for (CalculatedFieldLink link : links) {
|
||||
CalculatedFieldCtx ctx = calculatedFieldCache.getCalculatedFieldCtx(link.getCalculatedFieldId());
|
||||
if (ctx != null && linkedEntityFilter.test(ctx)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private EntityId getProfileId(TenantId tenantId, EntityId entityId) {
|
||||
@ -168,8 +195,6 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
|
||||
}
|
||||
|
||||
private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(TimeseriesSaveRequest request, TimeseriesSaveResult result) {
|
||||
ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder();
|
||||
|
||||
CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds(), request.getTbMsgId(), request.getTbMsgType());
|
||||
|
||||
List<TsKvEntry> entries = request.getEntries();
|
||||
@ -183,8 +208,7 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
|
||||
telemetryMsg.addTsData(tsProtoBuilder.build());
|
||||
}
|
||||
|
||||
msg.setTelemetryMsg(telemetryMsg.build());
|
||||
return msg.build();
|
||||
return ToCalculatedFieldMsg.newBuilder().setTelemetryMsg(telemetryMsg).build();
|
||||
}
|
||||
|
||||
private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, AttributesSaveResult result) {
|
||||
@ -198,7 +222,9 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
|
||||
|
||||
for (int i = 0; i < entries.size(); i++) {
|
||||
AttributeValueProto.Builder attrProtoBuilder = ProtoUtils.toProto(entries.get(i)).toBuilder();
|
||||
attrProtoBuilder.setVersion(versions.get(i));
|
||||
if (versions != null && !versions.isEmpty() && versions.get(i) != null) {
|
||||
attrProtoBuilder.setVersion(versions.get(i));
|
||||
}
|
||||
telemetryMsg.addAttrData(attrProtoBuilder.build());
|
||||
}
|
||||
msg.setTelemetryMsg(telemetryMsg.build());
|
||||
|
||||
@ -181,26 +181,37 @@ public class CalculatedFieldCtx {
|
||||
}
|
||||
|
||||
private boolean matchesAttributes(Map<ReferencedEntityKey, String> argMap, List<AttributeKvEntry> values, AttributeScope scope) {
|
||||
if (argMap.isEmpty() || values.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (AttributeKvEntry attrKv : values) {
|
||||
ReferencedEntityKey attrKey = new ReferencedEntityKey(attrKv.getKey(), ArgumentType.ATTRIBUTE, scope);
|
||||
if (argMap.containsKey(attrKey)) {
|
||||
if (argMap.containsKey(new ReferencedEntityKey(attrKv.getKey(), ArgumentType.ATTRIBUTE, scope))) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean matchesTimeSeries(Map<ReferencedEntityKey, String> argMap, List<TsKvEntry> values) {
|
||||
if (argMap.isEmpty() || values.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (TsKvEntry tsKv : values) {
|
||||
|
||||
ReferencedEntityKey latestKey = new ReferencedEntityKey(tsKv.getKey(), ArgumentType.TS_LATEST, null);
|
||||
if (argMap.containsKey(latestKey)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
ReferencedEntityKey rollingKey = new ReferencedEntityKey(tsKv.getKey(), ArgumentType.TS_ROLLING, null);
|
||||
if (argMap.containsKey(rollingKey)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -213,26 +224,38 @@ public class CalculatedFieldCtx {
|
||||
}
|
||||
|
||||
private boolean matchesAttributesKeys(Map<ReferencedEntityKey, String> argMap, List<String> keys, AttributeScope scope) {
|
||||
if (argMap.isEmpty() || keys.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (String key : keys) {
|
||||
ReferencedEntityKey attrKey = new ReferencedEntityKey(key, ArgumentType.ATTRIBUTE, scope);
|
||||
if (argMap.containsKey(attrKey)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean matchesTimeSeriesKeys(Map<ReferencedEntityKey, String> argMap, List<String> keys) {
|
||||
if (argMap.isEmpty() || keys.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (String key : keys) {
|
||||
|
||||
ReferencedEntityKey latestKey = new ReferencedEntityKey(key, ArgumentType.TS_LATEST, null);
|
||||
if (argMap.containsKey(latestKey)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
ReferencedEntityKey rollingKey = new ReferencedEntityKey(key, ArgumentType.TS_ROLLING, null);
|
||||
if (argMap.containsKey(rollingKey)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@ -114,18 +114,14 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
|
||||
stateProducer.send(tpi, stateId.toKey(), msg, new TbQueueCallback() {
|
||||
@Override
|
||||
public void onSuccess(TbQueueMsgMetadata metadata) {
|
||||
if (callback != null) {
|
||||
callback.onSuccess();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
if (callback != null) {
|
||||
callback.onFailure(t);
|
||||
}
|
||||
log.error("Failed to send state message: {}", stateId, t);
|
||||
}
|
||||
});
|
||||
callback.onSuccess();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -58,7 +58,13 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState {
|
||||
for (Map.Entry<String, ArgumentEntry> entry : this.arguments.entrySet()) {
|
||||
try {
|
||||
BasicKvEntry kvEntry = ((SingleValueArgumentEntry) entry.getValue()).getKvEntryValue();
|
||||
expr.setVariable(entry.getKey(), Double.parseDouble(kvEntry.getValueAsString()));
|
||||
double value = switch (kvEntry.getDataType()) {
|
||||
case LONG -> kvEntry.getLongValue().map(Long::doubleValue).orElseThrow();
|
||||
case DOUBLE -> kvEntry.getDoubleValue().orElseThrow();
|
||||
case BOOLEAN -> kvEntry.getBooleanValue().map(b -> b ? 1.0 : 0.0).orElseThrow();
|
||||
case STRING, JSON -> Double.parseDouble(kvEntry.getValueAsString());
|
||||
};
|
||||
expr.setVariable(entry.getKey(), value);
|
||||
} catch (NumberFormatException e) {
|
||||
throw new IllegalArgumentException("Argument '" + entry.getKey() + "' is not a number.");
|
||||
}
|
||||
@ -85,7 +91,13 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState {
|
||||
|
||||
private JsonNode createResultJson(boolean useLatestTs, String outputName, Object result) {
|
||||
ObjectNode valuesNode = JacksonUtil.newObjectNode();
|
||||
valuesNode.set(outputName, JacksonUtil.valueToTree(result));
|
||||
if (result instanceof Double doubleValue) {
|
||||
valuesNode.put(outputName, doubleValue);
|
||||
} else if (result instanceof Integer integerValue) {
|
||||
valuesNode.put(outputName, integerValue);
|
||||
} else {
|
||||
valuesNode.set(outputName, JacksonUtil.valueToTree(result));
|
||||
}
|
||||
|
||||
long latestTs = getLatestTimestamp();
|
||||
if (useLatestTs && latestTs != -1) {
|
||||
|
||||
@ -130,11 +130,9 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
private final AtomicInteger toEdgeNfs = new AtomicInteger(0);
|
||||
|
||||
@Autowired
|
||||
@Lazy
|
||||
private PartitionService partitionService;
|
||||
|
||||
@Autowired
|
||||
@Lazy
|
||||
private TbQueueProducerProvider producerProvider;
|
||||
|
||||
@Autowired
|
||||
|
||||
@ -28,7 +28,7 @@ import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.DonAsynchron;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.common.util.ThingsBoardExecutors;
|
||||
import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
|
||||
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
|
||||
import org.thingsboard.rule.engine.api.DeviceStateManager;
|
||||
@ -69,7 +69,6 @@ import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static java.util.Comparator.comparing;
|
||||
@ -96,6 +95,8 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
||||
|
||||
@Value("${sql.ts.value_no_xss_validation:false}")
|
||||
private boolean valueNoXssValidation;
|
||||
@Value("${sql.ts.callback_thread_pool_size:12}")
|
||||
private int callbackThreadPoolSize;
|
||||
|
||||
public DefaultTelemetrySubscriptionService(AttributesService attrService,
|
||||
TimeseriesService tsService,
|
||||
@ -116,7 +117,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
||||
@PostConstruct
|
||||
public void initExecutor() {
|
||||
super.initExecutor();
|
||||
tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-service-ts-callback"));
|
||||
tsCallBackExecutor = ThingsBoardExecutors.newWorkStealingPool(callbackThreadPoolSize, "ts-service-ts-callback");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -346,6 +346,7 @@ sql:
|
||||
stats_print_interval_ms: "${SQL_TS_BATCH_STATS_PRINT_MS:10000}" # Interval in milliseconds for printing timeseries insert statistic
|
||||
batch_threads: "${SQL_TS_BATCH_THREADS:3}" # batch thread count has to be a prime number like 3 or 5 to gain perfect hash distribution
|
||||
value_no_xss_validation: "${SQL_TS_VALUE_NO_XSS_VALIDATION:false}" # If true telemetry values will be checked for XSS vulnerability
|
||||
callback_thread_pool_size: "${SQL_TS_CALLBACK_THREAD_POOL_SIZE:12}" # Thread pool size for telemetry callback executor
|
||||
ts_latest:
|
||||
batch_size: "${SQL_TS_LATEST_BATCH_SIZE:1000}" # Batch size for persisting latest telemetry updates
|
||||
batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:50}" # Maximum timeout for latest telemetry entries queue polling. The value set in milliseconds
|
||||
|
||||
@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedField
|
||||
import org.thingsboard.server.common.data.id.AssetId;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.LongDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.StringDataEntry;
|
||||
@ -143,7 +144,7 @@ public class SimpleCalculatedFieldStateTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
void testPerformCalculationWhenPassedNotNumber() {
|
||||
void testPerformCalculationWhenPassedString() {
|
||||
state.arguments = new HashMap<>(Map.of(
|
||||
"key1", key1ArgEntry,
|
||||
"key2", new SingleValueArgumentEntry(System.currentTimeMillis() - 9, new StringDataEntry("key2", "string"), 124L),
|
||||
@ -155,6 +156,23 @@ public class SimpleCalculatedFieldStateTest {
|
||||
.hasMessage("Argument 'key2' is not a number.");
|
||||
}
|
||||
|
||||
@Test
|
||||
void testPerformCalculationWhenPassedBoolean() throws ExecutionException, InterruptedException {
|
||||
state.arguments = new HashMap<>(Map.of(
|
||||
"key1", key1ArgEntry,
|
||||
"key2", new SingleValueArgumentEntry(System.currentTimeMillis() - 9, new BooleanDataEntry("key2", true), 124L),// true is parsed as 1
|
||||
"key3", key3ArgEntry
|
||||
));
|
||||
|
||||
CalculatedFieldResult result = state.performCalculation(ctx).get();
|
||||
|
||||
assertThat(result).isNotNull();
|
||||
Output output = getCalculatedFieldConfig().getOutput();
|
||||
assertThat(result.getType()).isEqualTo(output.getType());
|
||||
assertThat(result.getScope()).isEqualTo(output.getScope());
|
||||
assertThat(result.getResult()).isEqualTo(JacksonUtil.valueToTree(Map.of("output", 35)));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testPerformCalculationWhenDecimalsByDefault() throws ExecutionException, InterruptedException {
|
||||
state.arguments = new HashMap<>(Map.of(
|
||||
|
||||
@ -16,10 +16,8 @@
|
||||
package org.thingsboard.server.dao.sql.attributes;
|
||||
|
||||
import org.springframework.stereotype.Repository;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.thingsboard.server.dao.AbstractVersionedInsertRepository;
|
||||
import org.thingsboard.server.dao.model.sql.AttributeKvEntity;
|
||||
import org.thingsboard.server.dao.util.SqlDao;
|
||||
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.SQLException;
|
||||
@ -27,8 +25,6 @@ import java.sql.Types;
|
||||
import java.util.List;
|
||||
|
||||
@Repository
|
||||
@Transactional
|
||||
@SqlDao
|
||||
public class AttributeKvInsertRepository extends AbstractVersionedInsertRepository<AttributeKvEntity> {
|
||||
|
||||
private static final String BATCH_UPDATE = "UPDATE attribute_kv SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ?, version = nextval('attribute_kv_version_seq') " +
|
||||
|
||||
@ -29,7 +29,6 @@ import org.thingsboard.server.dao.dictionary.KeyDictionaryDao;
|
||||
import org.thingsboard.server.dao.model.sqlts.dictionary.KeyDictionaryCompositeKey;
|
||||
import org.thingsboard.server.dao.model.sqlts.dictionary.KeyDictionaryEntry;
|
||||
import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
|
||||
import org.thingsboard.server.dao.util.SqlDao;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
@ -38,7 +37,6 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@SqlDao
|
||||
@RequiredArgsConstructor
|
||||
public class JpaKeyDictionaryDao extends JpaAbstractDaoListeningExecutorService implements KeyDictionaryDao {
|
||||
|
||||
|
||||
@ -18,11 +18,9 @@ package org.thingsboard.server.dao.sqlts.insert.latest.sql;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Repository;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.thingsboard.server.dao.AbstractVersionedInsertRepository;
|
||||
import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity;
|
||||
import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository;
|
||||
import org.thingsboard.server.dao.util.SqlDao;
|
||||
import org.thingsboard.server.dao.util.SqlTsLatestAnyDao;
|
||||
|
||||
import java.sql.PreparedStatement;
|
||||
@ -32,8 +30,6 @@ import java.util.List;
|
||||
|
||||
@SqlTsLatestAnyDao
|
||||
@Repository
|
||||
@Transactional
|
||||
@SqlDao
|
||||
public class SqlLatestInsertTsRepository extends AbstractVersionedInsertRepository<TsKvLatestEntity> implements InsertLatestTsRepository {
|
||||
|
||||
@Value("${sql.ts_latest.update_by_latest_ts:true}")
|
||||
|
||||
@ -33,9 +33,8 @@ public class KvUtils {
|
||||
|
||||
static {
|
||||
validatedKeys = Caffeine.newBuilder()
|
||||
.weakKeys()
|
||||
.expireAfterAccess(24, TimeUnit.HOURS)
|
||||
.maximumSize(100000).build();
|
||||
.maximumSize(50000).build();
|
||||
}
|
||||
|
||||
public static void validate(List<? extends KvEntry> tsKvEntries, boolean valueNoXssValidation) {
|
||||
@ -57,11 +56,13 @@ public class KvUtils {
|
||||
throw new DataValidationException("Validation error: key length must be equal or less than 255");
|
||||
}
|
||||
|
||||
if (validatedKeys.getIfPresent(key) == null) {
|
||||
if (!NoXssValidator.isValid(key)) {
|
||||
throw new DataValidationException("Validation error: key is malformed");
|
||||
}
|
||||
validatedKeys.put(key, Boolean.TRUE);
|
||||
Boolean isValid = validatedKeys.asMap().get(key);
|
||||
if (isValid == null) {
|
||||
isValid = NoXssValidator.isValid(key);
|
||||
validatedKeys.put(key, isValid);
|
||||
}
|
||||
if (!isValid) {
|
||||
throw new DataValidationException("Validation error: key is malformed");
|
||||
}
|
||||
|
||||
if (valueNoXssValidation) {
|
||||
|
||||
@ -57,6 +57,7 @@ public class AttributesSaveRequest implements CalculatedFieldSystemAwareRequest
|
||||
public static final Strategy PROCESS_ALL = new Strategy(true, true, true);
|
||||
public static final Strategy WS_ONLY = new Strategy(false, true, false);
|
||||
public static final Strategy SKIP_ALL = new Strategy(false, false, false);
|
||||
public static final Strategy CF_ONLY = new Strategy(false, false, true);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -56,6 +56,7 @@ public class TimeseriesSaveRequest implements CalculatedFieldSystemAwareRequest
|
||||
public static final Strategy WS_ONLY = new Strategy(false, false, true, false);
|
||||
public static final Strategy LATEST_AND_WS = new Strategy(false, true, true, false);
|
||||
public static final Strategy SKIP_ALL = new Strategy(false, false, false, false);
|
||||
public static final Strategy CF_ONLY = new Strategy(false, false, false, true);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -92,13 +92,14 @@ public class TbCalculatedFieldsNode implements TbNode {
|
||||
.customerId(msg.getCustomerId())
|
||||
.entityId(msg.getOriginator())
|
||||
.entries(tsKvEntryList)
|
||||
.strategy(TimeseriesSaveRequest.Strategy.CF_ONLY)
|
||||
.previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds())
|
||||
.tbMsgId(msg.getId())
|
||||
.tbMsgType(msg.getInternalType())
|
||||
.callback(new TelemetryNodeCallback(ctx, msg))
|
||||
.build();
|
||||
|
||||
ctx.getCalculatedFieldQueueService().pushRequestToQueue(timeseriesSaveRequest, timeseriesSaveRequest.getCallback());
|
||||
ctx.getTelemetryService().saveTimeseries(timeseriesSaveRequest);
|
||||
}
|
||||
|
||||
private void processPostAttributesRequest(TbContext ctx, TbMsg msg) {
|
||||
@ -114,12 +115,13 @@ public class TbCalculatedFieldsNode implements TbNode {
|
||||
.entityId(msg.getOriginator())
|
||||
.scope(AttributeScope.valueOf(msg.getMetaData().getValue(SCOPE)))
|
||||
.entries(newAttributes)
|
||||
.strategy(AttributesSaveRequest.Strategy.CF_ONLY)
|
||||
.previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds())
|
||||
.tbMsgId(msg.getId())
|
||||
.tbMsgType(msg.getInternalType())
|
||||
.callback(new TelemetryNodeCallback(ctx, msg))
|
||||
.build();
|
||||
ctx.getCalculatedFieldQueueService().pushRequestToQueue(attributesSaveRequest, attributesSaveRequest.getCallback());
|
||||
ctx.getTelemetryService().saveAttributes(attributesSaveRequest);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user