diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/cf/DefaultTbCalculatedFieldService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/cf/DefaultTbCalculatedFieldService.java index b439d8e641..2fe1f65015 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/cf/DefaultTbCalculatedFieldService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/cf/DefaultTbCalculatedFieldService.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2024 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -90,7 +90,7 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp private final ConcurrentMap calculatedFields = new ConcurrentHashMap<>(); private final ConcurrentMap> calculatedFieldLinks = new ConcurrentHashMap<>(); - private final ConcurrentMap states = new ConcurrentHashMap<>(); + private final ConcurrentMap states = new ConcurrentHashMap<>(); @Value("${state.initFetchPackSize:50000}") @Getter @@ -139,16 +139,16 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp calculatedFields.put(calculatedFieldId, cf); calculatedFieldLinks.put(calculatedFieldId, links); switch (entityId.getEntityType()) { - case ASSET, DEVICE -> initializeStateForEntity(tenantId, cf, callback); + case ASSET, DEVICE -> initializeStateForEntity(tenantId, cf, entityId, callback); case ASSET_PROFILE -> { PageDataIterable assetIds = new PageDataIterable<>(pageLink -> assetService.findAssetIdsByTenantIdAndAssetProfileId(tenantId, (AssetProfileId) entityId, pageLink), initFetchPackSize); - assetIds.forEach(assetId -> initializeStateForEntity(tenantId, cf, callback)); + assetIds.forEach(assetId -> initializeStateForEntity(tenantId, cf, assetId, callback)); } case DEVICE_PROFILE -> { PageDataIterable deviceIds = new PageDataIterable<>(pageLink -> deviceService.findDeviceIdsByTenantIdAndDeviceProfileId(tenantId, (DeviceProfileId) entityId, pageLink), initFetchPackSize); - deviceIds.forEach(deviceId -> initializeStateForEntity(tenantId, cf, callback)); + deviceIds.forEach(deviceId -> initializeStateForEntity(tenantId, cf, deviceId, callback)); } default -> throw new IllegalArgumentException("Entity type '" + calculatedFieldId.getEntityType() + "' does not support calculated fields."); @@ -180,7 +180,7 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(proto.getCalculatedFieldIdMSB(), proto.getCalculatedFieldIdLSB())); calculatedFieldLinks.remove(calculatedFieldId); calculatedFields.remove(calculatedFieldId); - states.remove(calculatedFieldId); + states.keySet().removeIf(ctxId -> ctxId.startsWith(calculatedFieldId.getId().toString())); } catch (Exception e) { log.trace("Failed to process calculated field delete msg: [{}]", proto, e); callback.onFailure(e); @@ -261,7 +261,7 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp }; } - private void initializeStateForEntity(TenantId tenantId, CalculatedField calculatedField, TbCallback callback) { + private void initializeStateForEntity(TenantId tenantId, CalculatedField calculatedField, EntityId entityId, TbCallback callback) { Map arguments = calculatedField.getConfiguration().getArguments(); Map argumentValues = new HashMap<>(); arguments.forEach((key, argument) -> Futures.addCallback(fetchArgumentValue(tenantId, argument), new FutureCallback<>() { @@ -278,7 +278,7 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp } }, calculatedFieldCallbackExecutor)); - updateOrInitializeState(calculatedField, argumentValues); + updateOrInitializeState(calculatedField, entityId, argumentValues); } @@ -296,8 +296,9 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp }; } - private void updateOrInitializeState(CalculatedField calculatedField, Map argumentValues) { - CalculatedFieldCtx calculatedFieldCtx = states.computeIfAbsent(calculatedField.getId(), + private void updateOrInitializeState(CalculatedField calculatedField, EntityId entityId, Map argumentValues) { + String ctxId = calculatedField.getId().getId() + "_" + entityId.getId(); + CalculatedFieldCtx calculatedFieldCtx = states.computeIfAbsent(ctxId, ctx -> new CalculatedFieldCtx(calculatedField.getId(), calculatedField.getEntityId(), null)); CalculatedFieldState state = calculatedFieldCtx.getState(); @@ -322,7 +323,7 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp .build(); } calculatedFieldCtx.setState(state); - states.put(calculatedField.getId(), calculatedFieldCtx); + states.put(ctxId, calculatedFieldCtx); } private String performCalculation(Map argumentValues, CalculatedFieldConfiguration calculatedFieldConfiguration) {