updated ctx value in cache when expression failed after cf update

This commit is contained in:
IrynaMatveieva 2025-10-02 13:18:31 +03:00
parent 9b412d1dd8
commit 0a2b1168fa
3 changed files with 80 additions and 24 deletions

View File

@ -287,29 +287,29 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
newCfCtx.init();
} catch (Exception e) {
throw CalculatedFieldException.builder().ctx(newCfCtx).eventEntity(newCfCtx.getEntityId()).cause(e).errorMessage("Failed to initialize CF context").build();
}
calculatedFields.put(newCf.getId(), newCfCtx);
List<CalculatedFieldCtx> oldCfList = entityIdCalculatedFields.get(newCf.getEntityId());
List<CalculatedFieldCtx> newCfList = new CopyOnWriteArrayList<>();
boolean found = false;
for (CalculatedFieldCtx oldCtx : oldCfList) {
if (oldCtx.getCfId().equals(newCf.getId())) {
newCfList.add(newCfCtx);
found = true;
} else {
newCfList.add(oldCtx);
} finally {
calculatedFields.put(newCf.getId(), newCfCtx);
List<CalculatedFieldCtx> oldCfList = entityIdCalculatedFields.get(newCf.getEntityId());
List<CalculatedFieldCtx> newCfList = new CopyOnWriteArrayList<>();
boolean found = false;
for (CalculatedFieldCtx oldCtx : oldCfList) {
if (oldCtx.getCfId().equals(newCf.getId())) {
newCfList.add(newCfCtx);
found = true;
} else {
newCfList.add(oldCtx);
}
}
if (!found) {
newCfList.add(newCfCtx);
}
// We use copy on write lists to safely pass the reference to another actor for the iteration.
// Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
entityIdCalculatedFields.put(newCf.getEntityId(), newCfList);
deleteLinks(oldCfCtx);
addLinks(newCf);
}
if (!found) {
newCfList.add(newCfCtx);
}
entityIdCalculatedFields.put(newCf.getEntityId(), newCfList);
deleteLinks(oldCfCtx);
addLinks(newCf);
// We use copy on write lists to safely pass the reference to another actor for the iteration.
// Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
var stateChanges = newCfCtx.hasStateChanges(oldCfCtx);
if (stateChanges || newCfCtx.hasOtherSignificantChanges(oldCfCtx)) {
initCf(newCfCtx, callback, stateChanges);
@ -550,11 +550,12 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
cfCtx.init();
} catch (Exception e) {
throw CalculatedFieldException.builder().ctx(cfCtx).eventEntity(cf.getEntityId()).cause(e).errorMessage("Failed to initialize CF context").build();
} finally {
calculatedFields.put(cf.getId(), cfCtx);
// We use copy on write lists to safely pass the reference to another actor for the iteration.
// Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cfCtx);
}
calculatedFields.put(cf.getId(), cfCtx);
// We use copy on write lists to safely pass the reference to another actor for the iteration.
// Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cfCtx);
}
private void initCalculatedFieldLink(CalculatedFieldLink link) {

View File

@ -110,6 +110,7 @@ public class CalculatedFieldCtx {
this.calculatedFieldScriptEngine = initEngine(tenantId, expression, tbelInvokeService);
initialized = true;
} catch (Exception e) {
initialized = false;
throw new RuntimeException("Failed to init calculated field ctx. Invalid expression syntax.", e);
}
} else {
@ -123,6 +124,7 @@ public class CalculatedFieldCtx {
);
initialized = true;
} else {
initialized = false;
throw new RuntimeException("Failed to init calculated field ctx. Invalid expression syntax.");
}
}

View File

@ -606,6 +606,59 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
});
}
@Test
public void testSimpleCalculatedFieldWhenCtxBecameUninitialized() throws Exception {
Device testDevice = createDevice("Test device", "1234567890");
CalculatedField calculatedField = new CalculatedField();
calculatedField.setEntityId(testDevice.getId());
calculatedField.setType(CalculatedFieldType.SIMPLE);
calculatedField.setName("M + 1");
calculatedField.setDebugSettings(DebugSettings.all());
SimpleCalculatedFieldConfiguration config = new SimpleCalculatedFieldConfiguration();
Argument argument = new Argument();
ReferencedEntityKey refEntityKey = new ReferencedEntityKey("m", ArgumentType.TS_LATEST, null);
argument.setRefEntityKey(refEntityKey);
config.setArguments(Map.of("m", argument));
config.setExpression("m + 1");
Output output = new Output();
output.setName("m1");
output.setType(OutputType.TIME_SERIES);
output.setDecimalsByDefault(0);
config.setOutput(output);
calculatedField.setConfiguration(config);
calculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class);
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"m\":1}"));
await().alias("create CF -> ctx is initialized -> perform calculation").atMost(TIMEOUT, TimeUnit.SECONDS)
.pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode m1 = getLatestTelemetry(testDevice.getId(), "m1");
assertThat(m1).isNotNull();
assertThat(m1.get("m1").get(0).get("value").asText()).isEqualTo("2");
});
config.setExpression("m m");
calculatedField.setConfiguration(config);
calculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class);
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"m\":2}"));
await().alias("update CF -> ctx is not initialized -> no calculation performed").atMost(TIMEOUT, TimeUnit.SECONDS)
.pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode m1 = getLatestTelemetry(testDevice.getId(), "m1");
assertThat(m1).isNotNull();
assertThat(m1.get("m1").get(0).get("value").asText()).isEqualTo("2");
});
}
private ObjectNode getLatestTelemetry(EntityId entityId, String... keys) throws Exception {
return doGetAsync("/api/plugins/telemetry/" + entityId.getEntityType() + "/" + entityId.getId() + "/values/timeseries?keys=" + String.join(",", keys), ObjectNode.class);
}