aggregation for numeric data types should process both types - double and long

This commit is contained in:
vparomskiy 2018-12-12 19:03:17 +02:00 committed by Andrew Shvayka
parent b1849e98f7
commit 66c0e7179e

View File

@ -79,7 +79,7 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct
} }
private void processResultSetRow(Row row, AggregationResult aggResult) { private void processResultSetRow(Row row, AggregationResult aggResult) {
long curCount; long curCount = 0L;
Long curLValue = null; Long curLValue = null;
Double curDValue = null; Double curDValue = null;
@ -91,14 +91,17 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct
long boolCount = row.getLong(BOOL_CNT_POS); long boolCount = row.getLong(BOOL_CNT_POS);
long strCount = row.getLong(STR_CNT_POS); long strCount = row.getLong(STR_CNT_POS);
if (longCount > 0 || doubleCount > 0) {
if (longCount > 0) { if (longCount > 0) {
aggResult.dataType = DataType.LONG; aggResult.dataType = DataType.LONG;
curCount = longCount; curCount += longCount;
curLValue = getLongValue(row); curLValue = getLongValue(row);
} else if (doubleCount > 0) { }
if (doubleCount > 0) {
aggResult.dataType = DataType.DOUBLE; aggResult.dataType = DataType.DOUBLE;
curCount = doubleCount; curCount += doubleCount;
curDValue = getDoubleValue(row); curDValue = getDoubleValue(row);
}
} else if (boolCount > 0) { } else if (boolCount > 0) {
aggResult.dataType = DataType.BOOLEAN; aggResult.dataType = DataType.BOOLEAN;
curCount = boolCount; curCount = boolCount;
@ -126,16 +129,20 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct
aggResult.count += curCount; aggResult.count += curCount;
if (curDValue != null) { if (curDValue != null) {
aggResult.dValue = aggResult.dValue == null ? curDValue : aggResult.dValue + curDValue; aggResult.dValue = aggResult.dValue == null ? curDValue : aggResult.dValue + curDValue;
} else if (curLValue != null) { }
if (curLValue != null) {
aggResult.lValue = aggResult.lValue == null ? curLValue : aggResult.lValue + curLValue; aggResult.lValue = aggResult.lValue == null ? curLValue : aggResult.lValue + curLValue;
} }
} }
private void processMinAggregation(AggregationResult aggResult, Long curLValue, Double curDValue, Boolean curBValue, String curSValue) { private void processMinAggregation(AggregationResult aggResult, Long curLValue, Double curDValue, Boolean curBValue, String curSValue) {
if (curDValue != null || curLValue != null) {
if (curDValue != null) { if (curDValue != null) {
aggResult.dValue = aggResult.dValue == null ? curDValue : Math.min(aggResult.dValue, curDValue); aggResult.dValue = aggResult.dValue == null ? curDValue : Math.min(aggResult.dValue, curDValue);
} else if (curLValue != null) { }
if (curLValue != null) {
aggResult.lValue = aggResult.lValue == null ? curLValue : Math.min(aggResult.lValue, curLValue); aggResult.lValue = aggResult.lValue == null ? curLValue : Math.min(aggResult.lValue, curLValue);
}
} else if (curBValue != null) { } else if (curBValue != null) {
aggResult.bValue = aggResult.bValue == null ? curBValue : aggResult.bValue && curBValue; aggResult.bValue = aggResult.bValue == null ? curBValue : aggResult.bValue && curBValue;
} else if (curSValue != null && (aggResult.sValue == null || curSValue.compareTo(aggResult.sValue) < 0)) { } else if (curSValue != null && (aggResult.sValue == null || curSValue.compareTo(aggResult.sValue) < 0)) {
@ -144,10 +151,13 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct
} }
private void processMaxAggregation(AggregationResult aggResult, Long curLValue, Double curDValue, Boolean curBValue, String curSValue) { private void processMaxAggregation(AggregationResult aggResult, Long curLValue, Double curDValue, Boolean curBValue, String curSValue) {
if (curDValue != null || curLValue != null) {
if (curDValue != null) { if (curDValue != null) {
aggResult.dValue = aggResult.dValue == null ? curDValue : Math.max(aggResult.dValue, curDValue); aggResult.dValue = aggResult.dValue == null ? curDValue : Math.max(aggResult.dValue, curDValue);
} else if (curLValue != null) { }
if (curLValue != null) {
aggResult.lValue = aggResult.lValue == null ? curLValue : Math.max(aggResult.lValue, curLValue); aggResult.lValue = aggResult.lValue == null ? curLValue : Math.max(aggResult.lValue, curLValue);
}
} else if (curBValue != null) { } else if (curBValue != null) {
aggResult.bValue = aggResult.bValue == null ? curBValue : aggResult.bValue || curBValue; aggResult.bValue = aggResult.bValue == null ? curBValue : aggResult.bValue || curBValue;
} else if (curSValue != null && (aggResult.sValue == null || curSValue.compareTo(aggResult.sValue) > 0)) { } else if (curSValue != null && (aggResult.sValue == null || curSValue.compareTo(aggResult.sValue) > 0)) {
@ -211,19 +221,18 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct
private Optional<TsKvEntry> processAvgOrSumResult(AggregationResult aggResult) { private Optional<TsKvEntry> processAvgOrSumResult(AggregationResult aggResult) {
if (aggResult.count == 0 || (aggResult.dataType == DataType.DOUBLE && aggResult.dValue == null) || (aggResult.dataType == DataType.LONG && aggResult.lValue == null)) { if (aggResult.count == 0 || (aggResult.dataType == DataType.DOUBLE && aggResult.dValue == null) || (aggResult.dataType == DataType.LONG && aggResult.lValue == null)) {
return Optional.empty(); return Optional.empty();
} else if (aggResult.dataType == DataType.DOUBLE) { } else if (aggResult.dataType == DataType.DOUBLE || aggResult.dataType == DataType.LONG) {
return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.SUM ? aggResult.dValue : (aggResult.dValue / aggResult.count)))); double sum = Optional.ofNullable(aggResult.dValue).orElse(0.0d) + Optional.ofNullable(aggResult.lValue).orElse(0L);
} else if (aggResult.dataType == DataType.LONG) { return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.SUM ? sum : (sum / aggResult.count))));
return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, aggregation == Aggregation.SUM ? aggResult.lValue : (aggResult.lValue / aggResult.count))));
} }
return Optional.empty(); return Optional.empty();
} }
private Optional<TsKvEntry> processMinOrMaxResult(AggregationResult aggResult) { private Optional<TsKvEntry> processMinOrMaxResult(AggregationResult aggResult) {
if (aggResult.dataType == DataType.DOUBLE) { if (aggResult.dataType == DataType.DOUBLE || aggResult.dataType == DataType.LONG) {
return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggResult.dValue))); double currentD = aggregation == Aggregation.MIN ? Optional.ofNullable(aggResult.dValue).orElse(Double.MAX_VALUE) : Optional.ofNullable(aggResult.dValue).orElse(Double.MIN_VALUE);
} else if (aggResult.dataType == DataType.LONG) { double currentL = aggregation == Aggregation.MIN ? Optional.ofNullable(aggResult.lValue).orElse(Long.MAX_VALUE) : Optional.ofNullable(aggResult.lValue).orElse(Long.MIN_VALUE);
return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, aggResult.lValue))); return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.MIN ? Math.min(currentD, currentL) : Math.max(currentD, currentL))));
} else if (aggResult.dataType == DataType.STRING) { } else if (aggResult.dataType == DataType.STRING) {
return Optional.of(new BasicTsKvEntry(ts, new StringDataEntry(key, aggResult.sValue))); return Optional.of(new BasicTsKvEntry(ts, new StringDataEntry(key, aggResult.sValue)));
} else { } else {