diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvEntity.java index a6d3ea6681..348f21bfdd 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvEntity.java @@ -52,22 +52,33 @@ public final class TsKvEntity implements ToData { public TsKvEntity() { } - public TsKvEntity(Double avgLongValue, Double avgDoubleValue) { - if(avgLongValue != null) { - this.longValue = avgLongValue.longValue(); + public TsKvEntity(Long longSumValue, Double doubleSumValue, Long longCountValue, Long doubleCountValue) { + double sum = 0.0; + if (longSumValue != null) { + sum += longSumValue; } - this.doubleValue = avgDoubleValue; + if (doubleSumValue != null) { + sum += doubleSumValue; + } + this.doubleValue = sum / (longCountValue + doubleCountValue); } public TsKvEntity(Long sumLongValue, Double sumDoubleValue) { - this.longValue = sumLongValue; - this.doubleValue = sumDoubleValue; + if (sumDoubleValue != null) { + this.doubleValue = sumDoubleValue + (sumLongValue != null ? sumLongValue.doubleValue() : 0.0); + } else { + this.longValue = sumLongValue; + } } - public TsKvEntity(String strValue, Long longValue, Double doubleValue) { + public TsKvEntity(String strValue, Long longValue, Double doubleValue, boolean max) { this.strValue = strValue; - this.longValue = longValue; - this.doubleValue = doubleValue; + if (longValue != null && doubleValue != null) { + this.doubleValue = max ? Math.max(doubleValue, longValue.doubleValue()) : Math.min(doubleValue, longValue.doubleValue()); + } else { + this.longValue = longValue; + this.doubleValue = doubleValue; + } } public TsKvEntity(Long booleanValueCount, Long strValueCount, Long longValueCount, Long doubleValueCount) { @@ -75,10 +86,8 @@ public final class TsKvEntity implements ToData { this.longValue = booleanValueCount; } else if (strValueCount != 0) { this.longValue = strValueCount; - } else if (longValueCount != 0) { - this.longValue = longValueCount; - } else if (doubleValueCount != 0) { - this.longValue = doubleValueCount; + } else { + this.longValue = longValueCount + doubleValueCount; } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java index 296d173cf7..92bdf7e488 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java @@ -55,7 +55,7 @@ public interface TsKvRepository extends CrudRepository :startTs AND tskv.ts < :endTs") CompletableFuture findMax(@Param("entityId") String entityId, @@ -65,7 +65,7 @@ public interface TsKvRepository extends CrudRepository :startTs AND tskv.ts < :endTs") CompletableFuture findMin(@Param("entityId") String entityId, @@ -85,7 +85,7 @@ public interface TsKvRepository extends CrudRepository :startTs AND tskv.ts < :endTs") CompletableFuture findAvg(@Param("entityId") String entityId, diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java index b5ebb10922..ea1e8f1c1b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java @@ -98,6 +98,7 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct curLValue = getLongValue(row); } if (doubleCount > 0) { + aggResult.hasDouble = true; aggResult.dataType = DataType.DOUBLE; curCount += doubleCount; curDValue = getDoubleValue(row); @@ -222,17 +223,25 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct if (aggResult.count == 0 || (aggResult.dataType == DataType.DOUBLE && aggResult.dValue == null) || (aggResult.dataType == DataType.LONG && aggResult.lValue == null)) { return Optional.empty(); } else if (aggResult.dataType == DataType.DOUBLE || aggResult.dataType == DataType.LONG) { - double sum = Optional.ofNullable(aggResult.dValue).orElse(0.0d) + Optional.ofNullable(aggResult.lValue).orElse(0L); - return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.SUM ? sum : (sum / aggResult.count)))); + if(aggregation == Aggregation.AVG || aggResult.hasDouble) { + double sum = Optional.ofNullable(aggResult.dValue).orElse(0.0d) + Optional.ofNullable(aggResult.lValue).orElse(0L); + return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.SUM ? sum : (sum / aggResult.count)))); + } else { + return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, aggregation == Aggregation.SUM ? aggResult.lValue : (aggResult.lValue / aggResult.count)))); + } } return Optional.empty(); } private Optional processMinOrMaxResult(AggregationResult aggResult) { if (aggResult.dataType == DataType.DOUBLE || aggResult.dataType == DataType.LONG) { - double currentD = aggregation == Aggregation.MIN ? Optional.ofNullable(aggResult.dValue).orElse(Double.MAX_VALUE) : Optional.ofNullable(aggResult.dValue).orElse(Double.MIN_VALUE); - double currentL = aggregation == Aggregation.MIN ? Optional.ofNullable(aggResult.lValue).orElse(Long.MAX_VALUE) : Optional.ofNullable(aggResult.lValue).orElse(Long.MIN_VALUE); - return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.MIN ? Math.min(currentD, currentL) : Math.max(currentD, currentL)))); + if(aggResult.hasDouble) { + double currentD = aggregation == Aggregation.MIN ? Optional.ofNullable(aggResult.dValue).orElse(Double.MAX_VALUE) : Optional.ofNullable(aggResult.dValue).orElse(Double.MIN_VALUE); + double currentL = aggregation == Aggregation.MIN ? Optional.ofNullable(aggResult.lValue).orElse(Long.MAX_VALUE) : Optional.ofNullable(aggResult.lValue).orElse(Long.MIN_VALUE); + return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.MIN ? Math.min(currentD, currentL) : Math.max(currentD, currentL)))); + } else { + return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, aggResult.lValue))); + } } else if (aggResult.dataType == DataType.STRING) { return Optional.of(new BasicTsKvEntry(ts, new StringDataEntry(key, aggResult.sValue))); } else { @@ -247,5 +256,6 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct Double dValue = null; Long lValue = null; long count = 0; + boolean hasDouble = false; } } diff --git a/dao/src/test/java/org/thingsboard/server/dao/NoSqlDaoServiceTestSuite.java b/dao/src/test/java/org/thingsboard/server/dao/NoSqlDaoServiceTestSuite.java index 55c2f70f1d..ddea70a38c 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/NoSqlDaoServiceTestSuite.java +++ b/dao/src/test/java/org/thingsboard/server/dao/NoSqlDaoServiceTestSuite.java @@ -25,9 +25,7 @@ import java.util.Arrays; @RunWith(ClasspathSuite.class) @ClassnameFilters({ - "org.thingsboard.server.dao.service.*ServiceNoSqlTest", - "org.thingsboard.server.dao.service.queue.cassandra.*.*.*Test", - "org.thingsboard.server.dao.service.queue.cassandra.*Test" + "org.thingsboard.server.dao.service.*ServiceNoSqlTest" }) public class NoSqlDaoServiceTestSuite { diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java index b409dea570..7378bcd380 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java @@ -221,13 +221,13 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { 60000, 20000, 3, Aggregation.AVG))).get(); assertEquals(3, list.size()); assertEquals(10000, list.get(0).getTs()); - assertEquals(java.util.Optional.of(150L), list.get(0).getLongValue()); + assertEquals(java.util.Optional.of(150.0), list.get(0).getDoubleValue()); assertEquals(30000, list.get(1).getTs()); - assertEquals(java.util.Optional.of(350L), list.get(1).getLongValue()); + assertEquals(java.util.Optional.of(350.0), list.get(1).getDoubleValue()); assertEquals(50000, list.get(2).getTs()); - assertEquals(java.util.Optional.of(550L), list.get(2).getLongValue()); + assertEquals(java.util.Optional.of(550.0), list.get(2).getDoubleValue()); list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0, 60000, 20000, 3, Aggregation.SUM))).get(); @@ -282,12 +282,110 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { assertEquals(java.util.Optional.of(2L), list.get(2).getLongValue()); } + @Test + public void testFindDeviceLongAndDoubleTsData() throws Exception { + DeviceId deviceId = new DeviceId(UUIDs.timeBased()); + List entries = new ArrayList<>(); + + entries.add(save(deviceId, 5000, 100)); + entries.add(save(deviceId, 15000, 200.0)); + + entries.add(save(deviceId, 25000, 300)); + entries.add(save(deviceId, 35000, 400.0)); + + entries.add(save(deviceId, 45000, 500)); + entries.add(save(deviceId, 55000, 600.0)); + + List list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0, + 60000, 20000, 3, Aggregation.NONE))).get(); + assertEquals(3, list.size()); + assertEquals(55000, list.get(0).getTs()); + assertEquals(java.util.Optional.of(600.0), list.get(0).getDoubleValue()); + + assertEquals(45000, list.get(1).getTs()); + assertEquals(java.util.Optional.of(500L), list.get(1).getLongValue()); + + assertEquals(35000, list.get(2).getTs()); + assertEquals(java.util.Optional.of(400.0), list.get(2).getDoubleValue()); + + list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0, + 60000, 20000, 3, Aggregation.AVG))).get(); + assertEquals(3, list.size()); + assertEquals(10000, list.get(0).getTs()); + assertEquals(java.util.Optional.of(150.0), list.get(0).getDoubleValue()); + + assertEquals(30000, list.get(1).getTs()); + assertEquals(java.util.Optional.of(350.0), list.get(1).getDoubleValue()); + + assertEquals(50000, list.get(2).getTs()); + assertEquals(java.util.Optional.of(550.0), list.get(2).getDoubleValue()); + + list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0, + 60000, 20000, 3, Aggregation.SUM))).get(); + + assertEquals(3, list.size()); + assertEquals(10000, list.get(0).getTs()); + assertEquals(java.util.Optional.of(300.0), list.get(0).getDoubleValue()); + + assertEquals(30000, list.get(1).getTs()); + assertEquals(java.util.Optional.of(700.0), list.get(1).getDoubleValue()); + + assertEquals(50000, list.get(2).getTs()); + assertEquals(java.util.Optional.of(1100.0), list.get(2).getDoubleValue()); + + list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0, + 60000, 20000, 3, Aggregation.MIN))).get(); + + assertEquals(3, list.size()); + assertEquals(10000, list.get(0).getTs()); + assertEquals(java.util.Optional.of(100.0), list.get(0).getDoubleValue()); + + assertEquals(30000, list.get(1).getTs()); + assertEquals(java.util.Optional.of(300.0), list.get(1).getDoubleValue()); + + assertEquals(50000, list.get(2).getTs()); + assertEquals(java.util.Optional.of(500.0), list.get(2).getDoubleValue()); + + list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0, + 60000, 20000, 3, Aggregation.MAX))).get(); + + assertEquals(3, list.size()); + assertEquals(10000, list.get(0).getTs()); + assertEquals(java.util.Optional.of(200.0), list.get(0).getDoubleValue()); + + assertEquals(30000, list.get(1).getTs()); + assertEquals(java.util.Optional.of(400.0), list.get(1).getDoubleValue()); + + assertEquals(50000, list.get(2).getTs()); + assertEquals(java.util.Optional.of(600.0), list.get(2).getDoubleValue()); + + list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0, + 60000, 20000, 3, Aggregation.COUNT))).get(); + + assertEquals(3, list.size()); + assertEquals(10000, list.get(0).getTs()); + assertEquals(java.util.Optional.of(2L), list.get(0).getLongValue()); + + assertEquals(30000, list.get(1).getTs()); + assertEquals(java.util.Optional.of(2L), list.get(1).getLongValue()); + + assertEquals(50000, list.get(2).getTs()); + assertEquals(java.util.Optional.of(2L), list.get(2).getLongValue()); + } + private TsKvEntry save(DeviceId deviceId, long ts, long value) throws Exception { TsKvEntry entry = new BasicTsKvEntry(ts, new LongDataEntry(LONG_KEY, value)); tsService.save(tenantId, deviceId, entry).get(); return entry; } + private TsKvEntry save(DeviceId deviceId, long ts, double value) throws Exception { + TsKvEntry entry = new BasicTsKvEntry(ts, new DoubleDataEntry(LONG_KEY, value)); + tsService.save(tenantId, deviceId, entry).get(); + return entry; + } + + private void saveEntries(DeviceId deviceId, long ts) throws ExecutionException, InterruptedException { tsService.save(tenantId, deviceId, toTsEntry(ts, stringKvEntry)).get(); tsService.save(tenantId, deviceId, toTsEntry(ts, longKvEntry)).get();