Merge pull request #12715 from irynamatveieva/calculated-fields

Calculated fields improvements
This commit is contained in:
Andrew Shvayka 2025-02-26 16:00:10 +02:00 committed by GitHub
commit 845affb428
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 395 additions and 85 deletions

View File

@ -120,6 +120,9 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
throw new RuntimeException(ctx.getSizeExceedsLimitMessage());
}
} catch (Exception e) {
if (e instanceof CalculatedFieldException cfe) {
throw cfe;
}
throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build();
}
}
@ -191,23 +194,23 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
}
}
} catch (Exception e) {
if (e instanceof CalculatedFieldException cfe) {
throw cfe;
}
throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build();
}
}
@SneakyThrows
private void processTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) {
private void processTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) throws CalculatedFieldException {
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto));
}
@SneakyThrows
private void processAttributes(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) {
private void processAttributes(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) throws CalculatedFieldException {
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto));
}
@SneakyThrows
private void processArgumentValuesUpdate(CalculatedFieldCtx ctx, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback,
Map<String, ArgumentEntry> newArgValues, UUID tbMsgId, TbMsgType tbMsgType) {
Map<String, ArgumentEntry> newArgValues, UUID tbMsgId, TbMsgType tbMsgType) throws CalculatedFieldException {
if (newArgValues.isEmpty()) {
log.info("[{}] No new argument values to process for CF.", ctx.getCfId());
callback.onSuccess(CALLBACKS_PER_CF);
@ -249,11 +252,11 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
return state;
}
@SneakyThrows
private void processStateIfReady(CalculatedFieldCtx ctx, List<CalculatedFieldId> cfIdList, CalculatedFieldState state, UUID tbMsgId, TbMsgType tbMsgType, TbCallback callback) {
private void processStateIfReady(CalculatedFieldCtx ctx, List<CalculatedFieldId> cfIdList, CalculatedFieldState state, UUID tbMsgId, TbMsgType tbMsgType, TbCallback callback) throws CalculatedFieldException {
CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId);
boolean stateSizeOk;
if (ctx.isInitialized() && state.isReady()) {
try {
CalculatedFieldResult calculationResult = state.performCalculation(ctx).get(5, TimeUnit.SECONDS);
state.checkStateSize(ctxId, ctx.getMaxStateSize());
stateSizeOk = state.isSizeOk();
@ -263,6 +266,9 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, JacksonUtil.writeValueAsString(calculationResult.getResult()), null);
}
}
} catch (Exception e) {
throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).msgId(tbMsgId).msgType(tbMsgType).arguments(state.getArguments()).cause(e).build();
}
} else {
state.checkStateSize(ctxId, ctx.getMaxStateSize());
stateSizeOk = state.isSizeOk();

View File

@ -30,7 +30,6 @@ import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg;
@ -39,7 +38,6 @@ import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.dao.cf.CalculatedFieldService;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto;
import org.thingsboard.server.service.cf.CalculatedFieldProcessingService;
import org.thingsboard.server.service.cf.CalculatedFieldStateService;
import org.thingsboard.server.service.cf.cache.CalculatedFieldEntityProfileCache;
@ -53,11 +51,12 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import static org.thingsboard.server.utils.CalculatedFieldUtils.fromProto;
/**
* @author Andrew Shvayka
@ -300,6 +299,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
log.warn("[{}] CF was already deleted [{}]", tenantId, cfId);
callback.onSuccess();
} else {
entityIdCalculatedFields.get(cfCtx.getEntityId()).remove(cfCtx);
deleteLinks(cfCtx);
EntityId entityId = cfCtx.getEntityId();
@ -349,7 +349,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
var proto = msg.getProto();
var linksList = proto.getLinksList();
for (var linkProto : linksList) {
var link = toCalculatedFieldEntityCtxId(linkProto);
var link = fromProto(linkProto);
var targetEntityId = link.entityId();
var targetEntityType = targetEntityId.getEntityType();
var cf = calculatedFields.get(link.cfId());
@ -374,12 +374,6 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
}
}
private CalculatedFieldEntityCtxId toCalculatedFieldEntityCtxId(CalculatedFieldEntityCtxIdProto ctxIdProto) {
EntityId entityId = EntityIdFactory.getByTypeAndUuid(ctxIdProto.getEntityType(), new UUID(ctxIdProto.getEntityIdMSB(), ctxIdProto.getEntityIdLSB()));
CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(ctxIdProto.getCalculatedFieldIdMSB(), ctxIdProto.getCalculatedFieldIdLSB()));
return new CalculatedFieldEntityCtxId(tenantId, calculatedFieldId, entityId);
}
private List<CalculatedFieldEntityCtxId> filterCalculatedFieldLinks(CalculatedFieldTelemetryMsg msg) {
EntityId entityId = msg.getEntityId();
var proto = msg.getProto();

View File

@ -25,7 +25,6 @@ import org.thingsboard.script.api.tbel.TbelCfTsDoubleVal;
import org.thingsboard.script.api.tbel.TbelCfTsRollingArg;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.exception.CalculatedFieldStateException;
import java.util.ArrayList;
import java.util.List;
@ -86,7 +85,7 @@ public class TsRollingArgumentEntry implements ArgumentEntry {
}
@Override
public boolean updateEntry(ArgumentEntry entry) throws CalculatedFieldStateException {
public boolean updateEntry(ArgumentEntry entry) {
if (entry instanceof TsRollingArgumentEntry tsRollingEntry) {
updateTsRollingEntry(tsRollingEntry);
} else if (entry instanceof SingleValueArgumentEntry singleValueEntry) {
@ -116,10 +115,11 @@ public class TsRollingArgumentEntry implements ArgumentEntry {
case STRING -> value.getStrValue().ifPresent(aString -> tsRecords.put(ts, Double.parseDouble(aString)));
case JSON -> value.getJsonValue().ifPresent(aString -> tsRecords.put(ts, Double.parseDouble(aString)));
}
cleanupExpiredRecords();
} catch (Exception e) {
log.warn("Time series rolling arguments supports only numeric values.");
// throw new IllegalArgumentException("Time series rolling arguments supports only numeric values.");
tsRecords.put(ts, Double.NaN);
log.debug("Invalid value '{}' for time series rolling arguments. Only numeric values are supported.", value.getValue());
} finally {
cleanupExpiredRecords();
}
}

View File

@ -697,7 +697,6 @@ public class DefaultTbClusterService implements TbClusterService {
@Override
public void onAssetUpdated(Asset entity, Asset old) {
var created = old == null;
broadcastEntityChangeToTransport(entity.getTenantId(), entity.getId(), entity, null);
if (old != null) {
boolean assetTypeChanged = !entity.getAssetProfileId().equals(old.getAssetProfileId());
if (assetTypeChanged) {

View File

@ -27,6 +27,7 @@ import org.thingsboard.server.actors.TbActorSystemSettings;
import org.thingsboard.server.actors.TbEntityActorId;
import org.thingsboard.server.actors.ruleChain.RuleChainActor;
import org.thingsboard.server.actors.ruleChain.RuleChainToRuleChainMsg;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.actors.shared.RuleChainErrorActor;
import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.id.DeviceId;
@ -116,6 +117,7 @@ public class TenantActorTest {
TbActorSystemSettings settings = new TbActorSystemSettings(0, 0, 0);
TbActorSystem system = spy(new DefaultTbActorSystem(settings));
system.createDispatcher(RULE_DISPATCHER_NAME, mock());
system.createDispatcher(DefaultActorService.CF_MANAGER_DISPATCHER_NAME, mock());
TbActorMailbox tenantCtx = new TbActorMailbox(system, settings, null, mock(), mock(), null);
tenantActor.init(tenantCtx);

View File

@ -83,57 +83,52 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
calculatedField.setConfiguration(config);
calculatedField.setVersion(1L);
// create CF -> perform initial calculation
CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class);
await().atMost(5, TimeUnit.SECONDS)
await().alias("create CF -> perform initial calculation").atMost(TIMEOUT, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("77.0");
});
// update telemetry -> recalculate state
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"temperature\":30}"));
await().atMost(5, TimeUnit.SECONDS)
await().alias("update telemetry -> recalculate state").atMost(TIMEOUT, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("86.0");
});
// update CF output -> perform calculation with updated output
Output savedOutput = savedCalculatedField.getConfiguration().getOutput();
savedOutput.setType(OutputType.ATTRIBUTES);
savedOutput.setScope(AttributeScope.SERVER_SCOPE);
savedOutput.setName("temperatureF");
savedCalculatedField = doPost("/api/calculatedField", savedCalculatedField, CalculatedField.class);
await().atMost(5, TimeUnit.SECONDS)
await().alias("update CF output -> perform calculation with updated output").atMost(TIMEOUT, TimeUnit.SECONDS)
.untilAsserted(() -> {
ArrayNode temperatureF = getServerAttributes(testDevice.getId(), "temperatureF");
assertThat(temperatureF).isNotNull();
assertThat(temperatureF.get(0).get("value").asText()).isEqualTo("86.0");
});
// update CF argument -> perform calculation with new argument
Argument savedArgument = savedCalculatedField.getConfiguration().getArguments().get("T");
savedArgument.setRefEntityKey(new ReferencedEntityKey("deviceTemperature", ArgumentType.ATTRIBUTE, AttributeScope.SERVER_SCOPE));
savedCalculatedField = doPost("/api/calculatedField", savedCalculatedField, CalculatedField.class);
await().atMost(5, TimeUnit.SECONDS)
await().alias("update CF argument -> perform calculation with new argument").atMost(TIMEOUT, TimeUnit.SECONDS)
.untilAsserted(() -> {
ArrayNode temperatureF = getServerAttributes(testDevice.getId(), "temperatureF");
assertThat(temperatureF).isNotNull();
assertThat(temperatureF.get(0).get("value").asText()).isEqualTo("104.0");
});
// update CF expression -> perform calculation with new expression
savedCalculatedField.getConfiguration().setExpression("1.8 * T + 32");
savedCalculatedField = doPost("/api/calculatedField", savedCalculatedField, CalculatedField.class);
await().atMost(5, TimeUnit.SECONDS)
await().alias("update CF expression -> perform calculation with new expression").atMost(TIMEOUT, TimeUnit.SECONDS)
.untilAsserted(() -> {
ArrayNode temperatureF = getServerAttributes(testDevice.getId(), "temperatureF");
assertThat(temperatureF).isNotNull();
@ -168,20 +163,18 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
calculatedField.setConfiguration(config);
calculatedField.setVersion(1L);
// create CF -> state is not ready -> no calculation performed
CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class);
await().atMost(5, TimeUnit.SECONDS)
await().alias("create CF -> state is not ready -> no calculation performed").atMost(TIMEOUT, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").isNull()).isTrue();
});
// update telemetry -> perform calculation
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"temperature\":30}"));
await().atMost(5, TimeUnit.SECONDS)
await().alias("update telemetry -> perform calculation").atMost(TIMEOUT, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
@ -217,20 +210,18 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
calculatedField.setConfiguration(config);
calculatedField.setVersion(1L);
// create CF -> perform initial calculation with default value
CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class);
await().atMost(5, TimeUnit.SECONDS)
await().alias("create CF -> perform initial calculation with default value").atMost(TIMEOUT, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("53.6");
});
// update telemetry -> recalculate state
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"temperature\":30}"));
await().atMost(5, TimeUnit.SECONDS)
await().alias("update telemetry -> recalculate state").atMost(TIMEOUT, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
@ -283,10 +274,9 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
calculatedField.setConfiguration(config);
calculatedField.setVersion(1L);
// create CF and perform initial calculation
doPost("/api/calculatedField", calculatedField, CalculatedField.class);
await().atMost(5, TimeUnit.SECONDS)
await().alias("create CF and perform initial calculation").atMost(TIMEOUT, TimeUnit.SECONDS)
.untilAsserted(() -> {
// result of asset 1
ArrayNode z1 = getServerAttributes(asset1.getId(), "z");
@ -299,10 +289,9 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
assertThat(z2.get(0).get("value").asText()).isEqualTo("52.0");
});
// update device telemetry -> recalculate state for all assets
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/attributes/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"x\":25}"));
await().atMost(5, TimeUnit.SECONDS)
await().alias("update device telemetry -> recalculate state for all assets").atMost(TIMEOUT, TimeUnit.SECONDS)
.untilAsserted(() -> {
// result of asset 1
ArrayNode z1 = getServerAttributes(asset1.getId(), "z");
@ -315,10 +304,9 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
assertThat(z2.get(0).get("value").asText()).isEqualTo("37.0");
});
// update asset 1 telemetry -> recalculate state only for asset 1
doPost("/api/plugins/telemetry/ASSET/" + asset1.getUuidId() + "/attributes/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"y\":15}"));
await().atMost(5, TimeUnit.SECONDS)
await().alias("update asset 1 telemetry -> recalculate state only for asset 1").atMost(TIMEOUT, TimeUnit.SECONDS)
.untilAsserted(() -> {
// result of asset 1
ArrayNode z1 = getServerAttributes(asset1.getId(), "z");
@ -331,10 +319,9 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
assertThat(z2.get(0).get("value").asText()).isEqualTo("37.0");
});
// update asset 2 telemetry -> recalculate state only for asset 2
doPost("/api/plugins/telemetry/ASSET/" + asset2.getUuidId() + "/attributes/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"y\":5}"));
await().atMost(5, TimeUnit.SECONDS)
await().alias("update asset 2 telemetry -> recalculate state only for asset 2").atMost(TIMEOUT, TimeUnit.SECONDS)
.untilAsserted(() -> {
// result of asset 1 (no changes)
ArrayNode z1 = getServerAttributes(asset1.getId(), "z");
@ -347,12 +334,11 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
assertThat(z2.get(0).get("value").asText()).isEqualTo("30.0");
});
// add new entity to profile -> calculate state for new entity
Asset asset3 = createAsset("Test asset 3", assetProfile.getId());
doPost("/api/plugins/telemetry/ASSET/" + asset3.getUuidId() + "/attributes/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"y\":13}"));
Asset finalAsset3 = asset3;
await().atMost(5, TimeUnit.SECONDS)
await().alias("add new entity to profile -> calculate state for new entity").atMost(TIMEOUT, TimeUnit.SECONDS)
.untilAsserted(() -> {
// result of asset 3
ArrayNode z3 = getServerAttributes(finalAsset3.getId(), "z");
@ -360,10 +346,9 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
assertThat(z3.get(0).get("value").asText()).isEqualTo("38.0");
});
// update device telemetry -> recalculate state for all assets
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/attributes/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"x\":20}"));
await().atMost(5, TimeUnit.SECONDS)
await().alias("update device telemetry -> recalculate state for all assets").atMost(TIMEOUT, TimeUnit.SECONDS)
.untilAsserted(() -> {
// result of asset 1
ArrayNode z1 = getServerAttributes(asset1.getId(), "z");
@ -386,11 +371,10 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
asset3.setAssetProfileId(newAssetProfile.getId());
asset3 = doPost("/api/asset", asset3, Asset.class);
// update device telemetry -> recalculate state for asset 1 and asset 2
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/attributes/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"x\":15}"));
Asset updatedAsset3 = asset3;
await().atMost(5, TimeUnit.SECONDS)
await().alias("update device telemetry -> recalculate state for asset 1 and asset 2").atMost(TIMEOUT, TimeUnit.SECONDS)
.untilAsserted(() -> {
// result of asset 1
ArrayNode z1 = getServerAttributes(asset1.getId(), "z");
@ -438,20 +422,18 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
calculatedField.setConfiguration(config);
calculatedField.setVersion(1L);
// create CF -> ctx is not initialized -> no calculation perform
CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class);
await().atMost(5, TimeUnit.SECONDS)
await().alias("create CF -> ctx is not initialized -> no calculation perform").atMost(TIMEOUT, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").isNull()).isTrue();
});
// update telemetry -> ctx is not initialized -> no calculation perform
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"temperature\":30}"));
await().atMost(5, TimeUnit.SECONDS)
await().alias("update telemetry -> ctx is not initialized -> no calculation perform").atMost(TIMEOUT, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();

View File

@ -86,13 +86,14 @@ public class CalculatedFieldControllerTest extends AbstractControllerTest {
assertThat(savedCalculatedField.getType()).isEqualTo(calculatedField.getType());
assertThat(savedCalculatedField.getName()).isEqualTo(calculatedField.getName());
assertThat(savedCalculatedField.getConfiguration()).isEqualTo(getCalculatedFieldConfig(testDevice.getId()));
assertThat(savedCalculatedField.getVersion()).isEqualTo(calculatedField.getVersion());
assertThat(savedCalculatedField.getVersion()).isEqualTo(1L);
savedCalculatedField.setName("Test CF");
CalculatedField updatedCalculatedField = doPost("/api/calculatedField", savedCalculatedField, CalculatedField.class);
assertThat(updatedCalculatedField).isEqualTo(savedCalculatedField);
assertThat(updatedCalculatedField.getName()).isEqualTo(savedCalculatedField.getName());
assertThat(updatedCalculatedField.getVersion()).isEqualTo(savedCalculatedField.getVersion() + 1);
doDelete("/api/calculatedField/" + savedCalculatedField.getId().getId().toString())
.andExpect(status().isOk());

View File

@ -37,6 +37,7 @@ import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileCon
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.queue.TbQueueCallback;
import java.util.ArrayList;
import java.util.Collections;
@ -44,6 +45,7 @@ import java.util.List;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.containsString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@ -354,7 +356,7 @@ public class TenantProfileControllerTest extends AbstractControllerTest {
argument -> argument.getClass().equals(TenantProfile.class);
if (ComponentLifecycleEvent.DELETED.equals(event)) {
Mockito.verify(tbClusterService, times(cntTime)).onTenantProfileDelete(Mockito.argThat(matcherTenantProfile),
Mockito.isNull());
eq(TbQueueCallback.EMPTY));
testBroadcastEntityStateChangeEventNever(createEntityId_NULL_UUID(new Tenant()));
} else {
Mockito.verify(tbClusterService, times(cntTime)).onTenantProfileChange(Mockito.argThat(matcherTenantProfile),

View File

@ -151,7 +151,7 @@ public class ScriptCalculatedFieldStateTest {
}
private TsRollingArgumentEntry createRollingArgEntry() {
TsRollingArgumentEntry argumentEntry = new TsRollingArgumentEntry();
TsRollingArgumentEntry argumentEntry = new TsRollingArgumentEntry(5, 30000L);
long ts = System.currentTimeMillis();
TreeMap<Long, Double> values = new TreeMap<>();

View File

@ -79,9 +79,8 @@ public class TsRollingArgumentEntryTest {
void testUpdateEntryWhenValueIsNotNumber() {
SingleValueArgumentEntry newEntry = new SingleValueArgumentEntry(ts - 10, new StringDataEntry("key", "string"), 123L);
assertThatThrownBy(() -> entry.updateEntry(newEntry))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Time series rolling arguments supports only numeric values.");
assertThat(entry.updateEntry(newEntry)).isTrue();
assertThat(entry.getTsRecords().get(ts - 10)).isNaN();
}
@Test

View File

@ -44,7 +44,7 @@ public enum LimitedApi {
TRANSPORT_MESSAGES_PER_GATEWAY_DEVICE("transport messages per gateway device", false),
EMAILS("emails sending", true),
WS_SUBSCRIPTIONS("WS subscriptions", false),
CALCULATED_FIELD_DEBUG_EVENTS(DefaultTenantProfileConfiguration::getCalculatedFieldDebugEventsRateLimit, "calculated field debug events", true);
CALCULATED_FIELD_DEBUG_EVENTS("calculated field debug events", true);
private Function<DefaultTenantProfileConfiguration, String> configExtractor;
@Getter

View File

@ -135,12 +135,11 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura
private double warnThreshold;
private long maxCalculatedFields;
private long maxCalculatedFieldsPerEntity;
private long maxArgumentsPerCF;
private long maxDataPointsPerRollingArg;
private long maxStateSizeInKBytes;
private long maxSingleValueArgumentSizeInKBytes;
private String calculatedFieldDebugEventsRateLimit;
@Override
public long getProfileThreshold(ApiUsageRecordKey key) {
@ -182,7 +181,6 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura
case DASHBOARD -> maxDashboards;
case RULE_CHAIN -> maxRuleChains;
case EDGE -> maxEdges;
case CALCULATED_FIELD -> maxCalculatedFields;
default -> 0;
};
}

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.script.api.tbel;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
@ -29,6 +30,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
})
public interface TbelCfArg extends TbelCfObject {
@JsonIgnore
String getType();
}

View File

@ -20,6 +20,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@ -60,9 +61,20 @@ public class TbelCfTsRollingArg implements TbelCfArg, Iterable<TbelCfTsDoubleVal
}
public double max() {
return max(true);
}
public double max(boolean ignoreNaN) {
if (values.isEmpty()) {
throw new IllegalArgumentException("Rolling argument values are empty.");
}
double max = Double.MIN_VALUE;
for (TbelCfTsDoubleVal value : values) {
double val = value.getValue();
if (!ignoreNaN && Double.isNaN(val)) {
return val;
}
if (max < val) {
max = val;
}
@ -70,6 +82,180 @@ public class TbelCfTsRollingArg implements TbelCfArg, Iterable<TbelCfTsDoubleVal
return max;
}
public double min() {
return min(true);
}
public double min(boolean ignoreNaN) {
if (values.isEmpty()) {
throw new IllegalArgumentException("Rolling argument values are empty.");
}
double min = Double.MAX_VALUE;
for (TbelCfTsDoubleVal value : values) {
double val = value.getValue();
if (!ignoreNaN && Double.isNaN(val)) {
return Double.NaN;
}
if (min > val) {
min = val;
}
}
return min;
}
public double mean() {
return mean(true);
}
public double mean(boolean ignoreNaN) {
if (values.isEmpty()) {
throw new IllegalArgumentException("Rolling argument values are empty.");
}
return sum(ignoreNaN) / count(ignoreNaN);
}
public double std() {
return std(true);
}
public double std(boolean ignoreNaN) {
if (values.isEmpty()) {
throw new IllegalArgumentException("Rolling argument values are empty.");
}
double mean = mean(ignoreNaN);
if (!ignoreNaN && Double.isNaN(mean)) {
return Double.NaN;
}
double sum = 0;
for (TbelCfTsDoubleVal value : values) {
double val = value.getValue();
if (Double.isNaN(val)) {
if (!ignoreNaN) {
return Double.NaN;
}
} else {
sum += Math.pow(val - mean, 2);
}
}
return Math.sqrt(sum / count(ignoreNaN));
}
public double median() {
return median(true);
}
public double median(boolean ignoreNaN) {
if (values.isEmpty()) {
throw new IllegalArgumentException("Rolling argument values are empty.");
}
List<Double> sortedValues = new ArrayList<>();
for (TbelCfTsDoubleVal value : values) {
double val = value.getValue();
if (Double.isNaN(val)) {
if (!ignoreNaN) {
return Double.NaN;
}
} else {
sortedValues.add(val);
}
}
Collections.sort(sortedValues);
int size = sortedValues.size();
return (size % 2 == 1)
? sortedValues.get(size / 2)
: (sortedValues.get(size / 2 - 1) + sortedValues.get(size / 2)) / 2.0;
}
public int count() {
return count(true);
}
public int count(boolean ignoreNaN) {
int count = 0;
if (ignoreNaN) {
for (TbelCfTsDoubleVal value : values) {
if (!Double.isNaN(value.getValue())) {
count++;
}
}
return count;
}
return values.size();
}
public double last() {
return last(true);
}
public double last(boolean ignoreNaN) {
if (values.isEmpty()) {
throw new IllegalArgumentException("Rolling argument values are empty.");
}
double value = values.get(values.size() - 1).getValue();
if (!Double.isNaN(value) || !ignoreNaN) {
return value;
}
for (int i = values.size() - 2; i >= 0; i--) {
double prevValue = values.get(i).getValue();
if (!Double.isNaN(prevValue)) {
return prevValue;
}
}
throw new IllegalArgumentException("Rolling argument values are empty.");
}
public double first() {
return first(true);
}
public double first(boolean ignoreNaN) {
if (values.isEmpty()) {
throw new IllegalArgumentException("Rolling argument values are empty.");
}
double firstValue = values.get(0).getValue();
if (!Double.isNaN(firstValue) || !ignoreNaN) {
return firstValue;
}
for (int i = 1; i < values.size(); i++) {
double nextValue = values.get(i).getValue();
if (!Double.isNaN(nextValue)) {
return nextValue;
}
}
throw new IllegalArgumentException("Rolling argument values are empty.");
}
public double sum() {
return sum(true);
}
public double sum(boolean ignoreNaN) {
if (values.isEmpty()) {
throw new IllegalArgumentException("Rolling argument values are empty.");
}
double sum = 0;
for (TbelCfTsDoubleVal value : values) {
double val = value.getValue();
if (Double.isNaN(val)) {
if (!ignoreNaN) {
return Double.NaN;
}
} else {
sum += val;
}
}
return sum;
}
@JsonIgnore
public int getSize() {
return values.size();

View File

@ -0,0 +1,131 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.script.api.tbel;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.within;
public class TbelCfTsRollingArgTest {
private final long ts = System.currentTimeMillis();
private TbelCfTsRollingArg rollingArg;
@BeforeEach
void setUp() {
rollingArg = new TbelCfTsRollingArg(
new TbTimeWindow(ts - 30000, ts - 10, 10),
List.of(
new TbelCfTsDoubleVal(ts - 10, Double.NaN),
new TbelCfTsDoubleVal(ts - 20, 2.0),
new TbelCfTsDoubleVal(ts - 30, 8.0),
new TbelCfTsDoubleVal(ts - 40, Double.NaN),
new TbelCfTsDoubleVal(ts - 50, 3.0),
new TbelCfTsDoubleVal(ts - 60, 9.0),
new TbelCfTsDoubleVal(ts - 70, Double.NaN)
)
);
}
@Test
void testMax() {
assertThat(rollingArg.max()).isEqualTo(9.0);
assertThat(rollingArg.max(false)).isNaN();
}
@Test
void testMin() {
assertThat(rollingArg.min()).isEqualTo(2.0);
assertThat(rollingArg.min(false)).isNaN();
}
@Test
void testMean() {
assertThat(rollingArg.mean()).isEqualTo(5.5);
assertThat(rollingArg.mean(false)).isNaN();
}
@Test
void testStd() {
assertThat(rollingArg.std()).isCloseTo(3.0413812651491097, within(0.001));
assertThat(rollingArg.std(false)).isNaN();
}
@Test
void testMedian() {
assertThat(rollingArg.median()).isEqualTo(5.5);
assertThat(rollingArg.median(false)).isNaN();
}
@Test
void testCount() {
assertThat(rollingArg.count()).isEqualTo(4);
assertThat(rollingArg.count(false)).isEqualTo(7);
}
@Test
void testLast() {
assertThat(rollingArg.last()).isEqualTo(9.0);
assertThat(rollingArg.last(false)).isNaN();
}
@Test
void testFirst() {
assertThat(rollingArg.first()).isEqualTo(2.0);
assertThat(rollingArg.first(false)).isNaN();
}
@Test
void testFirstAndLastWhenOnlyNaNAndIgnoreNaNIsFalse() {
assertThat(rollingArg.first()).isEqualTo(2.0);
rollingArg = new TbelCfTsRollingArg(
new TbTimeWindow(ts - 30000, ts - 10, 10),
List.of(
new TbelCfTsDoubleVal(ts - 10, Double.NaN),
new TbelCfTsDoubleVal(ts - 40, Double.NaN),
new TbelCfTsDoubleVal(ts - 70, Double.NaN)
)
);
assertThatThrownBy(rollingArg::first).isInstanceOf(IllegalArgumentException.class).hasMessage("Rolling argument values are empty.");
assertThatThrownBy(rollingArg::last).isInstanceOf(IllegalArgumentException.class).hasMessage("Rolling argument values are empty.");
}
@Test
void testSum() {
assertThat(rollingArg.sum()).isEqualTo(22.0);
assertThat(rollingArg.sum(false)).isNaN();
}
@Test
void testEmptyValues() {
rollingArg = new TbelCfTsRollingArg(new TbTimeWindow(0, 10, 10), List.of());
assertThatThrownBy(rollingArg::sum).isInstanceOf(IllegalArgumentException.class).hasMessage("Rolling argument values are empty.");
assertThatThrownBy(rollingArg::max).isInstanceOf(IllegalArgumentException.class).hasMessage("Rolling argument values are empty.");
assertThatThrownBy(rollingArg::min).isInstanceOf(IllegalArgumentException.class).hasMessage("Rolling argument values are empty.");
assertThatThrownBy(rollingArg::mean).isInstanceOf(IllegalArgumentException.class).hasMessage("Rolling argument values are empty.");
assertThatThrownBy(rollingArg::std).isInstanceOf(IllegalArgumentException.class).hasMessage("Rolling argument values are empty.");
assertThatThrownBy(rollingArg::median).isInstanceOf(IllegalArgumentException.class).hasMessage("Rolling argument values are empty.");
assertThatThrownBy(rollingArg::first).isInstanceOf(IllegalArgumentException.class).hasMessage("Rolling argument values are empty.");
assertThatThrownBy(rollingArg::last).isInstanceOf(IllegalArgumentException.class).hasMessage("Rolling argument values are empty.");
}
}

View File

@ -17,8 +17,8 @@ package org.thingsboard.server.dao.service.validator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.dao.cf.CalculatedFieldDao;
@ -37,7 +37,7 @@ public class CalculatedFieldDataValidator extends DataValidator<CalculatedField>
@Override
protected void validateCreate(TenantId tenantId, CalculatedField calculatedField) {
validateNumberOfEntitiesPerTenant(tenantId, EntityType.CALCULATED_FIELD);
validateNumberOfCFsPerEntity(tenantId, calculatedField.getEntityId());
validateNumberOfArgumentsPerCF(tenantId, calculatedField);
}
@ -51,6 +51,16 @@ public class CalculatedFieldDataValidator extends DataValidator<CalculatedField>
return old;
}
private void validateNumberOfCFsPerEntity(TenantId tenantId, EntityId entityId) {
long maxCFsPerEntity = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxCalculatedFieldsPerEntity);
if (maxCFsPerEntity <= 0) {
return;
}
if (calculatedFieldDao.countCFByEntityId(tenantId, entityId) >= maxCFsPerEntity) {
throw new DataValidationException("Calculated fields per entity limit reached!");
}
}
private void validateNumberOfArgumentsPerCF(TenantId tenantId, CalculatedField calculatedField) {
long maxArgumentsPerCF = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxArgumentsPerCF);
if (maxArgumentsPerCF <= 0) {

View File

@ -78,14 +78,14 @@ public class CalculatedFieldServiceTest extends AbstractServiceTest {
assertThat(savedCalculatedField.getType()).isEqualTo(calculatedField.getType());
assertThat(savedCalculatedField.getName()).isEqualTo(calculatedField.getName());
assertThat(savedCalculatedField.getConfiguration()).isEqualTo(calculatedField.getConfiguration());
assertThat(savedCalculatedField.getVersion()).isEqualTo(calculatedField.getVersion());
assertThat(savedCalculatedField.getVersion()).isEqualTo(1L);
savedCalculatedField.setName("Test CF");
CalculatedField updatedCalculatedField = calculatedFieldService.save(savedCalculatedField);
savedCalculatedField.setVersion(savedCalculatedField.getVersion() + 1);
assertThat(updatedCalculatedField).isEqualTo(savedCalculatedField);
assertThat(updatedCalculatedField.getName()).isEqualTo(savedCalculatedField.getName());
assertThat(updatedCalculatedField.getVersion()).isEqualTo(savedCalculatedField.getVersion() + 1);
calculatedFieldService.deleteCalculatedField(tenantId, savedCalculatedField.getId());
}
@ -148,7 +148,6 @@ public class CalculatedFieldServiceTest extends AbstractServiceTest {
calculatedField.setName("Test Calculated Field");
calculatedField.setConfigurationVersion(1);
calculatedField.setConfiguration(getCalculatedFieldConfig(referencedEntityId));
calculatedField.setVersion(1L);
return calculatedField;
}

View File

@ -25,7 +25,6 @@ import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.dao.cf.CalculatedFieldDao;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.usagerecord.ApiLimitService;
import org.thingsboard.server.dao.usagerecord.DefaultApiLimitService;
import java.util.UUID;