changed key for states map

This commit is contained in:
IrynaMatveieva 2024-11-13 09:06:12 +02:00
parent 23009c9aaf
commit 88e5da7a14

View File

@ -1,12 +1,12 @@
/** /**
* Copyright © 2016-2024 The Thingsboard Authors * Copyright © 2016-2024 The Thingsboard Authors
* * <p>
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* * <p>
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* * <p>
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -90,7 +90,7 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
private final ConcurrentMap<CalculatedFieldId, CalculatedField> calculatedFields = new ConcurrentHashMap<>(); private final ConcurrentMap<CalculatedFieldId, CalculatedField> calculatedFields = new ConcurrentHashMap<>();
private final ConcurrentMap<CalculatedFieldId, List<CalculatedFieldLink>> calculatedFieldLinks = new ConcurrentHashMap<>(); private final ConcurrentMap<CalculatedFieldId, List<CalculatedFieldLink>> calculatedFieldLinks = new ConcurrentHashMap<>();
private final ConcurrentMap<CalculatedFieldId, CalculatedFieldCtx> states = new ConcurrentHashMap<>(); private final ConcurrentMap<String, CalculatedFieldCtx> states = new ConcurrentHashMap<>();
@Value("${state.initFetchPackSize:50000}") @Value("${state.initFetchPackSize:50000}")
@Getter @Getter
@ -139,16 +139,16 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
calculatedFields.put(calculatedFieldId, cf); calculatedFields.put(calculatedFieldId, cf);
calculatedFieldLinks.put(calculatedFieldId, links); calculatedFieldLinks.put(calculatedFieldId, links);
switch (entityId.getEntityType()) { switch (entityId.getEntityType()) {
case ASSET, DEVICE -> initializeStateForEntity(tenantId, cf, callback); case ASSET, DEVICE -> initializeStateForEntity(tenantId, cf, entityId, callback);
case ASSET_PROFILE -> { case ASSET_PROFILE -> {
PageDataIterable<AssetId> assetIds = new PageDataIterable<>(pageLink -> PageDataIterable<AssetId> assetIds = new PageDataIterable<>(pageLink ->
assetService.findAssetIdsByTenantIdAndAssetProfileId(tenantId, (AssetProfileId) entityId, pageLink), initFetchPackSize); assetService.findAssetIdsByTenantIdAndAssetProfileId(tenantId, (AssetProfileId) entityId, pageLink), initFetchPackSize);
assetIds.forEach(assetId -> initializeStateForEntity(tenantId, cf, callback)); assetIds.forEach(assetId -> initializeStateForEntity(tenantId, cf, assetId, callback));
} }
case DEVICE_PROFILE -> { case DEVICE_PROFILE -> {
PageDataIterable<DeviceId> deviceIds = new PageDataIterable<>(pageLink -> PageDataIterable<DeviceId> deviceIds = new PageDataIterable<>(pageLink ->
deviceService.findDeviceIdsByTenantIdAndDeviceProfileId(tenantId, (DeviceProfileId) entityId, pageLink), initFetchPackSize); deviceService.findDeviceIdsByTenantIdAndDeviceProfileId(tenantId, (DeviceProfileId) entityId, pageLink), initFetchPackSize);
deviceIds.forEach(deviceId -> initializeStateForEntity(tenantId, cf, callback)); deviceIds.forEach(deviceId -> initializeStateForEntity(tenantId, cf, deviceId, callback));
} }
default -> default ->
throw new IllegalArgumentException("Entity type '" + calculatedFieldId.getEntityType() + "' does not support calculated fields."); throw new IllegalArgumentException("Entity type '" + calculatedFieldId.getEntityType() + "' does not support calculated fields.");
@ -180,7 +180,7 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(proto.getCalculatedFieldIdMSB(), proto.getCalculatedFieldIdLSB())); CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(proto.getCalculatedFieldIdMSB(), proto.getCalculatedFieldIdLSB()));
calculatedFieldLinks.remove(calculatedFieldId); calculatedFieldLinks.remove(calculatedFieldId);
calculatedFields.remove(calculatedFieldId); calculatedFields.remove(calculatedFieldId);
states.remove(calculatedFieldId); states.keySet().removeIf(ctxId -> ctxId.startsWith(calculatedFieldId.getId().toString()));
} catch (Exception e) { } catch (Exception e) {
log.trace("Failed to process calculated field delete msg: [{}]", proto, e); log.trace("Failed to process calculated field delete msg: [{}]", proto, e);
callback.onFailure(e); callback.onFailure(e);
@ -261,7 +261,7 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
}; };
} }
private void initializeStateForEntity(TenantId tenantId, CalculatedField calculatedField, TbCallback callback) { private void initializeStateForEntity(TenantId tenantId, CalculatedField calculatedField, EntityId entityId, TbCallback callback) {
Map<String, BaseCalculatedFieldConfiguration.Argument> arguments = calculatedField.getConfiguration().getArguments(); Map<String, BaseCalculatedFieldConfiguration.Argument> arguments = calculatedField.getConfiguration().getArguments();
Map<String, String> argumentValues = new HashMap<>(); Map<String, String> argumentValues = new HashMap<>();
arguments.forEach((key, argument) -> Futures.addCallback(fetchArgumentValue(tenantId, argument), new FutureCallback<>() { arguments.forEach((key, argument) -> Futures.addCallback(fetchArgumentValue(tenantId, argument), new FutureCallback<>() {
@ -278,7 +278,7 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
} }
}, calculatedFieldCallbackExecutor)); }, calculatedFieldCallbackExecutor));
updateOrInitializeState(calculatedField, argumentValues); updateOrInitializeState(calculatedField, entityId, argumentValues);
} }
@ -296,8 +296,9 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
}; };
} }
private void updateOrInitializeState(CalculatedField calculatedField, Map<String, String> argumentValues) { private void updateOrInitializeState(CalculatedField calculatedField, EntityId entityId, Map<String, String> argumentValues) {
CalculatedFieldCtx calculatedFieldCtx = states.computeIfAbsent(calculatedField.getId(), String ctxId = calculatedField.getId().getId() + "_" + entityId.getId();
CalculatedFieldCtx calculatedFieldCtx = states.computeIfAbsent(ctxId,
ctx -> new CalculatedFieldCtx(calculatedField.getId(), calculatedField.getEntityId(), null)); ctx -> new CalculatedFieldCtx(calculatedField.getId(), calculatedField.getEntityId(), null));
CalculatedFieldState state = calculatedFieldCtx.getState(); CalculatedFieldState state = calculatedFieldCtx.getState();
@ -322,7 +323,7 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
.build(); .build();
} }
calculatedFieldCtx.setState(state); calculatedFieldCtx.setState(state);
states.put(calculatedField.getId(), calculatedFieldCtx); states.put(ctxId, calculatedFieldCtx);
} }
private String performCalculation(Map<String, String> argumentValues, CalculatedFieldConfiguration calculatedFieldConfiguration) { private String performCalculation(Map<String, String> argumentValues, CalculatedFieldConfiguration calculatedFieldConfiguration) {