added cache class
This commit is contained in:
parent
1482d1c4bb
commit
a9f39e4917
@ -0,0 +1,43 @@
|
||||
/**
|
||||
* Copyright © 2016-2024 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.cf;
|
||||
|
||||
import org.thingsboard.script.api.tbel.TbelInvokeService;
|
||||
import org.thingsboard.server.common.data.cf.CalculatedField;
|
||||
import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
|
||||
import org.thingsboard.server.common.data.id.CalculatedFieldId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
public interface CalculatedFieldCache {
|
||||
|
||||
CalculatedField getCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId);
|
||||
|
||||
List<CalculatedFieldLink> getCalculatedFieldLinks(TenantId tenantId, CalculatedFieldId calculatedFieldId);
|
||||
|
||||
List<CalculatedFieldLink> getCalculatedFieldLinksByEntityId(TenantId tenantId, EntityId entityId);
|
||||
|
||||
CalculatedFieldCtx getCalculatedFieldCtx(TenantId tenantId, CalculatedFieldId calculatedFieldId, TbelInvokeService tbelInvokeService);
|
||||
|
||||
Set<EntityId> getEntitiesByProfile(TenantId tenantId, EntityId entityId);
|
||||
|
||||
void evict(CalculatedFieldId calculatedFieldId);
|
||||
|
||||
}
|
||||
@ -0,0 +1,212 @@
|
||||
/**
|
||||
* Copyright © 2016-2024 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.cf;
|
||||
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.script.api.tbel.TbelInvokeService;
|
||||
import org.thingsboard.server.common.data.cf.CalculatedField;
|
||||
import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
|
||||
import org.thingsboard.server.common.data.id.AssetProfileId;
|
||||
import org.thingsboard.server.common.data.id.CalculatedFieldId;
|
||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageDataIterable;
|
||||
import org.thingsboard.server.dao.asset.AssetService;
|
||||
import org.thingsboard.server.dao.cf.CalculatedFieldService;
|
||||
import org.thingsboard.server.dao.device.DeviceService;
|
||||
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
|
||||
|
||||
private final Lock calculatedFieldFetchLock = new ReentrantLock();
|
||||
|
||||
private final CalculatedFieldService calculatedFieldService;
|
||||
private final AssetService assetService;
|
||||
private final DeviceService deviceService;
|
||||
|
||||
private final ConcurrentMap<CalculatedFieldId, CalculatedField> calculatedFields = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<CalculatedFieldId, List<CalculatedFieldLink>> calculatedFieldLinks = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<EntityId, List<CalculatedFieldLink>> entityIdCalculatedFieldLinks = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<CalculatedFieldId, CalculatedFieldCtx> calculatedFieldsCtx = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<EntityId, Set<EntityId>> profileEntities = new ConcurrentHashMap<>();
|
||||
|
||||
@Value("${calculatedField.initFetchPackSize:50000}")
|
||||
@Getter
|
||||
private int initFetchPackSize;
|
||||
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
// to discuss: fetch on start or fetch on demand
|
||||
PageDataIterable<CalculatedField> cfs = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFields, initFetchPackSize);
|
||||
cfs.forEach(cf -> calculatedFields.putIfAbsent(cf.getId(), cf));
|
||||
PageDataIterable<CalculatedFieldLink> cfls = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFieldLinks, initFetchPackSize);
|
||||
cfls.forEach(link -> calculatedFieldLinks.computeIfAbsent(link.getCalculatedFieldId(), id -> new ArrayList<>()).add(link));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CalculatedField getCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId) {
|
||||
CalculatedField calculatedField = calculatedFields.get(calculatedFieldId);
|
||||
if (calculatedField == null) {
|
||||
calculatedFieldFetchLock.lock();
|
||||
try {
|
||||
calculatedField = calculatedFields.get(calculatedFieldId);
|
||||
if (calculatedField == null) {
|
||||
calculatedField = calculatedFieldService.findById(tenantId, calculatedFieldId);
|
||||
if (calculatedField != null) {
|
||||
calculatedFields.put(calculatedFieldId, calculatedField);
|
||||
log.debug("[{}] Fetch calculated field into cache: {}", calculatedFieldId, calculatedField);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
calculatedFieldFetchLock.unlock();
|
||||
}
|
||||
}
|
||||
log.trace("[{}] Found calculated field in cache: {}", calculatedFieldId, calculatedField);
|
||||
return calculatedField;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<CalculatedFieldLink> getCalculatedFieldLinks(TenantId tenantId, CalculatedFieldId calculatedFieldId) {
|
||||
List<CalculatedFieldLink> cfLinks = calculatedFieldLinks.get(calculatedFieldId);
|
||||
if (cfLinks == null) {
|
||||
calculatedFieldFetchLock.lock();
|
||||
try {
|
||||
cfLinks = calculatedFieldLinks.get(calculatedFieldId);
|
||||
if (cfLinks == null) {
|
||||
cfLinks = calculatedFieldService.findAllCalculatedFieldLinksById(tenantId, calculatedFieldId);
|
||||
if (cfLinks != null) {
|
||||
calculatedFieldLinks.put(calculatedFieldId, cfLinks);
|
||||
log.debug("[{}] Fetch calculated field links into cache: {}", calculatedFieldId, cfLinks);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
calculatedFieldFetchLock.unlock();
|
||||
}
|
||||
}
|
||||
log.trace("[{}] Found calculated field links in cache: {}", calculatedFieldId, cfLinks);
|
||||
return cfLinks;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<CalculatedFieldLink> getCalculatedFieldLinksByEntityId(TenantId tenantId, EntityId entityId) {
|
||||
List<CalculatedFieldLink> cfLinks = entityIdCalculatedFieldLinks.get(entityId);
|
||||
if (cfLinks == null) {
|
||||
calculatedFieldFetchLock.lock();
|
||||
try {
|
||||
cfLinks = entityIdCalculatedFieldLinks.get(entityId);
|
||||
if (cfLinks == null) {
|
||||
cfLinks = calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, entityId);
|
||||
if (cfLinks != null) {
|
||||
entityIdCalculatedFieldLinks.put(entityId, cfLinks);
|
||||
log.debug("[{}] Fetch calculated field links by entity id into cache: {}", entityId, cfLinks);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
calculatedFieldFetchLock.unlock();
|
||||
}
|
||||
}
|
||||
log.trace("[{}] Found calculated field links by entity id in cache: {}", entityId, cfLinks);
|
||||
return cfLinks;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CalculatedFieldCtx getCalculatedFieldCtx(TenantId tenantId, CalculatedFieldId calculatedFieldId, TbelInvokeService tbelInvokeService) {
|
||||
CalculatedFieldCtx ctx = calculatedFieldsCtx.get(calculatedFieldId);
|
||||
if (ctx == null) {
|
||||
calculatedFieldFetchLock.lock();
|
||||
try {
|
||||
ctx = calculatedFieldsCtx.get(calculatedFieldId);
|
||||
if (ctx == null) {
|
||||
CalculatedField calculatedField = getCalculatedField(tenantId, calculatedFieldId);
|
||||
if (calculatedField != null) {
|
||||
ctx = new CalculatedFieldCtx(calculatedField, tbelInvokeService);
|
||||
calculatedFieldsCtx.put(calculatedFieldId, ctx);
|
||||
log.debug("[{}] Put calculated field ctx into cache: {}", calculatedFieldId, ctx);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
calculatedFieldFetchLock.unlock();
|
||||
}
|
||||
}
|
||||
log.trace("[{}] Found calculated field ctx in cache: {}", calculatedFieldId, ctx);
|
||||
return ctx;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<EntityId> getEntitiesByProfile(TenantId tenantId, EntityId entityProfileId) {
|
||||
Set<EntityId> entities = profileEntities.get(entityProfileId);
|
||||
if (entities == null) {
|
||||
calculatedFieldFetchLock.lock();
|
||||
try {
|
||||
entities = profileEntities.get(entityProfileId);
|
||||
if (entities == null) {
|
||||
entities = switch (entityProfileId.getEntityType()) {
|
||||
case ASSET_PROFILE -> profileEntities.computeIfAbsent(entityProfileId, profileId -> {
|
||||
Set<EntityId> assetIds = new HashSet<>();
|
||||
(new PageDataIterable<>(pageLink ->
|
||||
assetService.findAssetIdsByTenantIdAndAssetProfileId(tenantId, (AssetProfileId) profileId, pageLink), initFetchPackSize)).forEach(assetIds::add);
|
||||
return assetIds;
|
||||
});
|
||||
case DEVICE_PROFILE -> profileEntities.computeIfAbsent(entityProfileId, profileId -> {
|
||||
Set<EntityId> deviceIds = new HashSet<>();
|
||||
(new PageDataIterable<>(pageLink ->
|
||||
deviceService.findDeviceIdsByTenantIdAndDeviceProfileId(tenantId, (DeviceProfileId) entityProfileId, pageLink), initFetchPackSize)).forEach(deviceIds::add);
|
||||
return deviceIds;
|
||||
});
|
||||
default ->
|
||||
throw new IllegalArgumentException("Entity type should be ASSET_PROFILE or DEVICE_PROFILE.");
|
||||
};
|
||||
}
|
||||
} finally {
|
||||
calculatedFieldFetchLock.unlock();
|
||||
}
|
||||
}
|
||||
log.trace("[{}] Found entities by profile in cache: {}", entityProfileId, entities);
|
||||
return entities;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void evict(CalculatedFieldId calculatedFieldId) {
|
||||
CalculatedField oldCalculatedField = calculatedFields.remove(calculatedFieldId);
|
||||
log.debug("[{}] evict calculated field from cache: {}", calculatedFieldId, oldCalculatedField);
|
||||
calculatedFieldLinks.remove(calculatedFieldId);
|
||||
log.debug("[{}] evict calculated field links from cache: {}", calculatedFieldId, oldCalculatedField);
|
||||
calculatedFieldsCtx.remove(calculatedFieldId);
|
||||
log.debug("[{}] evict calculated field ctx from cache: {}", calculatedFieldId, oldCalculatedField);
|
||||
entityIdCalculatedFieldLinks.forEach((entityId, calculatedFieldLinks) -> calculatedFieldLinks.removeIf(link -> link.getCalculatedFieldId().equals(calculatedFieldId)));
|
||||
log.debug("[{}] evict calculated field links from cached links by entity id: {}", calculatedFieldId, oldCalculatedField);
|
||||
}
|
||||
|
||||
}
|
||||
@ -18,7 +18,11 @@ package org.thingsboard.server.service.cf;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.*;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import lombok.Getter;
|
||||
@ -37,8 +41,23 @@ import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
|
||||
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
|
||||
import org.thingsboard.server.common.data.cf.configuration.Argument;
|
||||
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
|
||||
import org.thingsboard.server.common.data.id.*;
|
||||
import org.thingsboard.server.common.data.kv.*;
|
||||
import org.thingsboard.server.common.data.id.AssetId;
|
||||
import org.thingsboard.server.common.data.id.CalculatedFieldId;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.kv.Aggregation;
|
||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
|
||||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.KvEntry;
|
||||
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
|
||||
import org.thingsboard.server.common.data.kv.StringDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
import org.thingsboard.server.common.data.msg.TbMsgType;
|
||||
import org.thingsboard.server.common.data.page.PageDataIterable;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
@ -46,10 +65,8 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.dao.asset.AssetService;
|
||||
import org.thingsboard.server.dao.attributes.AttributesService;
|
||||
import org.thingsboard.server.dao.cf.CalculatedFieldService;
|
||||
import org.thingsboard.server.dao.device.DeviceService;
|
||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
@ -67,8 +84,8 @@ import org.thingsboard.server.service.profile.TbAssetProfileCache;
|
||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
@ -77,7 +94,6 @@ import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ -90,10 +106,9 @@ import static org.thingsboard.server.common.data.DataConstants.SCOPE;
|
||||
public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBasedService<CalculatedFieldId> implements CalculatedFieldExecutionService {
|
||||
|
||||
private final CalculatedFieldService calculatedFieldService;
|
||||
private final AssetService assetService;
|
||||
private final DeviceService deviceService;
|
||||
private final TbAssetProfileCache assetProfileCache;
|
||||
private final TbDeviceProfileCache deviceProfileCache;
|
||||
private final CalculatedFieldCache calculatedFieldCache;
|
||||
private final AttributesService attributesService;
|
||||
private final TimeseriesService timeseriesService;
|
||||
private final RocksDBService rocksDBService;
|
||||
@ -103,13 +118,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
private ListeningExecutorService calculatedFieldExecutor;
|
||||
private ListeningExecutorService calculatedFieldCallbackExecutor;
|
||||
|
||||
private final ConcurrentMap<CalculatedFieldId, CalculatedField> calculatedFields = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<CalculatedFieldId, List<CalculatedFieldLink>> calculatedFieldLinks = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<CalculatedFieldId, CalculatedFieldCtx> calculatedFieldsCtx = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<CalculatedFieldEntityCtxId, CalculatedFieldEntityCtx> states = new ConcurrentHashMap<>();
|
||||
|
||||
private final ConcurrentMap<EntityId, Set<EntityId>> profileEntities = new ConcurrentHashMap<>();
|
||||
|
||||
private static final int MAX_LAST_RECORDS_VALUE = 1024;
|
||||
|
||||
@Value("${calculatedField.initFetchPackSize:50000}")
|
||||
@ -123,20 +133,11 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
Math.max(4, Runtime.getRuntime().availableProcessors()), "calculated-field"));
|
||||
calculatedFieldCallbackExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(
|
||||
Math.max(4, Runtime.getRuntime().availableProcessors()), "calculated-field-callback"));
|
||||
scheduledExecutor.submit(this::fetchCalculatedFields);
|
||||
}
|
||||
|
||||
private void fetchCalculatedFields() {
|
||||
PageDataIterable<CalculatedField> cfs = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFields, initFetchPackSize);
|
||||
cfs.forEach(cf -> calculatedFields.putIfAbsent(cf.getId(), cf));
|
||||
PageDataIterable<CalculatedFieldLink> cfls = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFieldLinks, initFetchPackSize);
|
||||
cfls.forEach(link -> calculatedFieldLinks.computeIfAbsent(link.getCalculatedFieldId(), id -> new ArrayList<>()).add(link));
|
||||
rocksDBService.getAll().forEach((ctxId, ctx) -> states.put(JacksonUtil.fromString(ctxId, CalculatedFieldEntityCtxId.class), JacksonUtil.fromString(ctx, CalculatedFieldEntityCtx.class)));
|
||||
states.keySet().removeIf(ctxId -> calculatedFields.keySet().stream().noneMatch(id -> ctxId.cfId().equals(id.getId())));
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void stop() {
|
||||
super.stop();
|
||||
if (calculatedFieldExecutor != null) {
|
||||
calculatedFieldExecutor.shutdownNow();
|
||||
}
|
||||
@ -183,7 +184,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
for (CalculatedField cf : partition) {
|
||||
EntityId cfEntityId = cf.getEntityId();
|
||||
if (isProfileEntity(cfEntityId)) {
|
||||
getOrFetchFromDBProfileEntities(cf.getTenantId(), cfEntityId)
|
||||
calculatedFieldCache.getEntitiesByProfile(cf.getTenantId(), cfEntityId)
|
||||
.forEach(entityId -> restoreState(cf, entityId));
|
||||
} else {
|
||||
restoreState(cf, cfEntityId);
|
||||
@ -206,7 +207,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
|
||||
if (storedState != null) {
|
||||
CalculatedFieldEntityCtx restoredCtx = JacksonUtil.fromString(storedState, CalculatedFieldEntityCtx.class);
|
||||
calculatedFieldsCtx.putIfAbsent(cf.getId(), new CalculatedFieldCtx(cf, tbelInvokeService));
|
||||
states.put(ctxId, restoredCtx);
|
||||
log.info("Restored state for CalculatedField [{}]", cf.getId());
|
||||
} else {
|
||||
@ -220,7 +220,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
}
|
||||
|
||||
private void cleanupEntity(CalculatedFieldId calculatedFieldId) {
|
||||
calculatedFields.remove(calculatedFieldId);
|
||||
states.keySet().removeIf(ctxId -> ctxId.cfId().equals(calculatedFieldId.getId()));
|
||||
}
|
||||
|
||||
@ -235,7 +234,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
onCalculatedFieldDelete(tenantId, calculatedFieldId, callback);
|
||||
callback.onSuccess();
|
||||
}
|
||||
CalculatedField cf = getOrFetchFromDb(tenantId, calculatedFieldId);
|
||||
CalculatedField cf = calculatedFieldCache.getCalculatedField(tenantId, calculatedFieldId);
|
||||
if (proto.getUpdated()) {
|
||||
log.info("Executing onCalculatedFieldUpdate, calculatedFieldId=[{}]", calculatedFieldId);
|
||||
boolean shouldReinit = onCalculatedFieldUpdate(cf, callback);
|
||||
@ -245,8 +244,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
}
|
||||
if (cf != null) {
|
||||
EntityId entityId = cf.getEntityId();
|
||||
CalculatedFieldCtx calculatedFieldCtx = new CalculatedFieldCtx(cf, tbelInvokeService);
|
||||
calculatedFieldsCtx.put(calculatedFieldId, calculatedFieldCtx);
|
||||
CalculatedFieldCtx calculatedFieldCtx = calculatedFieldCache.getCalculatedFieldCtx(tenantId, calculatedFieldId, tbelInvokeService);
|
||||
switch (entityId.getEntityType()) {
|
||||
case ASSET, DEVICE -> {
|
||||
log.info("Initializing state for entity: tenantId=[{}], entityId=[{}]", tenantId, entityId);
|
||||
@ -258,7 +256,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
.filter(entry -> !isProfileEntity(entry.getValue().getEntityId()))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
fetchArguments(tenantId, entityId, commonArguments, commonArgs -> {
|
||||
getOrFetchFromDBProfileEntities(tenantId, entityId).forEach(targetEntityId -> {
|
||||
calculatedFieldCache.getEntitiesByProfile(tenantId, entityId).forEach(targetEntityId -> {
|
||||
initializeStateForEntity(calculatedFieldCtx, targetEntityId, commonArgs, callback);
|
||||
});
|
||||
});
|
||||
@ -290,8 +288,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
} else if (EntityType.DEVICE.equals(entityType)) {
|
||||
profileId = deviceProfileCache.get(tenantId, (DeviceId) entityId).getId();
|
||||
}
|
||||
List<CalculatedFieldLink> cfLinks = new ArrayList<>(calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, entityId));
|
||||
Optional.ofNullable(profileId).ifPresent(id -> cfLinks.addAll(calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, id)));
|
||||
List<CalculatedFieldLink> cfLinks = calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, entityId);
|
||||
Optional.ofNullable(profileId).ifPresent(id -> calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, id));
|
||||
cfLinks.forEach(link -> {
|
||||
CalculatedFieldId calculatedFieldId = link.getCalculatedFieldId();
|
||||
Map<String, String> attributes = link.getConfiguration().getAttributes();
|
||||
@ -316,8 +314,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
|
||||
private void executeTelemetryUpdate(TenantId tenantId, EntityId entityId, CalculatedFieldId calculatedFieldId, List<CalculatedFieldId> calculatedFieldIds, Map<String, KvEntry> updatedTelemetry) {
|
||||
log.info("Received telemetry update msg: tenantId=[{}], entityId=[{}], calculatedFieldId=[{}]", tenantId, entityId, calculatedFieldId);
|
||||
CalculatedField calculatedField = getOrFetchFromDb(tenantId, calculatedFieldId);
|
||||
CalculatedFieldCtx calculatedFieldCtx = calculatedFieldsCtx.computeIfAbsent(calculatedFieldId, id -> new CalculatedFieldCtx(calculatedField, tbelInvokeService));
|
||||
CalculatedField calculatedField = calculatedFieldCache.getCalculatedField(tenantId, calculatedFieldId);
|
||||
CalculatedFieldCtx calculatedFieldCtx = calculatedFieldCache.getCalculatedFieldCtx(tenantId, calculatedFieldId, tbelInvokeService);
|
||||
Map<String, ArgumentEntry> argumentValues = updatedTelemetry.entrySet().stream()
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, entry -> ArgumentEntry.createSingleValueArgument(entry.getValue())));
|
||||
|
||||
@ -326,7 +324,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
case ASSET_PROFILE, DEVICE_PROFILE -> {
|
||||
boolean isCommonEntity = calculatedField.getConfiguration().getReferencedEntities().contains(entityId);
|
||||
if (isCommonEntity) {
|
||||
getOrFetchFromDBProfileEntities(tenantId, cfEntityId).forEach(id -> updateOrInitializeState(calculatedFieldCtx, id, argumentValues, calculatedFieldIds));
|
||||
calculatedFieldCache.getEntitiesByProfile(tenantId, cfEntityId).forEach(id -> updateOrInitializeState(calculatedFieldCtx, id, argumentValues, calculatedFieldIds));
|
||||
} else {
|
||||
updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues, calculatedFieldIds);
|
||||
}
|
||||
@ -353,28 +351,61 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
return entry.getKey();
|
||||
}
|
||||
|
||||
private Object deserializeObjectProto(TransportProtos.ObjectProto objectProto) {
|
||||
try {
|
||||
String type = objectProto.getType();
|
||||
String value = objectProto.getValue();
|
||||
return switch (type) {
|
||||
case "java.lang.String" -> value;
|
||||
case "java.lang.Integer" -> Integer.parseInt(value);
|
||||
case "java.lang.Long" -> Long.parseLong(value);
|
||||
case "java.lang.Double" -> Double.parseDouble(value);
|
||||
case "java.lang.Boolean" -> Boolean.parseBoolean(value);
|
||||
default -> throw new IllegalArgumentException("Unsupported object type: " + type);
|
||||
};
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to deserialize ObjectProto: [{}]", objectProto, e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCalculatedFieldStateMsg(TransportProtos.CalculatedFieldStateMsgProto proto, TbCallback callback) {
|
||||
try {
|
||||
TenantId tenantId = TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB()));
|
||||
CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(proto.getCalculatedFieldIdMSB(), proto.getCalculatedFieldIdLSB()));
|
||||
EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
|
||||
String state = proto.getState();
|
||||
CalculatedFieldEntityCtx calculatedFieldEntityCtx = state.isEmpty() ? JacksonUtil.fromString(state, CalculatedFieldEntityCtx.class) : null;
|
||||
List<CalculatedFieldId> calculatedFieldIds = new ArrayList<>();
|
||||
for (TransportProtos.CalculatedFieldIdProto cfIdProto : proto.getCalculatedFieldsList()) {
|
||||
CalculatedFieldId cfId = new CalculatedFieldId(new UUID(
|
||||
cfIdProto.getCalculatedFieldIdMSB(),
|
||||
cfIdProto.getCalculatedFieldIdLSB()
|
||||
));
|
||||
calculatedFieldIds.add(cfId);
|
||||
}
|
||||
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, calculatedFieldId);
|
||||
if (tpi.isMyPartition()) {
|
||||
CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(calculatedFieldId.getId(), entityId.getId());
|
||||
if (calculatedFieldEntityCtx != null) {
|
||||
states.put(ctxId, calculatedFieldEntityCtx);
|
||||
rocksDBService.put(JacksonUtil.writeValueAsString(ctxId), state);
|
||||
Map<String, ArgumentEntry> argumentsMap = new HashMap<>();
|
||||
proto.getArgumentsMap().forEach((key, entryProto) -> {
|
||||
ArgumentEntry argumentEntry;
|
||||
if (entryProto.hasTsRecords()) {
|
||||
TsRollingArgumentEntry tsRollingArgumentEntry = new TsRollingArgumentEntry();
|
||||
entryProto.getTsRecords().getTsRecordsMap().forEach((ts, objectProto) -> {
|
||||
Object value = deserializeObjectProto(objectProto);
|
||||
tsRollingArgumentEntry.getTsRecords().put(ts, value);
|
||||
});
|
||||
argumentEntry = tsRollingArgumentEntry;
|
||||
} else if (entryProto.hasSingleValue()) {
|
||||
TransportProtos.SingleValueProto singleRecordProto = entryProto.getSingleValue();
|
||||
Object value = deserializeObjectProto(singleRecordProto.getValue());
|
||||
argumentEntry = new SingleValueArgumentEntry(singleRecordProto.getTs(), value);
|
||||
} else {
|
||||
states.remove(ctxId);
|
||||
rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId));
|
||||
}
|
||||
} else {
|
||||
log.debug("[{}] Calculated Field belongs to external partition {}", calculatedFieldId, tpi.getFullTopicName());
|
||||
throw new IllegalArgumentException("Unsupported ArgumentEntryProto type");
|
||||
}
|
||||
argumentsMap.put(key, argumentEntry);
|
||||
});
|
||||
|
||||
CalculatedFieldCtx calculatedFieldCtx = calculatedFieldCache.getCalculatedFieldCtx(tenantId, calculatedFieldId, tbelInvokeService);
|
||||
updateOrInitializeState(calculatedFieldCtx, entityId, argumentsMap, calculatedFieldIds);
|
||||
} catch (Exception e) {
|
||||
log.trace("Failed to process calculated field update state msg: [{}]", proto, e);
|
||||
}
|
||||
@ -389,8 +420,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
EntityId newProfileId = EntityIdFactory.getByTypeAndUuid(proto.getEntityProfileType(), new UUID(proto.getNewProfileIdMSB(), proto.getNewProfileIdLSB()));
|
||||
log.info("Received EntityProfileUpdateMsgProto for processing: tenantId=[{}], entityId=[{}]", tenantId, entityId);
|
||||
|
||||
profileEntities.get(oldProfileId).remove(entityId);
|
||||
profileEntities.computeIfAbsent(newProfileId, id -> new HashSet<>()).add(entityId);
|
||||
calculatedFieldCache.getEntitiesByProfile(tenantId, oldProfileId).remove(entityId);
|
||||
calculatedFieldCache.getEntitiesByProfile(tenantId, newProfileId).add(entityId);
|
||||
|
||||
calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, oldProfileId)
|
||||
.forEach(cfId -> {
|
||||
@ -400,7 +431,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
states.remove(ctxId);
|
||||
rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId));
|
||||
} else {
|
||||
sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, null);
|
||||
sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, Collections.emptyList(), null);
|
||||
}
|
||||
});
|
||||
|
||||
@ -419,12 +450,10 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
log.info("Received ProfileEntityMsgProto for processing: tenantId=[{}], entityId=[{}]", tenantId, entityId);
|
||||
if (proto.getDeleted()) {
|
||||
log.info("Executing profile entity deleted msg, tenantId=[{}], entityId=[{}]", tenantId, entityId);
|
||||
profileEntities.get(profileId).remove(entityId);
|
||||
calculatedFieldCache.getEntitiesByProfile(tenantId, profileId).remove(entityId);
|
||||
List<CalculatedFieldId> calculatedFieldIds = Stream.concat(
|
||||
calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, entityId).stream()
|
||||
.map(CalculatedFieldLink::getCalculatedFieldId),
|
||||
calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, profileId).stream()
|
||||
.map(CalculatedFieldLink::getCalculatedFieldId)
|
||||
calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, entityId).stream().map(CalculatedFieldLink::getCalculatedFieldId),
|
||||
calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, profileId).stream().map(CalculatedFieldLink::getCalculatedFieldId)
|
||||
).toList();
|
||||
calculatedFieldIds.forEach(cfId -> {
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, cfId);
|
||||
@ -433,12 +462,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
states.remove(ctxId);
|
||||
rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId));
|
||||
} else {
|
||||
sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, null);
|
||||
sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, Collections.emptyList(), null);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
log.info("Executing profile entity added msg, tenantId=[{}], entityId=[{}]", tenantId, entityId);
|
||||
profileEntities.computeIfAbsent(profileId, id -> new HashSet<>()).add(entityId);
|
||||
calculatedFieldCache.getEntitiesByProfile(tenantId, profileId).add(entityId);
|
||||
initializeStateForEntityByProfile(tenantId, entityId, profileId, callback);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
@ -446,7 +475,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
}
|
||||
}
|
||||
|
||||
private void sendUpdateCalculatedFieldStateMsg(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, CalculatedFieldState calculatedFieldState) {
|
||||
private void sendUpdateCalculatedFieldStateMsg(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, List<CalculatedFieldId> calculatedFieldIds, Map<String, ArgumentEntry> argumentValues) {
|
||||
TransportProtos.CalculatedFieldStateMsgProto.Builder msgBuilder = TransportProtos.CalculatedFieldStateMsgProto.newBuilder()
|
||||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
|
||||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
|
||||
@ -455,20 +484,45 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
.setEntityType(entityId.getEntityType().name())
|
||||
.setEntityIdMSB(entityId.getId().getMostSignificantBits())
|
||||
.setEntityIdLSB(entityId.getId().getLeastSignificantBits());
|
||||
if (calculatedFieldState != null) {
|
||||
msgBuilder.setState(JacksonUtil.writeValueAsString(calculatedFieldState));
|
||||
|
||||
if (argumentValues != null) {
|
||||
argumentValues.forEach((key, argumentEntry) -> {
|
||||
TransportProtos.ArgumentEntryProto.Builder argumentEntryProtoBuilder = TransportProtos.ArgumentEntryProto.newBuilder();
|
||||
|
||||
if (argumentEntry instanceof TsRollingArgumentEntry tsRollingArgumentEntry) {
|
||||
TransportProtos.TsRollingProto.Builder tsRollingProtoBuilder = TransportProtos.TsRollingProto.newBuilder();
|
||||
|
||||
tsRollingArgumentEntry.getTsRecords().forEach((ts, value) -> {
|
||||
TransportProtos.ObjectProto.Builder objectProtoBuilder = TransportProtos.ObjectProto.newBuilder()
|
||||
.setType(value.getClass().getName())
|
||||
.setValue(value.toString());
|
||||
tsRollingProtoBuilder.putTsRecords(ts, objectProtoBuilder.build());
|
||||
});
|
||||
|
||||
argumentEntryProtoBuilder.setTsRecords(tsRollingProtoBuilder.build());
|
||||
} else if (argumentEntry instanceof SingleValueArgumentEntry singleValueArgumentEntry) {
|
||||
TransportProtos.SingleValueProto.Builder singleRecordProtoBuilder = TransportProtos.SingleValueProto.newBuilder()
|
||||
.setTs(singleValueArgumentEntry.getTs())
|
||||
.setValue(TransportProtos.ObjectProto.newBuilder()
|
||||
.setType(singleValueArgumentEntry.getValue().getClass().getName())
|
||||
.setValue(singleValueArgumentEntry.getValue().toString())
|
||||
.build());
|
||||
argumentEntryProtoBuilder.setSingleValue(singleRecordProtoBuilder.build());
|
||||
}
|
||||
|
||||
msgBuilder.putArguments(key, argumentEntryProtoBuilder.build());
|
||||
});
|
||||
}
|
||||
|
||||
clusterService.pushMsgToCore(tenantId, calculatedFieldId, TransportProtos.ToCoreMsg.newBuilder().setCalculatedFieldStateMsg(msgBuilder).build(), null);
|
||||
}
|
||||
|
||||
private boolean onCalculatedFieldUpdate(CalculatedField updatedCalculatedField, TbCallback callback) {
|
||||
CalculatedField oldCalculatedField = getOrFetchFromDb(updatedCalculatedField.getTenantId(), updatedCalculatedField.getId());
|
||||
CalculatedField oldCalculatedField = calculatedFieldCache.getCalculatedField(updatedCalculatedField.getTenantId(), updatedCalculatedField.getId());
|
||||
boolean shouldReinit = true;
|
||||
if (hasSignificantChanges(oldCalculatedField, updatedCalculatedField)) {
|
||||
onCalculatedFieldDelete(updatedCalculatedField.getTenantId(), updatedCalculatedField.getId(), callback);
|
||||
} else {
|
||||
calculatedFields.put(updatedCalculatedField.getId(), updatedCalculatedField);
|
||||
calculatedFieldsCtx.put(updatedCalculatedField.getId(), new CalculatedFieldCtx(updatedCalculatedField, tbelInvokeService));
|
||||
callback.onSuccess();
|
||||
shouldReinit = false;
|
||||
}
|
||||
@ -483,8 +537,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
if (calculatedFieldIds != null) {
|
||||
calculatedFieldIds.remove(calculatedFieldId);
|
||||
}
|
||||
calculatedFields.remove(calculatedFieldId);
|
||||
calculatedFieldsCtx.remove(calculatedFieldId);
|
||||
calculatedFieldCache.evict(calculatedFieldId);
|
||||
states.keySet().removeIf(ctxId -> ctxId.cfId().equals(calculatedFieldId.getId()));
|
||||
List<String> statesToRemove = states.keySet().stream()
|
||||
.filter(ctxId -> ctxId.cfId().equals(calculatedFieldId.getId()))
|
||||
@ -497,28 +550,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
}
|
||||
}
|
||||
|
||||
private CalculatedField getOrFetchFromDb(TenantId tenantId, CalculatedFieldId calculatedFieldId) {
|
||||
return calculatedFields.computeIfAbsent(calculatedFieldId, cfId -> calculatedFieldService.findById(tenantId, calculatedFieldId));
|
||||
}
|
||||
|
||||
private Set<EntityId> getOrFetchFromDBProfileEntities(TenantId tenantId, EntityId entityProfileId) {
|
||||
return switch (entityProfileId.getEntityType()) {
|
||||
case ASSET_PROFILE -> profileEntities.computeIfAbsent(entityProfileId, profileId -> {
|
||||
Set<EntityId> assetIds = new HashSet<>();
|
||||
(new PageDataIterable<>(pageLink ->
|
||||
assetService.findAssetIdsByTenantIdAndAssetProfileId(tenantId, (AssetProfileId) profileId, pageLink), initFetchPackSize)).forEach(assetIds::add);
|
||||
return assetIds;
|
||||
});
|
||||
case DEVICE_PROFILE -> profileEntities.computeIfAbsent(entityProfileId, profileId -> {
|
||||
Set<EntityId> deviceIds = new HashSet<>();
|
||||
(new PageDataIterable<>(pageLink ->
|
||||
deviceService.findDeviceIdsByTenantIdAndDeviceProfileId(tenantId, (DeviceProfileId) entityProfileId, pageLink), initFetchPackSize)).forEach(deviceIds::add);
|
||||
return deviceIds;
|
||||
});
|
||||
default -> throw new IllegalArgumentException("Entity type should be ASSET_PROFILE or DEVICE_PROFILE.");
|
||||
};
|
||||
}
|
||||
|
||||
private boolean hasSignificantChanges(CalculatedField oldCalculatedField, CalculatedField newCalculatedField) {
|
||||
if (oldCalculatedField == null) {
|
||||
return true;
|
||||
@ -537,7 +568,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
private void initializeStateForEntityByProfile(TenantId tenantId, EntityId entityId, EntityId profileId, TbCallback callback) {
|
||||
calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, profileId)
|
||||
.stream()
|
||||
.map(cfId -> calculatedFieldsCtx.computeIfAbsent(cfId, id -> new CalculatedFieldCtx(calculatedFieldService.findById(tenantId, id), tbelInvokeService)))
|
||||
.map(cfId -> calculatedFieldCache.getCalculatedFieldCtx(tenantId, cfId, tbelInvokeService))
|
||||
.forEach(cfCtx -> initializeStateForEntity(cfCtx, entityId, callback));
|
||||
}
|
||||
|
||||
@ -562,7 +593,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(List<ArgumentEntry> results) {
|
||||
updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues, Collections.emptyList());
|
||||
updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues, new ArrayList<>());
|
||||
callback.onSuccess();
|
||||
}
|
||||
|
||||
@ -651,33 +682,29 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
}
|
||||
|
||||
private void updateOrInitializeState(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, Map<String, ArgumentEntry> argumentValues, List<CalculatedFieldId> calculatedFieldIds) {
|
||||
TenantId tenantId = calculatedFieldCtx.getTenantId();
|
||||
CalculatedFieldId cfId = calculatedFieldCtx.getCfId();
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, cfId);
|
||||
if (tpi.isMyPartition()) {
|
||||
CalculatedFieldEntityCtxId entityCtxId = new CalculatedFieldEntityCtxId(cfId.getId(), entityId.getId());
|
||||
CalculatedFieldEntityCtx calculatedFieldEntityCtx = states.computeIfAbsent(entityCtxId, ctxId -> fetchCalculatedFieldEntityState(ctxId, calculatedFieldCtx.getCfType()));
|
||||
|
||||
Predicate<Map<String, ArgumentEntry>> allArgsPresent = (args) ->
|
||||
args.keySet().containsAll(calculatedFieldCtx.getArguments().keySet()) &&
|
||||
!args.containsValue(SingleValueArgumentEntry.EMPTY) && !args.containsValue(TsRollingArgumentEntry.EMPTY);
|
||||
|
||||
Consumer<CalculatedFieldState> performUpdateState = (state) -> {
|
||||
if (state.updateState(argumentValues)) {
|
||||
calculatedFieldEntityCtx.setState(state);
|
||||
TenantId tenantId = calculatedFieldCtx.getTenantId();
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, cfId);
|
||||
if (tpi.isMyPartition()) {
|
||||
states.put(entityCtxId, calculatedFieldEntityCtx);
|
||||
rocksDBService.put(JacksonUtil.writeValueAsString(entityCtxId), JacksonUtil.writeValueAsString(calculatedFieldEntityCtx));
|
||||
} else {
|
||||
sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, state);
|
||||
}
|
||||
|
||||
if (allArgsPresent.test(state.getArguments())) {
|
||||
Map<String, ArgumentEntry> arguments = state.getArguments();
|
||||
boolean allArgsPresent = arguments.keySet().containsAll(calculatedFieldCtx.getArguments().keySet()) &&
|
||||
!arguments.containsValue(SingleValueArgumentEntry.EMPTY) && !arguments.containsValue(TsRollingArgumentEntry.EMPTY);
|
||||
if (allArgsPresent) {
|
||||
performCalculation(calculatedFieldCtx, state, entityId, calculatedFieldIds);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
CalculatedFieldState state = calculatedFieldEntityCtx.getState();
|
||||
|
||||
boolean allKeysPresent = argumentValues.keySet().containsAll(calculatedFieldCtx.getArguments().keySet());
|
||||
if (!allKeysPresent) {
|
||||
|
||||
@ -691,6 +718,11 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
return;
|
||||
}
|
||||
performUpdateState.accept(state);
|
||||
states.put(entityCtxId, calculatedFieldEntityCtx);
|
||||
rocksDBService.put(JacksonUtil.writeValueAsString(entityCtxId), JacksonUtil.writeValueAsString(calculatedFieldEntityCtx));
|
||||
} else {
|
||||
sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, calculatedFieldIds, argumentValues);
|
||||
}
|
||||
}
|
||||
|
||||
private void performCalculation(CalculatedFieldCtx calculatedFieldCtx, CalculatedFieldState state, EntityId entityId, List<CalculatedFieldId> calculatedFieldIds) {
|
||||
@ -724,6 +756,9 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
TbMsgType msgType = "ATTRIBUTES".equals(type) ? TbMsgType.POST_ATTRIBUTES_REQUEST : TbMsgType.POST_TELEMETRY_REQUEST;
|
||||
TbMsgMetaData md = "ATTRIBUTES".equals(type) ? new TbMsgMetaData(Map.of(SCOPE, calculatedFieldResult.getScope().name())) : TbMsgMetaData.EMPTY;
|
||||
ObjectNode payload = createJsonPayload(calculatedFieldResult);
|
||||
if (calculatedFieldIds == null) {
|
||||
calculatedFieldIds = new ArrayList<>();
|
||||
}
|
||||
if (calculatedFieldIds.contains(calculatedFieldId)) {
|
||||
throw new IllegalArgumentException("Calculated field [" + calculatedFieldId.getId() + "] refers to itself, causing an infinite loop.");
|
||||
}
|
||||
|
||||
@ -24,6 +24,7 @@ import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
import org.thingsboard.server.common.data.id.CalculatedFieldId;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
||||
@ -191,6 +192,16 @@ public final class TbMsg implements Serializable {
|
||||
builder.setPartition(msg.getPartition());
|
||||
}
|
||||
|
||||
if (msg.getCalculatedFieldIds() != null) {
|
||||
for (CalculatedFieldId calculatedFieldId : msg.getCalculatedFieldIds()) {
|
||||
MsgProtos.CalculatedFieldIdProto calculatedFieldIdProto = MsgProtos.CalculatedFieldIdProto.newBuilder()
|
||||
.setCalculatedFieldIdMSB(calculatedFieldId.getId().getMostSignificantBits())
|
||||
.setCalculatedFieldIdLSB(calculatedFieldId.getId().getLeastSignificantBits())
|
||||
.build();
|
||||
builder.addCalculatedFields(calculatedFieldIdProto);
|
||||
}
|
||||
}
|
||||
|
||||
builder.setCtx(msg.ctx.toProto());
|
||||
return builder.build().toByteArray();
|
||||
}
|
||||
@ -495,8 +506,8 @@ public final class TbMsg implements Serializable {
|
||||
", type=" + this.type + ", internalType=" + this.internalType + ", originator=" + this.originator +
|
||||
", customerId=" + this.customerId + ", metaData=" + this.metaData + ", dataType=" + this.dataType +
|
||||
", data=" + this.data + ", ruleChainId=" + this.ruleChainId + ", ruleNodeId=" + this.ruleNodeId +
|
||||
", correlationId=" + this.correlationId + ", partition=" + this.partition + ", ctx=" + this.ctx +
|
||||
", callback=" + this.callback + ")";
|
||||
", correlationId=" + this.correlationId + ", partition=" + this.partition + ", calculatedFields=" + this.calculatedFieldIds +
|
||||
", ctx=" + this.ctx + ", callback=" + this.callback + ")";
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -817,7 +817,34 @@ message CalculatedFieldStateMsgProto {
|
||||
string entityType = 5;
|
||||
int64 entityIdMSB = 6;
|
||||
int64 entityIdLSB = 7;
|
||||
string state = 8;
|
||||
repeated CalculatedFieldIdProto calculatedFields = 8;
|
||||
map<string, ArgumentEntryProto> arguments = 9;
|
||||
}
|
||||
|
||||
message CalculatedFieldIdProto {
|
||||
int64 calculatedFieldIdMSB = 1;
|
||||
int64 calculatedFieldIdLSB = 2;
|
||||
}
|
||||
|
||||
message ArgumentEntryProto {
|
||||
oneof entry_type {
|
||||
TsRollingProto tsRecords = 1;
|
||||
SingleValueProto singleValue = 2;
|
||||
}
|
||||
}
|
||||
|
||||
message TsRollingProto {
|
||||
map<int64, ObjectProto> tsRecords = 1;
|
||||
}
|
||||
|
||||
message SingleValueProto {
|
||||
int64 ts = 1;
|
||||
ObjectProto value = 2;
|
||||
}
|
||||
|
||||
message ObjectProto {
|
||||
string type = 1;
|
||||
string value = 2;
|
||||
}
|
||||
|
||||
//Used to report session state to tb-Service and persist this state in the cache on the tb-Service level.
|
||||
|
||||
@ -125,6 +125,7 @@ public class TbMsgAttributesNode implements TbNode {
|
||||
.scope(scope)
|
||||
.entries(attributes)
|
||||
.notifyDevice(config.isNotifyDevice() || checkNotifyDeviceMdValue(msg.getMetaData().getValue(NOTIFY_DEVICE_METADATA_KEY)))
|
||||
.calculatedFieldIds(msg.getCalculatedFieldIds())
|
||||
.callback(callback)
|
||||
.build());
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user