fixes for performance
This commit is contained in:
		
							parent
							
								
									2f061c448b
								
							
						
					
					
						commit
						a2382c206f
					
				@ -318,7 +318,6 @@ public class ActorSystemContext {
 | 
			
		||||
    @Getter
 | 
			
		||||
    private TbEntityViewService tbEntityViewService;
 | 
			
		||||
 | 
			
		||||
    @Lazy
 | 
			
		||||
    @Autowired
 | 
			
		||||
    @Getter
 | 
			
		||||
    private TelemetrySubscriptionService tsSubService;
 | 
			
		||||
@ -537,17 +536,14 @@ public class ActorSystemContext {
 | 
			
		||||
    @Getter
 | 
			
		||||
    private EntityService entityService;
 | 
			
		||||
 | 
			
		||||
    @Lazy
 | 
			
		||||
    @Autowired(required = false)
 | 
			
		||||
    @Getter
 | 
			
		||||
    private CalculatedFieldProcessingService calculatedFieldProcessingService;
 | 
			
		||||
 | 
			
		||||
    @Lazy
 | 
			
		||||
    @Autowired(required = false)
 | 
			
		||||
    @Getter
 | 
			
		||||
    private CalculatedFieldStateService calculatedFieldStateService;
 | 
			
		||||
 | 
			
		||||
    @Lazy
 | 
			
		||||
    @Autowired(required = false)
 | 
			
		||||
    @Getter
 | 
			
		||||
    private CalculatedFieldQueueService calculatedFieldQueueService;
 | 
			
		||||
 | 
			
		||||
@ -18,8 +18,11 @@ package org.thingsboard.server.service.cf;
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.context.annotation.Lazy;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.springframework.util.ConcurrentReferenceHashMap;
 | 
			
		||||
import org.thingsboard.script.api.tbel.TbelInvokeService;
 | 
			
		||||
import org.thingsboard.server.actors.ActorSystemContext;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.CalculatedField;
 | 
			
		||||
@ -49,14 +52,14 @@ import java.util.concurrent.locks.ReentrantLock;
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
 | 
			
		||||
 | 
			
		||||
    private static final Integer UNKNOWN_PARTITION = -1;
 | 
			
		||||
 | 
			
		||||
    private final Lock calculatedFieldFetchLock = new ReentrantLock();
 | 
			
		||||
    private final ConcurrentReferenceHashMap<CalculatedFieldId, Lock> calculatedFieldFetchLocks = new ConcurrentReferenceHashMap<>();
 | 
			
		||||
 | 
			
		||||
    private final CalculatedFieldService calculatedFieldService;
 | 
			
		||||
    private final TbelInvokeService tbelInvokeService;
 | 
			
		||||
    private final ActorSystemContext actorSystemContext;
 | 
			
		||||
    private final ApiLimitService apiLimitService;
 | 
			
		||||
    @Autowired
 | 
			
		||||
    @Lazy
 | 
			
		||||
    private ActorSystemContext actorSystemContext;
 | 
			
		||||
 | 
			
		||||
    private final ConcurrentMap<CalculatedFieldId, CalculatedField> calculatedFields = new ConcurrentHashMap<>();
 | 
			
		||||
    private final ConcurrentMap<EntityId, List<CalculatedField>> entityIdCalculatedFields = new ConcurrentHashMap<>();
 | 
			
		||||
@ -98,19 +101,20 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public List<CalculatedField> getCalculatedFieldsByEntityId(EntityId entityId) {
 | 
			
		||||
        return entityIdCalculatedFields.getOrDefault(entityId, new CopyOnWriteArrayList<>());
 | 
			
		||||
        return entityIdCalculatedFields.getOrDefault(entityId, Collections.emptyList());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public List<CalculatedFieldLink> getCalculatedFieldLinksByEntityId(EntityId entityId) {
 | 
			
		||||
        return entityIdCalculatedFieldLinks.getOrDefault(entityId, new CopyOnWriteArrayList<>());
 | 
			
		||||
        return entityIdCalculatedFieldLinks.getOrDefault(entityId, Collections.emptyList());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public CalculatedFieldCtx getCalculatedFieldCtx(CalculatedFieldId calculatedFieldId) {
 | 
			
		||||
        CalculatedFieldCtx ctx = calculatedFieldsCtx.get(calculatedFieldId);
 | 
			
		||||
        if (ctx == null) {
 | 
			
		||||
            calculatedFieldFetchLock.lock();
 | 
			
		||||
            Lock lock = getFetchLock(calculatedFieldId);
 | 
			
		||||
            lock.lock();
 | 
			
		||||
            try {
 | 
			
		||||
                ctx = calculatedFieldsCtx.get(calculatedFieldId);
 | 
			
		||||
                if (ctx == null) {
 | 
			
		||||
@ -122,13 +126,17 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            } finally {
 | 
			
		||||
                calculatedFieldFetchLock.unlock();
 | 
			
		||||
                lock.unlock();
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        log.trace("[{}] Found calculated field ctx in cache: {}", calculatedFieldId, ctx);
 | 
			
		||||
        return ctx;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private Lock getFetchLock(CalculatedFieldId id) {
 | 
			
		||||
        return calculatedFieldFetchLocks.computeIfAbsent(id, __ -> new ReentrantLock());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public List<CalculatedFieldCtx> getCalculatedFieldCtxsByEntityId(EntityId entityId) {
 | 
			
		||||
        if (entityId == null) {
 | 
			
		||||
@ -141,7 +149,8 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void addCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId) {
 | 
			
		||||
        calculatedFieldFetchLock.lock();
 | 
			
		||||
        Lock lock = getFetchLock(calculatedFieldId);
 | 
			
		||||
        lock.lock();
 | 
			
		||||
        try {
 | 
			
		||||
            CalculatedField calculatedField = calculatedFieldService.findById(tenantId, calculatedFieldId);
 | 
			
		||||
            if (calculatedField == null) {
 | 
			
		||||
@ -163,7 +172,7 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
 | 
			
		||||
                                .add(configuration.buildCalculatedFieldLink(tenantId, referencedEntityId, calculatedFieldId));
 | 
			
		||||
                    });
 | 
			
		||||
        } finally {
 | 
			
		||||
            calculatedFieldFetchLock.unlock();
 | 
			
		||||
            lock.unlock();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -133,7 +133,8 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
 | 
			
		||||
        }
 | 
			
		||||
        boolean send = checkEntityForCalculatedFields(tenantId, entityId, mainEntityFilter, linkedEntityFilter);
 | 
			
		||||
        if (send) {
 | 
			
		||||
            clusterService.pushMsgToCalculatedFields(tenantId, entityId, msg.get(), wrap(callback));
 | 
			
		||||
            ToCalculatedFieldMsg calculatedFieldMsg = msg.get();
 | 
			
		||||
            clusterService.pushMsgToCalculatedFields(tenantId, entityId, calculatedFieldMsg, wrap(callback));
 | 
			
		||||
        } else {
 | 
			
		||||
            if (callback != null) {
 | 
			
		||||
                callback.onSuccess(null);
 | 
			
		||||
@ -142,20 +143,35 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private boolean checkEntityForCalculatedFields(TenantId tenantId, EntityId entityId, Predicate<CalculatedFieldCtx> filter, Predicate<CalculatedFieldCtx> linkedEntityFilter) {
 | 
			
		||||
        boolean send = false;
 | 
			
		||||
        if (supportedReferencedEntities.contains(entityId.getEntityType())) {
 | 
			
		||||
            send = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(entityId).stream().anyMatch(filter);
 | 
			
		||||
            if (!send) {
 | 
			
		||||
                send = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(getProfileId(tenantId, entityId)).stream().anyMatch(filter);
 | 
			
		||||
        if (!supportedReferencedEntities.contains(entityId.getEntityType())) {
 | 
			
		||||
            return false;
 | 
			
		||||
        }
 | 
			
		||||
            if (!send) {
 | 
			
		||||
                send = calculatedFieldCache.getCalculatedFieldLinksByEntityId(entityId).stream()
 | 
			
		||||
                        .map(CalculatedFieldLink::getCalculatedFieldId)
 | 
			
		||||
                        .map(calculatedFieldCache::getCalculatedFieldCtx)
 | 
			
		||||
                        .anyMatch(linkedEntityFilter);
 | 
			
		||||
        List<CalculatedFieldCtx> entityCfs = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(entityId);
 | 
			
		||||
        for (CalculatedFieldCtx ctx : entityCfs) {
 | 
			
		||||
            if (filter.test(ctx)) {
 | 
			
		||||
                return true;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        return send;
 | 
			
		||||
 | 
			
		||||
        EntityId profileId = getProfileId(tenantId, entityId);
 | 
			
		||||
        if (profileId != null) {
 | 
			
		||||
            List<CalculatedFieldCtx> profileCfs = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(profileId);
 | 
			
		||||
            for (CalculatedFieldCtx ctx : profileCfs) {
 | 
			
		||||
                if (filter.test(ctx)) {
 | 
			
		||||
                    return true;
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        List<CalculatedFieldLink> links = calculatedFieldCache.getCalculatedFieldLinksByEntityId(entityId);
 | 
			
		||||
        for (CalculatedFieldLink link : links) {
 | 
			
		||||
            CalculatedFieldCtx ctx = calculatedFieldCache.getCalculatedFieldCtx(link.getCalculatedFieldId());
 | 
			
		||||
            if (ctx != null && linkedEntityFilter.test(ctx)) {
 | 
			
		||||
                return true;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        return false;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private EntityId getProfileId(TenantId tenantId, EntityId entityId) {
 | 
			
		||||
 | 
			
		||||
@ -181,6 +181,10 @@ public class CalculatedFieldCtx {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private boolean matchesAttributes(Map<ReferencedEntityKey, String> argMap, List<AttributeKvEntry> values, AttributeScope scope) {
 | 
			
		||||
        if (argMap.isEmpty() || values.isEmpty()) {
 | 
			
		||||
            return false;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        for (AttributeKvEntry attrKv : values) {
 | 
			
		||||
            ReferencedEntityKey attrKey = new ReferencedEntityKey(attrKv.getKey(), ArgumentType.ATTRIBUTE, scope);
 | 
			
		||||
            if (argMap.containsKey(attrKey)) {
 | 
			
		||||
@ -191,13 +195,14 @@ public class CalculatedFieldCtx {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private boolean matchesTimeSeries(Map<ReferencedEntityKey, String> argMap, List<TsKvEntry> values) {
 | 
			
		||||
        for (TsKvEntry tsKv : values) {
 | 
			
		||||
            ReferencedEntityKey latestKey = new ReferencedEntityKey(tsKv.getKey(), ArgumentType.TS_LATEST, null);
 | 
			
		||||
            if (argMap.containsKey(latestKey)) {
 | 
			
		||||
                return true;
 | 
			
		||||
        if (argMap.isEmpty() || values.isEmpty()) {
 | 
			
		||||
            return false;
 | 
			
		||||
        }
 | 
			
		||||
            ReferencedEntityKey rollingKey = new ReferencedEntityKey(tsKv.getKey(), ArgumentType.TS_ROLLING, null);
 | 
			
		||||
            if (argMap.containsKey(rollingKey)) {
 | 
			
		||||
 | 
			
		||||
        for (TsKvEntry tsKv : values) {
 | 
			
		||||
            String key = tsKv.getKey();
 | 
			
		||||
            if (argMap.containsKey(new ReferencedEntityKey(key, ArgumentType.TS_LATEST, null)) ||
 | 
			
		||||
                    argMap.containsKey(new ReferencedEntityKey(key, ArgumentType.TS_ROLLING, null))) {
 | 
			
		||||
                return true;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
@ -213,6 +218,10 @@ public class CalculatedFieldCtx {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private boolean matchesAttributesKeys(Map<ReferencedEntityKey, String> argMap, List<String> keys, AttributeScope scope) {
 | 
			
		||||
        if (argMap.isEmpty() || keys.isEmpty()) {
 | 
			
		||||
            return false;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        for (String key : keys) {
 | 
			
		||||
            ReferencedEntityKey attrKey = new ReferencedEntityKey(key, ArgumentType.ATTRIBUTE, scope);
 | 
			
		||||
            if (argMap.containsKey(attrKey)) {
 | 
			
		||||
@ -223,13 +232,13 @@ public class CalculatedFieldCtx {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private boolean matchesTimeSeriesKeys(Map<ReferencedEntityKey, String> argMap, List<String> keys) {
 | 
			
		||||
        for (String key : keys) {
 | 
			
		||||
            ReferencedEntityKey latestKey = new ReferencedEntityKey(key, ArgumentType.TS_LATEST, null);
 | 
			
		||||
            if (argMap.containsKey(latestKey)) {
 | 
			
		||||
                return true;
 | 
			
		||||
        if (argMap.isEmpty() || keys.isEmpty()) {
 | 
			
		||||
            return false;
 | 
			
		||||
        }
 | 
			
		||||
            ReferencedEntityKey rollingKey = new ReferencedEntityKey(key, ArgumentType.TS_ROLLING, null);
 | 
			
		||||
            if (argMap.containsKey(rollingKey)) {
 | 
			
		||||
 | 
			
		||||
        for (String key : keys) {
 | 
			
		||||
            if (argMap.containsKey(new ReferencedEntityKey(key, ArgumentType.TS_LATEST, null)) ||
 | 
			
		||||
                    argMap.containsKey(new ReferencedEntityKey(key, ArgumentType.TS_ROLLING, null))) {
 | 
			
		||||
                return true;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@ -30,9 +30,7 @@ import org.thingsboard.server.common.msg.queue.TbCallback;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueCallback;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueMsgHeaders;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueMsgMetadata;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
 | 
			
		||||
import org.thingsboard.server.queue.common.state.KafkaQueueStateService;
 | 
			
		||||
@ -111,22 +109,9 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
 | 
			
		||||
        if (stateMsgProto == null) {
 | 
			
		||||
            putStateId(msg.getHeaders(), stateId);
 | 
			
		||||
        }
 | 
			
		||||
        stateProducer.send(tpi, stateId.toKey(), msg, new TbQueueCallback() {
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onSuccess(TbQueueMsgMetadata metadata) {
 | 
			
		||||
                if (callback != null) {
 | 
			
		||||
        stateProducer.send(tpi, stateId.toKey(), msg, null);
 | 
			
		||||
        callback.onSuccess();
 | 
			
		||||
    }
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onFailure(Throwable t) {
 | 
			
		||||
                if (callback != null) {
 | 
			
		||||
                    callback.onFailure(t);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected void doRemove(CalculatedFieldEntityCtxId stateId, TbCallback callback) {
 | 
			
		||||
 | 
			
		||||
@ -130,11 +130,9 @@ public class DefaultTbClusterService implements TbClusterService {
 | 
			
		||||
    private final AtomicInteger toEdgeNfs = new AtomicInteger(0);
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    @Lazy
 | 
			
		||||
    private PartitionService partitionService;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    @Lazy
 | 
			
		||||
    private TbQueueProducerProvider producerProvider;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
 | 
			
		||||
@ -29,7 +29,6 @@ import org.thingsboard.server.dao.dictionary.KeyDictionaryDao;
 | 
			
		||||
import org.thingsboard.server.dao.model.sqlts.dictionary.KeyDictionaryCompositeKey;
 | 
			
		||||
import org.thingsboard.server.dao.model.sqlts.dictionary.KeyDictionaryEntry;
 | 
			
		||||
import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
 | 
			
		||||
import org.thingsboard.server.dao.util.SqlDao;
 | 
			
		||||
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
@ -38,7 +37,6 @@ import java.util.concurrent.locks.ReentrantLock;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
@Slf4j
 | 
			
		||||
@SqlDao
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
public class JpaKeyDictionaryDao extends JpaAbstractDaoListeningExecutorService implements KeyDictionaryDao {
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -22,7 +22,6 @@ import org.springframework.transaction.annotation.Transactional;
 | 
			
		||||
import org.thingsboard.server.dao.AbstractVersionedInsertRepository;
 | 
			
		||||
import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity;
 | 
			
		||||
import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository;
 | 
			
		||||
import org.thingsboard.server.dao.util.SqlDao;
 | 
			
		||||
import org.thingsboard.server.dao.util.SqlTsLatestAnyDao;
 | 
			
		||||
 | 
			
		||||
import java.sql.PreparedStatement;
 | 
			
		||||
@ -33,7 +32,6 @@ import java.util.List;
 | 
			
		||||
@SqlTsLatestAnyDao
 | 
			
		||||
@Repository
 | 
			
		||||
@Transactional
 | 
			
		||||
@SqlDao
 | 
			
		||||
public class SqlLatestInsertTsRepository extends AbstractVersionedInsertRepository<TsKvLatestEntity> implements InsertLatestTsRepository {
 | 
			
		||||
 | 
			
		||||
    @Value("${sql.ts_latest.update_by_latest_ts:true}")
 | 
			
		||||
 | 
			
		||||
@ -33,9 +33,8 @@ public class KvUtils {
 | 
			
		||||
 | 
			
		||||
    static {
 | 
			
		||||
        validatedKeys = Caffeine.newBuilder()
 | 
			
		||||
                .weakKeys()
 | 
			
		||||
                .expireAfterAccess(24, TimeUnit.HOURS)
 | 
			
		||||
                .maximumSize(100000).build();
 | 
			
		||||
                .maximumSize(50000).build();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static void validate(List<? extends KvEntry> tsKvEntries, boolean valueNoXssValidation) {
 | 
			
		||||
@ -57,11 +56,13 @@ public class KvUtils {
 | 
			
		||||
            throw new DataValidationException("Validation error: key length must be equal or less than 255");
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (validatedKeys.getIfPresent(key) == null) {
 | 
			
		||||
            if (!NoXssValidator.isValid(key)) {
 | 
			
		||||
                throw new DataValidationException("Validation error: key is malformed");
 | 
			
		||||
        Boolean isValid = validatedKeys.asMap().get(key);
 | 
			
		||||
        if (isValid == null) {
 | 
			
		||||
            isValid = NoXssValidator.isValid(key);
 | 
			
		||||
            validatedKeys.put(key, isValid);
 | 
			
		||||
        }
 | 
			
		||||
            validatedKeys.put(key, Boolean.TRUE);
 | 
			
		||||
        if (!isValid) {
 | 
			
		||||
            throw new DataValidationException("Validation error: key is malformed");
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (valueNoXssValidation) {
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user