diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java index c62898ab09..ffea217da7 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java @@ -42,8 +42,6 @@ public interface TimeseriesService { ListenableFuture> findLatest(TenantId tenantId, EntityId entityId, Collection keys); - List findLatestSync(TenantId tenantId, EntityId entityId, Collection keys); - ListenableFuture> findAllLatest(TenantId tenantId, EntityId entityId); ListenableFuture save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java index a9fb85560a..425bb10a0e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java @@ -19,6 +19,8 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -50,8 +52,6 @@ import org.thingsboard.server.dao.sqlts.latest.TsKvLatestRepository; import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao; import org.thingsboard.server.dao.util.SqlTsLatestAnyDao; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; @@ -163,11 +163,6 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme return service.submit(() -> getLatestTsKvEntry(entityId, key)); } - @Override - public TsKvEntry findLatestSync(TenantId tenantId, EntityId entityId, String key) { - return getLatestTsKvEntry(entityId, key); - } - @Override public ListenableFuture> findAllLatest(TenantId tenantId, EntityId entityId) { return getFindAllLatestFuture(entityId); @@ -209,7 +204,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme ReadTsKvQueryResult::getData, MoreExecutors.directExecutor()); } - protected TsKvEntry doFindLatest(EntityId entityId, String key) { + protected TsKvEntry doFindLatest(EntityId entityId, String key) { TsKvLatestCompositeKey compositeKey = new TsKvLatestCompositeKey( entityId.getId(), diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index cfba6786f3..60056b2b8f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -134,17 +134,6 @@ public class BaseTimeseriesService implements TimeseriesService { return Futures.allAsList(futures); } - @Override - public List findLatestSync(TenantId tenantId, EntityId entityId, Collection keys) { - validate(entityId); - List latestEntries = new ArrayList<>(keys.size()); - keys.forEach(key -> Validator.validateString(key, k -> "Incorrect key " + k)); - for (String key : keys) { - latestEntries.add(timeseriesLatestDao.findLatestSync(tenantId, entityId, key)); - } - return latestEntries; - } - @Override public ListenableFuture> findAllLatest(TenantId tenantId, EntityId entityId) { validate(entityId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java index 342d4d9771..7a5904eb6b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java @@ -44,7 +44,6 @@ import org.thingsboard.server.dao.util.NoSqlTsLatestDao; import java.util.Collections; import java.util.List; import java.util.Optional; -import java.util.concurrent.ExecutionException; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal; @@ -70,16 +69,6 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes return findLatest(tenantId, entityId, key, rs -> convertResultToTsKvEntry(key, rs.one())); } - @Override - public TsKvEntry findLatestSync(TenantId tenantId, EntityId entityId, String key) { - try { - return findLatest(tenantId, entityId, key, rs -> convertResultToTsKvEntry(key, rs.one())).get(); - } catch (InterruptedException | ExecutionException e) { - log.error("[{}][{}] Failed to get latest entry for key: {} due to: ", tenantId, entityId, key, e); - throw new RuntimeException(e); - } - } - private ListenableFuture findLatest(TenantId tenantId, EntityId entityId, String key, java.util.function.Function function) { BoundStatementBuilder stmtBuilder = new BoundStatementBuilder(getFindLatestStmt().bind()); stmtBuilder.setString(0, entityId.getEntityType().name()); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java index aad4074f85..d339f49e11 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java @@ -40,8 +40,6 @@ public interface TimeseriesLatestDao { */ ListenableFuture findLatest(TenantId tenantId, EntityId entityId, String key); - TsKvEntry findLatestSync(TenantId tenantId, EntityId entityId, String key); - ListenableFuture> findAllLatest(TenantId tenantId, EntityId entityId); ListenableFuture saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java index 823c224833..4e64ad854e 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java @@ -105,8 +105,8 @@ public class TbMathNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { - var semaphoreWithQueue = locks.computeIfAbsent(msg.getOriginator(), SemaphoreWithTbMsgQueue::new); - semaphoreWithQueue.addToQueueAndTryProcess(msg, ctx, this::processMsgAsync); + locks.computeIfAbsent(msg.getOriginator(), SemaphoreWithTbMsgQueue::new) + .addToQueueAndTryProcess(msg, ctx, this::processMsgAsync); } ListenableFuture processMsgAsync(TbContext ctx, TbMsg msg) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNode.java index f459a532ed..1552558a9e 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNode.java @@ -30,20 +30,18 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.rule.engine.util.SemaphoreWithTbMsgQueue; +import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.dao.timeseries.TimeseriesService; import java.math.BigDecimal; import java.math.RoundingMode; import java.util.Map; -import static org.thingsboard.common.util.DonAsynchron.withCallback; - @Slf4j @RuleNode(type = ComponentType.ENRICHMENT, name = "calculate delta", @@ -61,20 +59,21 @@ public class CalculateDeltaNode implements TbNode { private Map locks; private CalculateDeltaNodeConfiguration config; - private TbContext ctx; - private TimeseriesService timeseriesService; - private boolean useCache; - private String inputKey; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { this.config = TbNodeUtils.convert(configuration, CalculateDeltaNodeConfiguration.class); - this.ctx = ctx; - this.timeseriesService = ctx.getTimeseriesService(); - this.inputKey = config.getInputValueKey(); - this.useCache = config.isUseCache(); - if (useCache) { - locks = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK); + if (StringUtils.isBlank(config.getInputValueKey())) { + throw new TbNodeException("Input value key should be specified!", true); + } + if (StringUtils.isBlank(config.getOutputValueKey())) { + throw new TbNodeException("Output value key should be specified!", true); + } + if (config.isAddPeriodBetweenMsgs() && StringUtils.isBlank(config.getPeriodValueKey())) { + throw new TbNodeException("Period value key should be specified!", true); + } + locks = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK); + if (config.isUseCache()) { cache = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.SOFT); } } @@ -85,35 +84,26 @@ public class CalculateDeltaNode implements TbNode { ctx.tellNext(msg, TbNodeConnectionType.OTHER); return; } - JsonNode json = JacksonUtil.toJsonNode(msg.getData()); - if (!json.has(inputKey)) { + JsonNode msgData = JacksonUtil.toJsonNode(msg.getData()); + if (msgData == null || !msgData.has(config.getInputValueKey())) { ctx.tellNext(msg, TbNodeConnectionType.OTHER); return; } - if (useCache) { - var semaphoreWithQueue = locks.computeIfAbsent(msg.getOriginator(), SemaphoreWithTbMsgQueue::new); - semaphoreWithQueue.addToQueueAndTryProcess(msg, ctx, this::processMsgAsync); - return; - } - withCallback(fetchLatestValueAsync(msg.getOriginator()), - previousData -> { - processCalculateDelta(msg.getOriginator(), msg.getMetaDataTs(), (ObjectNode) json, previousData); - ctx.tellSuccess(TbMsg.transformMsgData(msg, JacksonUtil.toString(json))); - }, - t -> ctx.tellFailure(msg, t), MoreExecutors.directExecutor()); + locks.computeIfAbsent(msg.getOriginator(), SemaphoreWithTbMsgQueue::new) + .addToQueueAndTryProcess(msg, ctx, this::processMsgAsync); } @Override public void destroy() { - if (useCache) { + locks.clear(); + if (config.isUseCache()) { cache.clear(); - locks.clear(); } } - private ListenableFuture fetchLatestValueAsync(EntityId entityId) { - return Futures.transform(timeseriesService.findLatest(ctx.getTenantId(), entityId, config.getInputValueKey()), - tsKvEntryOpt -> tsKvEntryOpt.map(this::extractValue).orElse(null), ctx.getDbCallbackExecutor()); + private ListenableFuture fetchLatestValueAsync(TbContext ctx, EntityId entityId) { + return Futures.transform(ctx.getTimeseriesService().findLatest(ctx.getTenantId(), entityId, config.getInputValueKey()), + tsKvEntryOpt -> tsKvEntryOpt.map(this::extractValue).orElse(null), MoreExecutors.directExecutor()); } private ValueWithTs extractValue(TsKvEntry kvEntry) { @@ -139,42 +129,38 @@ public class CalculateDeltaNode implements TbNode { return new ValueWithTs(ts, result); } - private void processCalculateDelta(EntityId originator, long msgTs, ObjectNode json, ValueWithTs previousData) { - double currentValue = json.get(inputKey).asDouble(); - if (useCache) { - cache.put(originator, new ValueWithTs(msgTs, currentValue)); - } - BigDecimal delta = BigDecimal.valueOf(previousData != null ? currentValue - previousData.value : 0.0); - if (config.isTellFailureIfDeltaIsNegative() && delta.doubleValue() < 0) { - throw new IllegalArgumentException("Delta value is negative!"); - } - if (config.getRound() != null) { - delta = delta.setScale(config.getRound(), RoundingMode.HALF_UP); - } - if (delta.stripTrailingZeros().scale() > 0) { - json.put(config.getOutputValueKey(), delta.doubleValue()); - } else { - json.put(config.getOutputValueKey(), delta.longValueExact()); - } - if (config.isAddPeriodBetweenMsgs()) { - long period = previousData != null ? msgTs - previousData.ts : 0; - json.put(config.getPeriodValueKey(), period); - } - } - protected ListenableFuture processMsgAsync(TbContext ctx, TbMsg msg) { - ListenableFuture latestValueFuture = getLatestFromCacheOrFetchFromDb(msg); + ListenableFuture latestValueFuture = getLatestFromCacheOrFetchFromDb(ctx, msg); return Futures.transform(latestValueFuture, previousData -> { ObjectNode json = (ObjectNode) JacksonUtil.toJsonNode(msg.getData()); - processCalculateDelta(msg.getOriginator(), msg.getMetaDataTs(), json, previousData); + double currentValue = json.get(config.getInputValueKey()).asDouble(); + if (config.isUseCache()) { + cache.put(msg.getOriginator(), new ValueWithTs(msg.getMetaDataTs(), currentValue)); + } + BigDecimal delta = BigDecimal.valueOf(previousData != null ? currentValue - previousData.value : 0.0); + if (config.isTellFailureIfDeltaIsNegative() && delta.doubleValue() < 0) { + throw new IllegalArgumentException("Delta value is negative!"); + } + if (config.getRound() != null) { + delta = delta.setScale(config.getRound(), RoundingMode.HALF_UP); + } + if (delta.stripTrailingZeros().scale() > 0) { + json.put(config.getOutputValueKey(), delta.doubleValue()); + } else { + json.put(config.getOutputValueKey(), delta.longValueExact()); + } + if (config.isAddPeriodBetweenMsgs()) { + long period = previousData != null ? msg.getMetaDataTs() - previousData.ts : 0; + json.put(config.getPeriodValueKey(), period); + } return TbMsg.transformMsgData(msg, JacksonUtil.toString(json)); }, MoreExecutors.directExecutor()); } - private ListenableFuture getLatestFromCacheOrFetchFromDb(TbMsg msg) { + private ListenableFuture getLatestFromCacheOrFetchFromDb(TbContext ctx, TbMsg msg) { EntityId originator = msg.getOriginator(); ValueWithTs valueWithTs = cache.get(msg.getOriginator()); - return valueWithTs != null ? Futures.immediateFuture(valueWithTs) : fetchLatestValueAsync(originator); + return valueWithTs != null ? Futures.immediateFuture(valueWithTs) : fetchLatestValueAsync(ctx, originator); } private record ValueWithTs(long ts, double value) { diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNodeTest.java index 36b9719d86..93e3957c03 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNodeTest.java @@ -20,6 +20,9 @@ import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.NullAndEmptySource; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -54,9 +57,11 @@ import java.util.stream.IntStream; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; @@ -96,8 +101,6 @@ public class CalculateDeltaNodeTest { node = new CalculateDeltaNode(); config = new CalculateDeltaNodeConfiguration().defaultConfiguration(); nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); - when(ctxMock.getTimeseriesService()).thenReturn(timeseriesServiceMock); - node.init(ctxMock, nodeConfiguration); } @@ -111,6 +114,49 @@ public class CalculateDeltaNodeTest { assertTrue(config.isTellFailureIfDeltaIsNegative()); } + + @ParameterizedTest + @NullAndEmptySource + @ValueSource(strings = {" "}) // blank value + public void givenInvalidInputKey_whenInitThenThrowException(String key) { + config.setInputValueKey(key); + nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + var exception = assertThrows(TbNodeException.class, () -> node.init(ctxMock, nodeConfiguration)); + assertThat(exception).hasMessage("Input value key should be specified!"); + assertThat(exception.isUnrecoverable()).isTrue(); + } + + @ParameterizedTest + @NullAndEmptySource + @ValueSource(strings = {" "}) // blank value + public void givenInvalidOutputKey_whenInitThenThrowException(String key) { + config.setOutputValueKey(key); + nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + var exception = assertThrows(TbNodeException.class, () -> node.init(ctxMock, nodeConfiguration)); + assertThat(exception).hasMessage("Output value key should be specified!"); + assertThat(exception.isUnrecoverable()).isTrue(); + } + + @ParameterizedTest + @NullAndEmptySource + @ValueSource(strings = {" "}) // blank value + public void givenInvalidPeriodKey_whenInitThenThrowException(String key) { + config.setPeriodValueKey(key); + config.setAddPeriodBetweenMsgs(true); + nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + var exception = assertThrows(TbNodeException.class, () -> node.init(ctxMock, nodeConfiguration)); + assertThat(exception).hasMessage("Period value key should be specified!"); + assertThat(exception.isUnrecoverable()).isTrue(); + } + + @Test + public void givenInvalidPeriodKeyAndAddPeriodDisabled_whenInitThenNoExceptionThrown() { + config.setPeriodValueKey(null); + config.setAddPeriodBetweenMsgs(false); + nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + assertDoesNotThrow(() -> node.init(ctxMock, nodeConfiguration)); + } + @Test public void givenInvalidMsgType_whenOnMsg_thenShouldTellNextOther() { // GIVEN @@ -528,6 +574,7 @@ public class CalculateDeltaNodeTest { private void mockFindLatestAsync(TsKvEntry tsKvEntry) { when(ctxMock.getDbCallbackExecutor()).thenReturn(DB_EXECUTOR); when(ctxMock.getTenantId()).thenReturn(TENANT_ID); + when(ctxMock.getTimeseriesService()).thenReturn(timeseriesServiceMock); when(timeseriesServiceMock.findLatest( eq(TENANT_ID), eq(DUMMY_DEVICE_ORIGINATOR), eq(tsKvEntry.getKey()) )).thenReturn(Futures.immediateFuture(Optional.of(tsKvEntry)));