fixed adter review

This commit is contained in:
ShvaykaD 2024-04-16 11:56:56 +03:00
parent 827c898179
commit 27d026821a
8 changed files with 98 additions and 96 deletions

View File

@ -42,8 +42,6 @@ public interface TimeseriesService {
ListenableFuture<List<TsKvEntry>> findLatest(TenantId tenantId, EntityId entityId, Collection<String> keys); ListenableFuture<List<TsKvEntry>> findLatest(TenantId tenantId, EntityId entityId, Collection<String> keys);
List<TsKvEntry> findLatestSync(TenantId tenantId, EntityId entityId, Collection<String> keys);
ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId); ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId);
ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry); ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry);

View File

@ -19,6 +19,8 @@ import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
@ -50,8 +52,6 @@ import org.thingsboard.server.dao.sqlts.latest.TsKvLatestRepository;
import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao; import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao;
import org.thingsboard.server.dao.util.SqlTsLatestAnyDao; import org.thingsboard.server.dao.util.SqlTsLatestAnyDao;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
@ -163,11 +163,6 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
return service.submit(() -> getLatestTsKvEntry(entityId, key)); return service.submit(() -> getLatestTsKvEntry(entityId, key));
} }
@Override
public TsKvEntry findLatestSync(TenantId tenantId, EntityId entityId, String key) {
return getLatestTsKvEntry(entityId, key);
}
@Override @Override
public ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId) { public ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId) {
return getFindAllLatestFuture(entityId); return getFindAllLatestFuture(entityId);
@ -209,7 +204,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
ReadTsKvQueryResult::getData, MoreExecutors.directExecutor()); ReadTsKvQueryResult::getData, MoreExecutors.directExecutor());
} }
protected TsKvEntry doFindLatest(EntityId entityId, String key) { protected TsKvEntry doFindLatest(EntityId entityId, String key) {
TsKvLatestCompositeKey compositeKey = TsKvLatestCompositeKey compositeKey =
new TsKvLatestCompositeKey( new TsKvLatestCompositeKey(
entityId.getId(), entityId.getId(),

View File

@ -134,17 +134,6 @@ public class BaseTimeseriesService implements TimeseriesService {
return Futures.allAsList(futures); return Futures.allAsList(futures);
} }
@Override
public List<TsKvEntry> findLatestSync(TenantId tenantId, EntityId entityId, Collection<String> keys) {
validate(entityId);
List<TsKvEntry> latestEntries = new ArrayList<>(keys.size());
keys.forEach(key -> Validator.validateString(key, k -> "Incorrect key " + k));
for (String key : keys) {
latestEntries.add(timeseriesLatestDao.findLatestSync(tenantId, entityId, key));
}
return latestEntries;
}
@Override @Override
public ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId) { public ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId) {
validate(entityId); validate(entityId);

View File

@ -44,7 +44,6 @@ import org.thingsboard.server.dao.util.NoSqlTsLatestDao;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutionException;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal;
@ -70,16 +69,6 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes
return findLatest(tenantId, entityId, key, rs -> convertResultToTsKvEntry(key, rs.one())); return findLatest(tenantId, entityId, key, rs -> convertResultToTsKvEntry(key, rs.one()));
} }
@Override
public TsKvEntry findLatestSync(TenantId tenantId, EntityId entityId, String key) {
try {
return findLatest(tenantId, entityId, key, rs -> convertResultToTsKvEntry(key, rs.one())).get();
} catch (InterruptedException | ExecutionException e) {
log.error("[{}][{}] Failed to get latest entry for key: {} due to: ", tenantId, entityId, key, e);
throw new RuntimeException(e);
}
}
private <T> ListenableFuture<T> findLatest(TenantId tenantId, EntityId entityId, String key, java.util.function.Function<TbResultSet, T> function) { private <T> ListenableFuture<T> findLatest(TenantId tenantId, EntityId entityId, String key, java.util.function.Function<TbResultSet, T> function) {
BoundStatementBuilder stmtBuilder = new BoundStatementBuilder(getFindLatestStmt().bind()); BoundStatementBuilder stmtBuilder = new BoundStatementBuilder(getFindLatestStmt().bind());
stmtBuilder.setString(0, entityId.getEntityType().name()); stmtBuilder.setString(0, entityId.getEntityType().name());

View File

@ -40,8 +40,6 @@ public interface TimeseriesLatestDao {
*/ */
ListenableFuture<TsKvEntry> findLatest(TenantId tenantId, EntityId entityId, String key); ListenableFuture<TsKvEntry> findLatest(TenantId tenantId, EntityId entityId, String key);
TsKvEntry findLatestSync(TenantId tenantId, EntityId entityId, String key);
ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId); ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId);
ListenableFuture<Void> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry); ListenableFuture<Void> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry);

