removed myEntityIds map and handled tenant and asset/device profile deletion
This commit is contained in:
parent
2d95b7ad58
commit
b715ce2e43
@ -322,11 +322,15 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
|
||||
EntityId entityId = cfCtx.getEntityId();
|
||||
EntityType entityType = cfCtx.getEntityId().getEntityType();
|
||||
if (isProfileEntity(entityType)) {
|
||||
var entityIds = cfEntityCache.getMyEntityIdsByProfileId(tenantId, entityId);
|
||||
var entityIds = cfEntityCache.getEntityIdsByProfileId(tenantId, entityId);
|
||||
if (!entityIds.isEmpty()) {
|
||||
//TODO: no need to do this if we cache all created actors and know which one belong to us;
|
||||
var multiCallback = new MultipleTbCallback(entityIds.size(), callback);
|
||||
entityIds.forEach(id -> deleteCfForEntity(id, cfId, multiCallback));
|
||||
entityIds.forEach(id -> {
|
||||
if (isMyPartition(id, multiCallback)) {
|
||||
deleteCfForEntity(id, cfId, multiCallback);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
callback.onSuccess();
|
||||
}
|
||||
@ -366,10 +370,11 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
|
||||
EntityId sourceEntityId = msg.getEntityId();
|
||||
log.debug("Received linked telemetry msg from entity [{}]", sourceEntityId);
|
||||
var proto = msg.getProto();
|
||||
var callback = msg.getCallback();
|
||||
var linksList = proto.getLinksList();
|
||||
if (linksList.isEmpty()) {
|
||||
log.debug("[{}] No CF links to process new telemetry.", msg.getTenantId());
|
||||
msg.getCallback().onSuccess();
|
||||
callback.onSuccess();
|
||||
}
|
||||
for (var linkProto : linksList) {
|
||||
var link = fromProto(linkProto);
|
||||
@ -378,21 +383,25 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
|
||||
var cf = calculatedFields.get(link.cfId());
|
||||
if (EntityType.DEVICE_PROFILE.equals(targetEntityType) || EntityType.ASSET_PROFILE.equals(targetEntityType)) {
|
||||
// iterate over all entities that belong to profile and push the message for corresponding CF
|
||||
var entityIds = cfEntityCache.getMyEntityIdsByProfileId(tenantId, targetEntityId);
|
||||
var entityIds = cfEntityCache.getEntityIdsByProfileId(tenantId, targetEntityId);
|
||||
if (!entityIds.isEmpty()) {
|
||||
MultipleTbCallback callback = new MultipleTbCallback(entityIds.size(), msg.getCallback());
|
||||
var newMsg = new EntityCalculatedFieldLinkedTelemetryMsg(tenantId, sourceEntityId, proto.getMsg(), cf, callback);
|
||||
MultipleTbCallback multipleCallback = new MultipleTbCallback(entityIds.size(), callback);
|
||||
var newMsg = new EntityCalculatedFieldLinkedTelemetryMsg(tenantId, sourceEntityId, proto.getMsg(), cf, multipleCallback);
|
||||
entityIds.forEach(entityId -> {
|
||||
log.debug("Pushing linked telemetry msg to specific actor [{}]", entityId);
|
||||
getOrCreateActor(entityId).tell(newMsg);
|
||||
if (isMyPartition(entityId, multipleCallback)) {
|
||||
log.debug("Pushing linked telemetry msg to specific actor [{}]", entityId);
|
||||
getOrCreateActor(entityId).tell(newMsg);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
msg.getCallback().onSuccess();
|
||||
callback.onSuccess();
|
||||
}
|
||||
} else {
|
||||
log.debug("Pushing linked telemetry msg to specific actor [{}]", targetEntityId);
|
||||
var newMsg = new EntityCalculatedFieldLinkedTelemetryMsg(tenantId, sourceEntityId, proto.getMsg(), cf, msg.getCallback());
|
||||
getOrCreateActor(targetEntityId).tell(newMsg);
|
||||
if (isMyPartition(targetEntityId, callback)) {
|
||||
log.debug("Pushing linked telemetry msg to specific actor [{}]", targetEntityId);
|
||||
var newMsg = new EntityCalculatedFieldLinkedTelemetryMsg(tenantId, sourceEntityId, proto.getMsg(), cf, callback);
|
||||
getOrCreateActor(targetEntityId).tell(newMsg);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -436,10 +445,14 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
|
||||
EntityId entityId = cfCtx.getEntityId();
|
||||
EntityType entityType = cfCtx.getEntityId().getEntityType();
|
||||
if (isProfileEntity(entityType)) {
|
||||
var entityIds = cfEntityCache.getMyEntityIdsByProfileId(tenantId, entityId);
|
||||
var entityIds = cfEntityCache.getEntityIdsByProfileId(tenantId, entityId);
|
||||
if (!entityIds.isEmpty()) {
|
||||
var multiCallback = new MultipleTbCallback(entityIds.size(), callback);
|
||||
entityIds.forEach(id -> initCfForEntity(id, cfCtx, forceStateReinit, multiCallback));
|
||||
entityIds.forEach(id -> {
|
||||
if (isMyPartition(id, multiCallback)) {
|
||||
initCfForEntity(id, cfCtx, forceStateReinit, multiCallback);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
callback.onSuccess();
|
||||
}
|
||||
|
||||
@ -30,7 +30,11 @@ public interface CalculatedFieldEntityProfileCache extends ApplicationListener<P
|
||||
|
||||
void evict(TenantId tenantId, EntityId entityId);
|
||||
|
||||
Collection<EntityId> getMyEntityIdsByProfileId(TenantId tenantId, EntityId profileId);
|
||||
void evictProfile(TenantId tenantId, EntityId profileId);
|
||||
|
||||
void removeTenant(TenantId tenantId);
|
||||
|
||||
Collection<EntityId> getEntityIdsByProfileId(TenantId tenantId, EntityId profileId);
|
||||
|
||||
int getEntityIdPartition(TenantId tenantId, EntityId entityId);
|
||||
}
|
||||
|
||||
@ -22,17 +22,12 @@ import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
|
||||
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
|
||||
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
@ -50,50 +45,23 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent
|
||||
|
||||
@Override
|
||||
protected void onTbApplicationEvent(PartitionChangeEvent event) {
|
||||
Map<TenantId, List<Integer>> tenantPartitions = new HashMap<>();
|
||||
List<Integer> systemPartitions = new ArrayList<>();
|
||||
|
||||
event.getCfPartitions().stream()
|
||||
.filter(TopicPartitionInfo::isMyPartition)
|
||||
.forEach(tpi -> {
|
||||
Integer partition = tpi.getPartition().orElse(UNKNOWN);
|
||||
Optional<TenantId> tenantIdOpt = tpi.getTenantId();
|
||||
if (tenantIdOpt.isPresent()) {
|
||||
tenantPartitions.computeIfAbsent(tenantIdOpt.get(), id -> new ArrayList<>()).add(partition);
|
||||
} else {
|
||||
systemPartitions.add(partition);
|
||||
}
|
||||
});
|
||||
|
||||
tenantPartitions.forEach((tenantId, partitions) -> {
|
||||
var cache = tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache());
|
||||
cache.setMyPartitions(partitions);
|
||||
event.getCfPartitions().forEach(tpi -> {
|
||||
Optional<TenantId> tenantIdOpt = tpi.getTenantId();
|
||||
tenantIdOpt.ifPresent(tenantId -> tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()));
|
||||
});
|
||||
|
||||
tenantCache.keySet().stream()
|
||||
.filter(tenantId -> !tenantPartitions.containsKey(tenantId))
|
||||
.forEach(tenantId -> {
|
||||
var cache = tenantCache.get(tenantId);
|
||||
cache.setMyPartitions(systemPartitions);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void add(TenantId tenantId, EntityId profileId, EntityId entityId) {
|
||||
var tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId);
|
||||
var partition = tpi.getPartition().orElse(UNKNOWN);
|
||||
tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache())
|
||||
.add(profileId, entityId, partition, tpi.isMyPartition());
|
||||
tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()).add(profileId, entityId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(TenantId tenantId, EntityId oldProfileId, EntityId newProfileId, EntityId entityId) {
|
||||
var tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId);
|
||||
var partition = tpi.getPartition().orElse(UNKNOWN);
|
||||
var cache = tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache());
|
||||
//TODO: make this method atomic;
|
||||
cache.remove(oldProfileId, entityId);
|
||||
cache.add(newProfileId, entityId, partition, tpi.isMyPartition());
|
||||
cache.add(newProfileId, entityId);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -103,8 +71,18 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<EntityId> getMyEntityIdsByProfileId(TenantId tenantId, EntityId profileId) {
|
||||
return tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()).getMyEntityIdsByProfileId(profileId);
|
||||
public void evictProfile(TenantId tenantId, EntityId profileId) {
|
||||
tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()).removeProfileId(profileId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeTenant(TenantId tenantId) {
|
||||
tenantCache.remove(tenantId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<EntityId> getEntityIdsByProfileId(TenantId tenantId, EntityId profileId) {
|
||||
return tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()).getEntityIdsByProfileId(profileId);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -32,31 +32,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
public class TenantEntityProfileCache {
|
||||
|
||||
private final ReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
private final Map<Integer, Map<EntityId, Set<EntityId>>> allEntities = new HashMap<>();
|
||||
private final Map<EntityId, Set<EntityId>> myEntities = new HashMap<>();
|
||||
|
||||
public void setMyPartitions(List<Integer> myPartitions) {
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
myEntities.clear();
|
||||
myPartitions.forEach(partitionId -> {
|
||||
var map = allEntities.get(partitionId);
|
||||
if (map != null) {
|
||||
map.forEach((profileId, entityIds) -> myEntities.computeIfAbsent(profileId, k -> new HashSet<>()).addAll(entityIds));
|
||||
}
|
||||
});
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
private final Map<EntityId, Set<EntityId>> allEntities = new HashMap<>();
|
||||
|
||||
public void removeProfileId(EntityId profileId) {
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
// Remove from allEntities
|
||||
allEntities.values().forEach(map -> map.remove(profileId));
|
||||
// Remove from myEntities
|
||||
myEntities.remove(profileId);
|
||||
allEntities.remove(profileId);
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
@ -66,9 +48,7 @@ public class TenantEntityProfileCache {
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
// Remove from allEntities
|
||||
allEntities.values().forEach(map -> map.values().forEach(set -> set.remove(entityId)));
|
||||
// Remove from myEntities
|
||||
myEntities.values().forEach(set -> set.remove(entityId));
|
||||
allEntities.values().forEach(set -> set.remove(entityId));
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
@ -78,33 +58,28 @@ public class TenantEntityProfileCache {
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
// Remove from allEntities
|
||||
allEntities.values().forEach(map -> removeSafely(map, profileId, entityId));
|
||||
// Remove from myEntities
|
||||
removeSafely(myEntities, profileId, entityId);
|
||||
removeSafely(allEntities, profileId, entityId);
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void add(EntityId profileId, EntityId entityId, Integer partition, boolean mine) {
|
||||
public void add(EntityId profileId, EntityId entityId) {
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
if(EntityType.DEVICE.equals(profileId.getEntityType())){
|
||||
throw new RuntimeException("WTF?");
|
||||
if (EntityType.DEVICE.equals(profileId.getEntityType()) || EntityType.ASSET.equals(profileId.getEntityType())) {
|
||||
throw new RuntimeException("Entity type '" + profileId.getEntityType() + "' is not a profileId.");
|
||||
}
|
||||
if (mine) {
|
||||
myEntities.computeIfAbsent(profileId, k -> new HashSet<>()).add(entityId);
|
||||
}
|
||||
allEntities.computeIfAbsent(partition, k -> new HashMap<>()).computeIfAbsent(profileId, p -> new HashSet<>()).add(entityId);
|
||||
allEntities.computeIfAbsent(profileId, k -> new HashSet<>()).add(entityId);
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public Collection<EntityId> getMyEntityIdsByProfileId(EntityId profileId) {
|
||||
public Collection<EntityId> getEntityIdsByProfileId(EntityId profileId) {
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
var entities = myEntities.getOrDefault(profileId, Collections.emptySet());
|
||||
var entities = allEntities.getOrDefault(profileId, Collections.emptySet());
|
||||
List<EntityId> result = new ArrayList<>(entities.size());
|
||||
result.addAll(entities);
|
||||
return result;
|
||||
|
||||
@ -52,6 +52,7 @@ import org.thingsboard.server.queue.util.TbRuleEngineComponent;
|
||||
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
|
||||
import org.thingsboard.server.service.cf.CalculatedFieldCache;
|
||||
import org.thingsboard.server.service.cf.CalculatedFieldStateService;
|
||||
import org.thingsboard.server.service.cf.cache.CalculatedFieldEntityProfileCache;
|
||||
import org.thingsboard.server.service.profile.TbAssetProfileCache;
|
||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
||||
import org.thingsboard.server.service.queue.processing.AbstractConsumerPartitionedService;
|
||||
@ -80,6 +81,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPar
|
||||
|
||||
private final TbRuleEngineQueueFactory queueFactory;
|
||||
private final CalculatedFieldStateService stateService;
|
||||
private final CalculatedFieldEntityProfileCache entityProfileCache;
|
||||
|
||||
public DefaultTbCalculatedFieldConsumerService(TbRuleEngineQueueFactory tbQueueFactory,
|
||||
ActorSystemContext actorContext,
|
||||
@ -91,11 +93,13 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPar
|
||||
ApplicationEventPublisher eventPublisher,
|
||||
JwtSettingsService jwtSettingsService,
|
||||
CalculatedFieldCache calculatedFieldCache,
|
||||
CalculatedFieldStateService stateService) {
|
||||
CalculatedFieldStateService stateService,
|
||||
CalculatedFieldEntityProfileCache entityProfileCache) {
|
||||
super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService,
|
||||
eventPublisher, jwtSettingsService);
|
||||
this.queueFactory = tbQueueFactory;
|
||||
this.stateService = stateService;
|
||||
this.entityProfileCache = entityProfileCache;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -227,6 +231,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPar
|
||||
public void handleComponentLifecycleEvent(ComponentLifecycleMsg event) {
|
||||
if (event.getEntityId().getEntityType() == EntityType.TENANT) {
|
||||
if (event.getEvent() == ComponentLifecycleEvent.DELETED) {
|
||||
entityProfileCache.removeTenant(event.getTenantId());
|
||||
Set<TopicPartitionInfo> partitions = stateService.getPartitions();
|
||||
if (CollectionUtils.isEmpty(partitions)) {
|
||||
return;
|
||||
@ -235,6 +240,14 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPar
|
||||
.filter(tpi -> tpi.getTenantId().isPresent() && tpi.getTenantId().get().equals(event.getTenantId()))
|
||||
.collect(Collectors.toSet()));
|
||||
}
|
||||
} else if (event.getEntityId().getEntityType() == EntityType.ASSET_PROFILE) {
|
||||
if (event.getEvent() == ComponentLifecycleEvent.DELETED) {
|
||||
entityProfileCache.evictProfile(event.getTenantId(), event.getEntityId());
|
||||
}
|
||||
} else if (event.getEntityId().getEntityType() == EntityType.DEVICE_PROFILE) {
|
||||
if (event.getEvent() == ComponentLifecycleEvent.DELETED) {
|
||||
entityProfileCache.evictProfile(event.getTenantId(), event.getEntityId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user