diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java
index e109e83e4d..27d3ce315a 100644
--- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java
@@ -322,11 +322,15 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
EntityId entityId = cfCtx.getEntityId();
EntityType entityType = cfCtx.getEntityId().getEntityType();
if (isProfileEntity(entityType)) {
- var entityIds = cfEntityCache.getMyEntityIdsByProfileId(tenantId, entityId);
+ var entityIds = cfEntityCache.getEntityIdsByProfileId(tenantId, entityId);
if (!entityIds.isEmpty()) {
//TODO: no need to do this if we cache all created actors and know which one belong to us;
var multiCallback = new MultipleTbCallback(entityIds.size(), callback);
- entityIds.forEach(id -> deleteCfForEntity(id, cfId, multiCallback));
+ entityIds.forEach(id -> {
+ if (isMyPartition(id, multiCallback)) {
+ deleteCfForEntity(id, cfId, multiCallback);
+ }
+ });
} else {
callback.onSuccess();
}
@@ -366,10 +370,11 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
EntityId sourceEntityId = msg.getEntityId();
log.debug("Received linked telemetry msg from entity [{}]", sourceEntityId);
var proto = msg.getProto();
+ var callback = msg.getCallback();
var linksList = proto.getLinksList();
if (linksList.isEmpty()) {
log.debug("[{}] No CF links to process new telemetry.", msg.getTenantId());
- msg.getCallback().onSuccess();
+ callback.onSuccess();
}
for (var linkProto : linksList) {
var link = fromProto(linkProto);
@@ -378,21 +383,25 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
var cf = calculatedFields.get(link.cfId());
if (EntityType.DEVICE_PROFILE.equals(targetEntityType) || EntityType.ASSET_PROFILE.equals(targetEntityType)) {
// iterate over all entities that belong to profile and push the message for corresponding CF
- var entityIds = cfEntityCache.getMyEntityIdsByProfileId(tenantId, targetEntityId);
+ var entityIds = cfEntityCache.getEntityIdsByProfileId(tenantId, targetEntityId);
if (!entityIds.isEmpty()) {
- MultipleTbCallback callback = new MultipleTbCallback(entityIds.size(), msg.getCallback());
- var newMsg = new EntityCalculatedFieldLinkedTelemetryMsg(tenantId, sourceEntityId, proto.getMsg(), cf, callback);
+ MultipleTbCallback multipleCallback = new MultipleTbCallback(entityIds.size(), callback);
+ var newMsg = new EntityCalculatedFieldLinkedTelemetryMsg(tenantId, sourceEntityId, proto.getMsg(), cf, multipleCallback);
entityIds.forEach(entityId -> {
- log.debug("Pushing linked telemetry msg to specific actor [{}]", entityId);
- getOrCreateActor(entityId).tell(newMsg);
+ if (isMyPartition(entityId, multipleCallback)) {
+ log.debug("Pushing linked telemetry msg to specific actor [{}]", entityId);
+ getOrCreateActor(entityId).tell(newMsg);
+ }
});
} else {
- msg.getCallback().onSuccess();
+ callback.onSuccess();
}
} else {
- log.debug("Pushing linked telemetry msg to specific actor [{}]", targetEntityId);
- var newMsg = new EntityCalculatedFieldLinkedTelemetryMsg(tenantId, sourceEntityId, proto.getMsg(), cf, msg.getCallback());
- getOrCreateActor(targetEntityId).tell(newMsg);
+ if (isMyPartition(targetEntityId, callback)) {
+ log.debug("Pushing linked telemetry msg to specific actor [{}]", targetEntityId);
+ var newMsg = new EntityCalculatedFieldLinkedTelemetryMsg(tenantId, sourceEntityId, proto.getMsg(), cf, callback);
+ getOrCreateActor(targetEntityId).tell(newMsg);
+ }
}
}
}
@@ -436,10 +445,14 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
EntityId entityId = cfCtx.getEntityId();
EntityType entityType = cfCtx.getEntityId().getEntityType();
if (isProfileEntity(entityType)) {
- var entityIds = cfEntityCache.getMyEntityIdsByProfileId(tenantId, entityId);
+ var entityIds = cfEntityCache.getEntityIdsByProfileId(tenantId, entityId);
if (!entityIds.isEmpty()) {
var multiCallback = new MultipleTbCallback(entityIds.size(), callback);
- entityIds.forEach(id -> initCfForEntity(id, cfCtx, forceStateReinit, multiCallback));
+ entityIds.forEach(id -> {
+ if (isMyPartition(id, multiCallback)) {
+ initCfForEntity(id, cfCtx, forceStateReinit, multiCallback);
+ }
+ });
} else {
callback.onSuccess();
}
diff --git a/application/src/main/java/org/thingsboard/server/service/cf/cache/CalculatedFieldEntityProfileCache.java b/application/src/main/java/org/thingsboard/server/service/cf/cache/CalculatedFieldEntityProfileCache.java
index bb5ef91974..df2dc88f6b 100644
--- a/application/src/main/java/org/thingsboard/server/service/cf/cache/CalculatedFieldEntityProfileCache.java
+++ b/application/src/main/java/org/thingsboard/server/service/cf/cache/CalculatedFieldEntityProfileCache.java
@@ -30,7 +30,11 @@ public interface CalculatedFieldEntityProfileCache extends ApplicationListener
getMyEntityIdsByProfileId(TenantId tenantId, EntityId profileId);
+ void evictProfile(TenantId tenantId, EntityId profileId);
+
+ void removeTenant(TenantId tenantId);
+
+ Collection getEntityIdsByProfileId(TenantId tenantId, EntityId profileId);
int getEntityIdPartition(TenantId tenantId, EntityId entityId);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java b/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java
index 080907587e..8059b3a825 100644
--- a/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java
+++ b/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java
@@ -22,17 +22,12 @@ import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.ServiceType;
-import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -50,50 +45,23 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent
@Override
protected void onTbApplicationEvent(PartitionChangeEvent event) {
- Map> tenantPartitions = new HashMap<>();
- List systemPartitions = new ArrayList<>();
-
- event.getCfPartitions().stream()
- .filter(TopicPartitionInfo::isMyPartition)
- .forEach(tpi -> {
- Integer partition = tpi.getPartition().orElse(UNKNOWN);
- Optional tenantIdOpt = tpi.getTenantId();
- if (tenantIdOpt.isPresent()) {
- tenantPartitions.computeIfAbsent(tenantIdOpt.get(), id -> new ArrayList<>()).add(partition);
- } else {
- systemPartitions.add(partition);
- }
- });
-
- tenantPartitions.forEach((tenantId, partitions) -> {
- var cache = tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache());
- cache.setMyPartitions(partitions);
+ event.getCfPartitions().forEach(tpi -> {
+ Optional tenantIdOpt = tpi.getTenantId();
+ tenantIdOpt.ifPresent(tenantId -> tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()));
});
-
- tenantCache.keySet().stream()
- .filter(tenantId -> !tenantPartitions.containsKey(tenantId))
- .forEach(tenantId -> {
- var cache = tenantCache.get(tenantId);
- cache.setMyPartitions(systemPartitions);
- });
}
@Override
public void add(TenantId tenantId, EntityId profileId, EntityId entityId) {
- var tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId);
- var partition = tpi.getPartition().orElse(UNKNOWN);
- tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache())
- .add(profileId, entityId, partition, tpi.isMyPartition());
+ tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()).add(profileId, entityId);
}
@Override
public void update(TenantId tenantId, EntityId oldProfileId, EntityId newProfileId, EntityId entityId) {
- var tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId);
- var partition = tpi.getPartition().orElse(UNKNOWN);
var cache = tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache());
//TODO: make this method atomic;
cache.remove(oldProfileId, entityId);
- cache.add(newProfileId, entityId, partition, tpi.isMyPartition());
+ cache.add(newProfileId, entityId);
}
@Override
@@ -103,8 +71,18 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent
}
@Override
- public Collection getMyEntityIdsByProfileId(TenantId tenantId, EntityId profileId) {
- return tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()).getMyEntityIdsByProfileId(profileId);
+ public void evictProfile(TenantId tenantId, EntityId profileId) {
+ tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()).removeProfileId(profileId);
+ }
+
+ @Override
+ public void removeTenant(TenantId tenantId) {
+ tenantCache.remove(tenantId);
+ }
+
+ @Override
+ public Collection getEntityIdsByProfileId(TenantId tenantId, EntityId profileId) {
+ return tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()).getEntityIdsByProfileId(profileId);
}
@Override
diff --git a/application/src/main/java/org/thingsboard/server/service/cf/cache/TenantEntityProfileCache.java b/application/src/main/java/org/thingsboard/server/service/cf/cache/TenantEntityProfileCache.java
index 1a17b9b8be..27c2bdd6d5 100644
--- a/application/src/main/java/org/thingsboard/server/service/cf/cache/TenantEntityProfileCache.java
+++ b/application/src/main/java/org/thingsboard/server/service/cf/cache/TenantEntityProfileCache.java
@@ -32,31 +32,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class TenantEntityProfileCache {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
- private final Map>> allEntities = new HashMap<>();
- private final Map> myEntities = new HashMap<>();
-
- public void setMyPartitions(List myPartitions) {
- lock.writeLock().lock();
- try {
- myEntities.clear();
- myPartitions.forEach(partitionId -> {
- var map = allEntities.get(partitionId);
- if (map != null) {
- map.forEach((profileId, entityIds) -> myEntities.computeIfAbsent(profileId, k -> new HashSet<>()).addAll(entityIds));
- }
- });
- } finally {
- lock.writeLock().unlock();
- }
- }
+ private final Map> allEntities = new HashMap<>();
public void removeProfileId(EntityId profileId) {
lock.writeLock().lock();
try {
// Remove from allEntities
- allEntities.values().forEach(map -> map.remove(profileId));
- // Remove from myEntities
- myEntities.remove(profileId);
+ allEntities.remove(profileId);
} finally {
lock.writeLock().unlock();
}
@@ -66,9 +48,7 @@ public class TenantEntityProfileCache {
lock.writeLock().lock();
try {
// Remove from allEntities
- allEntities.values().forEach(map -> map.values().forEach(set -> set.remove(entityId)));
- // Remove from myEntities
- myEntities.values().forEach(set -> set.remove(entityId));
+ allEntities.values().forEach(set -> set.remove(entityId));
} finally {
lock.writeLock().unlock();
}
@@ -78,33 +58,28 @@ public class TenantEntityProfileCache {
lock.writeLock().lock();
try {
// Remove from allEntities
- allEntities.values().forEach(map -> removeSafely(map, profileId, entityId));
- // Remove from myEntities
- removeSafely(myEntities, profileId, entityId);
+ removeSafely(allEntities, profileId, entityId);
} finally {
lock.writeLock().unlock();
}
}
- public void add(EntityId profileId, EntityId entityId, Integer partition, boolean mine) {
+ public void add(EntityId profileId, EntityId entityId) {
lock.writeLock().lock();
try {
- if(EntityType.DEVICE.equals(profileId.getEntityType())){
- throw new RuntimeException("WTF?");
+ if (EntityType.DEVICE.equals(profileId.getEntityType()) || EntityType.ASSET.equals(profileId.getEntityType())) {
+ throw new RuntimeException("Entity type '" + profileId.getEntityType() + "' is not a profileId.");
}
- if (mine) {
- myEntities.computeIfAbsent(profileId, k -> new HashSet<>()).add(entityId);
- }
- allEntities.computeIfAbsent(partition, k -> new HashMap<>()).computeIfAbsent(profileId, p -> new HashSet<>()).add(entityId);
+ allEntities.computeIfAbsent(profileId, k -> new HashSet<>()).add(entityId);
} finally {
lock.writeLock().unlock();
}
}
- public Collection getMyEntityIdsByProfileId(EntityId profileId) {
+ public Collection getEntityIdsByProfileId(EntityId profileId) {
lock.readLock().lock();
try {
- var entities = myEntities.getOrDefault(profileId, Collections.emptySet());
+ var entities = allEntities.getOrDefault(profileId, Collections.emptySet());
List result = new ArrayList<>(entities.size());
result.addAll(entities);
return result;
diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java
index bce1992932..b543e860f3 100644
--- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java
+++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java
@@ -52,6 +52,7 @@ import org.thingsboard.server.queue.util.TbRuleEngineComponent;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.cf.CalculatedFieldCache;
import org.thingsboard.server.service.cf.CalculatedFieldStateService;
+import org.thingsboard.server.service.cf.cache.CalculatedFieldEntityProfileCache;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.queue.processing.AbstractConsumerPartitionedService;
@@ -80,6 +81,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPar
private final TbRuleEngineQueueFactory queueFactory;
private final CalculatedFieldStateService stateService;
+ private final CalculatedFieldEntityProfileCache entityProfileCache;
public DefaultTbCalculatedFieldConsumerService(TbRuleEngineQueueFactory tbQueueFactory,
ActorSystemContext actorContext,
@@ -91,11 +93,13 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPar
ApplicationEventPublisher eventPublisher,
JwtSettingsService jwtSettingsService,
CalculatedFieldCache calculatedFieldCache,
- CalculatedFieldStateService stateService) {
+ CalculatedFieldStateService stateService,
+ CalculatedFieldEntityProfileCache entityProfileCache) {
super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService,
eventPublisher, jwtSettingsService);
this.queueFactory = tbQueueFactory;
this.stateService = stateService;
+ this.entityProfileCache = entityProfileCache;
}
@Override
@@ -227,6 +231,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPar
public void handleComponentLifecycleEvent(ComponentLifecycleMsg event) {
if (event.getEntityId().getEntityType() == EntityType.TENANT) {
if (event.getEvent() == ComponentLifecycleEvent.DELETED) {
+ entityProfileCache.removeTenant(event.getTenantId());
Set partitions = stateService.getPartitions();
if (CollectionUtils.isEmpty(partitions)) {
return;
@@ -235,6 +240,14 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPar
.filter(tpi -> tpi.getTenantId().isPresent() && tpi.getTenantId().get().equals(event.getTenantId()))
.collect(Collectors.toSet()));
}
+ } else if (event.getEntityId().getEntityType() == EntityType.ASSET_PROFILE) {
+ if (event.getEvent() == ComponentLifecycleEvent.DELETED) {
+ entityProfileCache.evictProfile(event.getTenantId(), event.getEntityId());
+ }
+ } else if (event.getEntityId().getEntityType() == EntityType.DEVICE_PROFILE) {
+ if (event.getEvent() == ComponentLifecycleEvent.DELETED) {
+ entityProfileCache.evictProfile(event.getTenantId(), event.getEntityId());
+ }
}
}