View File

@ -105,8 +105,8 @@ public class TbMathNode implements TbNode {
@Override @Override
public void onMsg(TbContext ctx, TbMsg msg) { public void onMsg(TbContext ctx, TbMsg msg) {
var semaphoreWithQueue = locks.computeIfAbsent(msg.getOriginator(), SemaphoreWithTbMsgQueue::new); locks.computeIfAbsent(msg.getOriginator(), SemaphoreWithTbMsgQueue::new)
semaphoreWithQueue.addToQueueAndTryProcess(msg, ctx, this::processMsgAsync); .addToQueueAndTryProcess(msg, ctx, this::processMsgAsync);
} }
ListenableFuture<TbMsg> processMsgAsync(TbContext ctx, TbMsg msg) { ListenableFuture<TbMsg> processMsgAsync(TbContext ctx, TbMsg msg) {

View File

@ -30,20 +30,18 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.util.SemaphoreWithTbMsgQueue; import org.thingsboard.rule.engine.util.SemaphoreWithTbMsgQueue;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.math.RoundingMode; import java.math.RoundingMode;
import java.util.Map; import java.util.Map;
import static org.thingsboard.common.util.DonAsynchron.withCallback;
@Slf4j @Slf4j
@RuleNode(type = ComponentType.ENRICHMENT, @RuleNode(type = ComponentType.ENRICHMENT,
name = "calculate delta", name = "calculate delta",
@ -61,20 +59,21 @@ public class CalculateDeltaNode implements TbNode {
private Map<EntityId, SemaphoreWithTbMsgQueue> locks; private Map<EntityId, SemaphoreWithTbMsgQueue> locks;
private CalculateDeltaNodeConfiguration config; private CalculateDeltaNodeConfiguration config;
private TbContext ctx;
private TimeseriesService timeseriesService;
private boolean useCache;
private String inputKey;
@Override @Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, CalculateDeltaNodeConfiguration.class); this.config = TbNodeUtils.convert(configuration, CalculateDeltaNodeConfiguration.class);
this.ctx = ctx; if (StringUtils.isBlank(config.getInputValueKey())) {
this.timeseriesService = ctx.getTimeseriesService(); throw new TbNodeException("Input value key should be specified!", true);
this.inputKey = config.getInputValueKey(); }
this.useCache = config.isUseCache(); if (StringUtils.isBlank(config.getOutputValueKey())) {
if (useCache) { throw new TbNodeException("Output value key should be specified!", true);
locks = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK); }
if (config.isAddPeriodBetweenMsgs() && StringUtils.isBlank(config.getPeriodValueKey())) {
throw new TbNodeException("Period value key should be specified!", true);
}
locks = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK);
if (config.isUseCache()) {
cache = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.SOFT); cache = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.SOFT);
} }
} }
@ -85,35 +84,26 @@ public class CalculateDeltaNode implements TbNode {
ctx.tellNext(msg, TbNodeConnectionType.OTHER); ctx.tellNext(msg, TbNodeConnectionType.OTHER);
return; return;
} }
JsonNode json = JacksonUtil.toJsonNode(msg.getData()); JsonNode msgData = JacksonUtil.toJsonNode(msg.getData());
if (!json.has(inputKey)) { if (msgData == null || !msgData.has(config.getInputValueKey())) {
ctx.tellNext(msg, TbNodeConnectionType.OTHER); ctx.tellNext(msg, TbNodeConnectionType.OTHER);
return; return;
} }
if (useCache) { locks.computeIfAbsent(msg.getOriginator(), SemaphoreWithTbMsgQueue::new)
var semaphoreWithQueue = locks.computeIfAbsent(msg.getOriginator(), SemaphoreWithTbMsgQueue::new); .addToQueueAndTryProcess(msg, ctx, this::processMsgAsync);
semaphoreWithQueue.addToQueueAndTryProcess(msg, ctx, this::processMsgAsync);
return;
}
withCallback(fetchLatestValueAsync(msg.getOriginator()),
previousData -> {
processCalculateDelta(msg.getOriginator(), msg.getMetaDataTs(), (ObjectNode) json, previousData);
ctx.tellSuccess(TbMsg.transformMsgData(msg, JacksonUtil.toString(json)));
},
t -> ctx.tellFailure(msg, t), MoreExecutors.directExecutor());
} }
@Override @Override
public void destroy() { public void destroy() {
if (useCache) { locks.clear();
if (config.isUseCache()) {
cache.clear(); cache.clear();
locks.clear();
} }
} }
private ListenableFuture<ValueWithTs> fetchLatestValueAsync(EntityId entityId) { private ListenableFuture<ValueWithTs> fetchLatestValueAsync(TbContext ctx, EntityId entityId) {
return Futures.transform(timeseriesService.findLatest(ctx.getTenantId(), entityId, config.getInputValueKey()), return Futures.transform(ctx.getTimeseriesService().findLatest(ctx.getTenantId(), entityId, config.getInputValueKey()),
tsKvEntryOpt -> tsKvEntryOpt.map(this::extractValue).orElse(null), ctx.getDbCallbackExecutor()); tsKvEntryOpt -> tsKvEntryOpt.map(this::extractValue).orElse(null), MoreExecutors.directExecutor());
} }
private ValueWithTs extractValue(TsKvEntry kvEntry) { private ValueWithTs extractValue(TsKvEntry kvEntry) {
@ -139,42 +129,38 @@ public class CalculateDeltaNode implements TbNode {
return new ValueWithTs(ts, result); return new ValueWithTs(ts, result);
} }
private void processCalculateDelta(EntityId originator, long msgTs, ObjectNode json, ValueWithTs previousData) {
double currentValue = json.get(inputKey).asDouble();
if (useCache) {
cache.put(originator, new ValueWithTs(msgTs, currentValue));
}
BigDecimal delta = BigDecimal.valueOf(previousData != null ? currentValue - previousData.value : 0.0);
if (config.isTellFailureIfDeltaIsNegative() && delta.doubleValue() < 0) {
throw new IllegalArgumentException("Delta value is negative!");
}
if (config.getRound() != null) {
delta = delta.setScale(config.getRound(), RoundingMode.HALF_UP);
}
if (delta.stripTrailingZeros().scale() > 0) {
json.put(config.getOutputValueKey(), delta.doubleValue());
} else {
json.put(config.getOutputValueKey(), delta.longValueExact());
}
if (config.isAddPeriodBetweenMsgs()) {
long period = previousData != null ? msgTs - previousData.ts : 0;
json.put(config.getPeriodValueKey(), period);
}
}
protected ListenableFuture<TbMsg> processMsgAsync(TbContext ctx, TbMsg msg) { protected ListenableFuture<TbMsg> processMsgAsync(TbContext ctx, TbMsg msg) {
ListenableFuture<ValueWithTs> latestValueFuture = getLatestFromCacheOrFetchFromDb(msg); ListenableFuture<ValueWithTs> latestValueFuture = getLatestFromCacheOrFetchFromDb(ctx, msg);
return Futures.transform(latestValueFuture, previousData -> { return Futures.transform(latestValueFuture, previousData -> {
ObjectNode json = (ObjectNode) JacksonUtil.toJsonNode(msg.getData()); ObjectNode json = (ObjectNode) JacksonUtil.toJsonNode(msg.getData());
processCalculateDelta(msg.getOriginator(), msg.getMetaDataTs(), json, previousData); double currentValue = json.get(config.getInputValueKey()).asDouble();
if (config.isUseCache()) {
cache.put(msg.getOriginator(), new ValueWithTs(msg.getMetaDataTs(), currentValue));
}
BigDecimal delta = BigDecimal.valueOf(previousData != null ? currentValue - previousData.value : 0.0);
if (config.isTellFailureIfDeltaIsNegative() && delta.doubleValue() < 0) {
throw new IllegalArgumentException("Delta value is negative!");
}
if (config.getRound() != null) {
delta = delta.setScale(config.getRound(), RoundingMode.HALF_UP);
}
if (delta.stripTrailingZeros().scale() > 0) {
json.put(config.getOutputValueKey(), delta.doubleValue());
} else {
json.put(config.getOutputValueKey(), delta.longValueExact());
}
if (config.isAddPeriodBetweenMsgs()) {
long period = previousData != null ? msg.getMetaDataTs() - previousData.ts : 0;
json.put(config.getPeriodValueKey(), period);
}
return TbMsg.transformMsgData(msg, JacksonUtil.toString(json)); return TbMsg.transformMsgData(msg, JacksonUtil.toString(json));
}, MoreExecutors.directExecutor()); }, MoreExecutors.directExecutor());
} }
private ListenableFuture<ValueWithTs> getLatestFromCacheOrFetchFromDb(TbMsg msg) { private ListenableFuture<ValueWithTs> getLatestFromCacheOrFetchFromDb(TbContext ctx, TbMsg msg) {
EntityId originator = msg.getOriginator(); EntityId originator = msg.getOriginator();
ValueWithTs valueWithTs = cache.get(msg.getOriginator()); ValueWithTs valueWithTs = cache.get(msg.getOriginator());
return valueWithTs != null ? Futures.immediateFuture(valueWithTs) : fetchLatestValueAsync(originator); return valueWithTs != null ? Futures.immediateFuture(valueWithTs) : fetchLatestValueAsync(ctx, originator);
} }
private record ValueWithTs(long ts, double value) { private record ValueWithTs(long ts, double value) {

View File

@ -20,6 +20,9 @@ import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.NullAndEmptySource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
@ -54,9 +57,11 @@ import java.util.stream.IntStream;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await; import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyList;
@ -96,8 +101,6 @@ public class CalculateDeltaNodeTest {
node = new CalculateDeltaNode(); node = new CalculateDeltaNode();
config = new CalculateDeltaNodeConfiguration().defaultConfiguration(); config = new CalculateDeltaNodeConfiguration().defaultConfiguration();
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
when(ctxMock.getTimeseriesService()).thenReturn(timeseriesServiceMock);
node.init(ctxMock, nodeConfiguration); node.init(ctxMock, nodeConfiguration);
} }
@ -111,6 +114,49 @@ public class CalculateDeltaNodeTest {
assertTrue(config.isTellFailureIfDeltaIsNegative()); assertTrue(config.isTellFailureIfDeltaIsNegative());
} }
@ParameterizedTest
@NullAndEmptySource
@ValueSource(strings = {" "}) // blank value
public void givenInvalidInputKey_whenInitThenThrowException(String key) {
config.setInputValueKey(key);
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
var exception = assertThrows(TbNodeException.class, () -> node.init(ctxMock, nodeConfiguration));
assertThat(exception).hasMessage("Input value key should be specified!");
assertThat(exception.isUnrecoverable()).isTrue();
}
@ParameterizedTest
@NullAndEmptySource
@ValueSource(strings = {" "}) // blank value
public void givenInvalidOutputKey_whenInitThenThrowException(String key) {
config.setOutputValueKey(key);
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
var exception = assertThrows(TbNodeException.class, () -> node.init(ctxMock, nodeConfiguration));
assertThat(exception).hasMessage("Output value key should be specified!");
assertThat(exception.isUnrecoverable()).isTrue();
}
@ParameterizedTest
@NullAndEmptySource
@ValueSource(strings = {" "}) // blank value
public void givenInvalidPeriodKey_whenInitThenThrowException(String key) {
config.setPeriodValueKey(key);
config.setAddPeriodBetweenMsgs(true);
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
var exception = assertThrows(TbNodeException.class, () -> node.init(ctxMock, nodeConfiguration));
assertThat(exception).hasMessage("Period value key should be specified!");
assertThat(exception.isUnrecoverable()).isTrue();
}
@Test
public void givenInvalidPeriodKeyAndAddPeriodDisabled_whenInitThenNoExceptionThrown() {
config.setPeriodValueKey(null);
config.setAddPeriodBetweenMsgs(false);
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
assertDoesNotThrow(() -> node.init(ctxMock, nodeConfiguration));
}
@Test @Test
public void givenInvalidMsgType_whenOnMsg_thenShouldTellNextOther() { public void givenInvalidMsgType_whenOnMsg_thenShouldTellNextOther() {
// GIVEN // GIVEN
@ -528,6 +574,7 @@ public class CalculateDeltaNodeTest {
private void mockFindLatestAsync(TsKvEntry tsKvEntry) { private void mockFindLatestAsync(TsKvEntry tsKvEntry) {
when(ctxMock.getDbCallbackExecutor()).thenReturn(DB_EXECUTOR); when(ctxMock.getDbCallbackExecutor()).thenReturn(DB_EXECUTOR);
when(ctxMock.getTenantId()).thenReturn(TENANT_ID); when(ctxMock.getTenantId()).thenReturn(TENANT_ID);
when(ctxMock.getTimeseriesService()).thenReturn(timeseriesServiceMock);
when(timeseriesServiceMock.findLatest( when(timeseriesServiceMock.findLatest(
eq(TENANT_ID), eq(DUMMY_DEVICE_ORIGINATOR), eq(tsKvEntry.getKey()) eq(TENANT_ID), eq(DUMMY_DEVICE_ORIGINATOR), eq(tsKvEntry.getKey())
)).thenReturn(Futures.immediateFuture(Optional.of(tsKvEntry))); )).thenReturn(Futures.immediateFuture(Optional.of(tsKvEntry)));