Merge pull request #14099 from irynamatveieva/fix/cf-ctx
Updated ctx in cache if expression is invalid after CF update
This commit is contained in:
commit
0a94c03a4b
@ -287,29 +287,29 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
|
|||||||
newCfCtx.init();
|
newCfCtx.init();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw CalculatedFieldException.builder().ctx(newCfCtx).eventEntity(newCfCtx.getEntityId()).cause(e).errorMessage("Failed to initialize CF context").build();
|
throw CalculatedFieldException.builder().ctx(newCfCtx).eventEntity(newCfCtx.getEntityId()).cause(e).errorMessage("Failed to initialize CF context").build();
|
||||||
}
|
} finally {
|
||||||
calculatedFields.put(newCf.getId(), newCfCtx);
|
calculatedFields.put(newCf.getId(), newCfCtx);
|
||||||
List<CalculatedFieldCtx> oldCfList = entityIdCalculatedFields.get(newCf.getEntityId());
|
List<CalculatedFieldCtx> oldCfList = entityIdCalculatedFields.get(newCf.getEntityId());
|
||||||
List<CalculatedFieldCtx> newCfList = new CopyOnWriteArrayList<>();
|
List<CalculatedFieldCtx> newCfList = new CopyOnWriteArrayList<>();
|
||||||
boolean found = false;
|
boolean found = false;
|
||||||
for (CalculatedFieldCtx oldCtx : oldCfList) {
|
for (CalculatedFieldCtx oldCtx : oldCfList) {
|
||||||
if (oldCtx.getCfId().equals(newCf.getId())) {
|
if (oldCtx.getCfId().equals(newCf.getId())) {
|
||||||
newCfList.add(newCfCtx);
|
newCfList.add(newCfCtx);
|
||||||
found = true;
|
found = true;
|
||||||
} else {
|
} else {
|
||||||
newCfList.add(oldCtx);
|
newCfList.add(oldCtx);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
if (!found) {
|
||||||
|
newCfList.add(newCfCtx);
|
||||||
|
}
|
||||||
|
// We use copy on write lists to safely pass the reference to another actor for the iteration.
|
||||||
|
// Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
|
||||||
|
entityIdCalculatedFields.put(newCf.getEntityId(), newCfList);
|
||||||
|
deleteLinks(oldCfCtx);
|
||||||
|
addLinks(newCf);
|
||||||
}
|
}
|
||||||
if (!found) {
|
|
||||||
newCfList.add(newCfCtx);
|
|
||||||
}
|
|
||||||
entityIdCalculatedFields.put(newCf.getEntityId(), newCfList);
|
|
||||||
|
|
||||||
deleteLinks(oldCfCtx);
|
|
||||||
addLinks(newCf);
|
|
||||||
|
|
||||||
// We use copy on write lists to safely pass the reference to another actor for the iteration.
|
|
||||||
// Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
|
|
||||||
var stateChanges = newCfCtx.hasStateChanges(oldCfCtx);
|
var stateChanges = newCfCtx.hasStateChanges(oldCfCtx);
|
||||||
if (stateChanges || newCfCtx.hasOtherSignificantChanges(oldCfCtx)) {
|
if (stateChanges || newCfCtx.hasOtherSignificantChanges(oldCfCtx)) {
|
||||||
initCf(newCfCtx, callback, stateChanges);
|
initCf(newCfCtx, callback, stateChanges);
|
||||||
@ -550,11 +550,12 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
|
|||||||
cfCtx.init();
|
cfCtx.init();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw CalculatedFieldException.builder().ctx(cfCtx).eventEntity(cf.getEntityId()).cause(e).errorMessage("Failed to initialize CF context").build();
|
throw CalculatedFieldException.builder().ctx(cfCtx).eventEntity(cf.getEntityId()).cause(e).errorMessage("Failed to initialize CF context").build();
|
||||||
|
} finally {
|
||||||
|
calculatedFields.put(cf.getId(), cfCtx);
|
||||||
|
// We use copy on write lists to safely pass the reference to another actor for the iteration.
|
||||||
|
// Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
|
||||||
|
entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cfCtx);
|
||||||
}
|
}
|
||||||
calculatedFields.put(cf.getId(), cfCtx);
|
|
||||||
// We use copy on write lists to safely pass the reference to another actor for the iteration.
|
|
||||||
// Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
|
|
||||||
entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cfCtx);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initCalculatedFieldLink(CalculatedFieldLink link) {
|
private void initCalculatedFieldLink(CalculatedFieldLink link) {
|
||||||
|
|||||||
@ -110,6 +110,7 @@ public class CalculatedFieldCtx {
|
|||||||
this.calculatedFieldScriptEngine = initEngine(tenantId, expression, tbelInvokeService);
|
this.calculatedFieldScriptEngine = initEngine(tenantId, expression, tbelInvokeService);
|
||||||
initialized = true;
|
initialized = true;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
initialized = false;
|
||||||
throw new RuntimeException("Failed to init calculated field ctx. Invalid expression syntax.", e);
|
throw new RuntimeException("Failed to init calculated field ctx. Invalid expression syntax.", e);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -123,6 +124,7 @@ public class CalculatedFieldCtx {
|
|||||||
);
|
);
|
||||||
initialized = true;
|
initialized = true;
|
||||||
} else {
|
} else {
|
||||||
|
initialized = false;
|
||||||
throw new RuntimeException("Failed to init calculated field ctx. Invalid expression syntax.");
|
throw new RuntimeException("Failed to init calculated field ctx. Invalid expression syntax.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -606,6 +606,59 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimpleCalculatedFieldWhenCtxBecameUninitialized() throws Exception {
|
||||||
|
Device testDevice = createDevice("Test device", "1234567890");
|
||||||
|
|
||||||
|
CalculatedField calculatedField = new CalculatedField();
|
||||||
|
calculatedField.setEntityId(testDevice.getId());
|
||||||
|
calculatedField.setType(CalculatedFieldType.SIMPLE);
|
||||||
|
calculatedField.setName("M + 1");
|
||||||
|
calculatedField.setDebugSettings(DebugSettings.all());
|
||||||
|
|
||||||
|
SimpleCalculatedFieldConfiguration config = new SimpleCalculatedFieldConfiguration();
|
||||||
|
|
||||||
|
Argument argument = new Argument();
|
||||||
|
ReferencedEntityKey refEntityKey = new ReferencedEntityKey("m", ArgumentType.TS_LATEST, null);
|
||||||
|
argument.setRefEntityKey(refEntityKey);
|
||||||
|
config.setArguments(Map.of("m", argument));
|
||||||
|
config.setExpression("m + 1");
|
||||||
|
|
||||||
|
Output output = new Output();
|
||||||
|
output.setName("m1");
|
||||||
|
output.setType(OutputType.TIME_SERIES);
|
||||||
|
output.setDecimalsByDefault(0);
|
||||||
|
config.setOutput(output);
|
||||||
|
|
||||||
|
calculatedField.setConfiguration(config);
|
||||||
|
|
||||||
|
calculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class);
|
||||||
|
|
||||||
|
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"m\":1}"));
|
||||||
|
|
||||||
|
await().alias("create CF -> ctx is initialized -> perform calculation").atMost(TIMEOUT, TimeUnit.SECONDS)
|
||||||
|
.pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
|
||||||
|
.untilAsserted(() -> {
|
||||||
|
ObjectNode m1 = getLatestTelemetry(testDevice.getId(), "m1");
|
||||||
|
assertThat(m1).isNotNull();
|
||||||
|
assertThat(m1.get("m1").get(0).get("value").asText()).isEqualTo("2");
|
||||||
|
});
|
||||||
|
|
||||||
|
config.setExpression("m m");
|
||||||
|
calculatedField.setConfiguration(config);
|
||||||
|
calculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class);
|
||||||
|
|
||||||
|
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"m\":2}"));
|
||||||
|
|
||||||
|
await().alias("update CF -> ctx is not initialized -> no calculation performed").atMost(TIMEOUT, TimeUnit.SECONDS)
|
||||||
|
.pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
|
||||||
|
.untilAsserted(() -> {
|
||||||
|
ObjectNode m1 = getLatestTelemetry(testDevice.getId(), "m1");
|
||||||
|
assertThat(m1).isNotNull();
|
||||||
|
assertThat(m1.get("m1").get(0).get("value").asText()).isEqualTo("2");
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
private ObjectNode getLatestTelemetry(EntityId entityId, String... keys) throws Exception {
|
private ObjectNode getLatestTelemetry(EntityId entityId, String... keys) throws Exception {
|
||||||
return doGetAsync("/api/plugins/telemetry/" + entityId.getEntityType() + "/" + entityId.getId() + "/values/timeseries?keys=" + String.join(",", keys), ObjectNode.class);
|
return doGetAsync("/api/plugins/telemetry/" + entityId.getEntityType() + "/" + entityId.getId() + "/values/timeseries?keys=" + String.join(",", keys), ObjectNode.class);
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user