Improvements for data points storage

This commit is contained in:
Andrii Shvaika 2020-11-03 18:59:12 +02:00
parent 35dbe509f0
commit 123ea76c94
19 changed files with 181 additions and 74 deletions

View File

@ -18,7 +18,9 @@ package org.thingsboard.server.service.apiusage;
import com.google.common.util.concurrent.FutureCallback;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.util.Pair;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
@ -51,7 +53,6 @@ import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.profile.TbTenantProfileCache;
import org.thingsboard.server.service.queue.TbClusterService;
import org.thingsboard.server.service.telemetry.InternalTelemetryService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
@ -73,20 +74,25 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
public static final String HOURLY = "Hourly";
public static final FutureCallback<Integer> VOID_CALLBACK = new FutureCallback<Integer>() {
@Override
public void onSuccess(@Nullable Integer result) {}
public void onSuccess(@Nullable Integer result) {
}
@Override
public void onFailure(Throwable t) {}
public void onFailure(Throwable t) {
}
};
private final TbClusterService clusterService;
private final PartitionService partitionService;
private final TenantService tenantService;
private final ApiUsageStateService apiUsageStateService;
private final TimeseriesService tsService;
private final InternalTelemetryService tsWsService;
private final ApiUsageStateService apiUsageStateService;
private final SchedulerComponent scheduler;
private final TbTenantProfileCache tenantProfileCache;
@Lazy
@Autowired
private InternalTelemetryService tsWsService;
// Tenants that should be processed on this server
private final Map<TenantId, TenantApiUsageState> myTenantStates = new ConcurrentHashMap<>();
// Tenants that should be processed on other servers
@ -102,16 +108,16 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
public DefaultTbApiUsageStateService(TbClusterService clusterService,
PartitionService partitionService,
TenantService tenantService, ApiUsageStateService apiUsageStateService,
TimeseriesService tsService, TelemetrySubscriptionService tsWsService,
TenantService tenantService,
TimeseriesService tsService,
ApiUsageStateService apiUsageStateService,
SchedulerComponent scheduler,
TbTenantProfileCache tenantProfileCache) {
this.clusterService = clusterService;
this.partitionService = partitionService;
this.tenantService = tenantService;
this.apiUsageStateService = apiUsageStateService;
this.tsService = tsService;
this.tsWsService = tsWsService;
this.apiUsageStateService = apiUsageStateService;
this.scheduler = scheduler;
this.tenantProfileCache = tenantProfileCache;
}
@ -156,7 +162,7 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
} finally {
updateLock.unlock();
}
tsWsService.saveAndNotifyInternal(tenantId, tenantState.getApiUsageState().getId(), updatedEntries, 0L, VOID_CALLBACK);
tsWsService.saveAndNotifyInternal(tenantId, tenantState.getApiUsageState().getId(), updatedEntries, VOID_CALLBACK);
if (!result.isEmpty()) {
persistAndNotify(tenantState, result);
}
@ -256,7 +262,7 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
}
}
if (!profileThresholds.isEmpty()) {
tsWsService.saveAndNotifyInternal(tenantId, id, profileThresholds, 0L, VOID_CALLBACK);
tsWsService.saveAndNotifyInternal(tenantId, id, profileThresholds, VOID_CALLBACK);
}
}
@ -266,7 +272,7 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
long ts = System.currentTimeMillis();
List<TsKvEntry> stateTelemetry = new ArrayList<>();
result.forEach(((apiFeature, aState) -> stateTelemetry.add(new BasicTsKvEntry(ts, new BooleanDataEntry(apiFeature.getApiStateKey(), aState)))));
tsWsService.saveAndNotifyInternal(state.getTenantId(), state.getApiUsageState().getId(), stateTelemetry, 0L, VOID_CALLBACK);
tsWsService.saveAndNotifyInternal(state.getTenantId(), state.getApiUsageState().getId(), stateTelemetry, VOID_CALLBACK);
}
private void checkStartOfNextCycle() {

View File

@ -48,9 +48,9 @@ import java.util.stream.Collectors;
public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsService {
public static final String TB_SERVICE_QUEUE = "TbServiceQueue";
public static final FutureCallback<Void> CALLBACK = new FutureCallback<Void>() {
public static final FutureCallback<Integer> CALLBACK = new FutureCallback<Integer>() {
@Override
public void onSuccess(@Nullable Void result) {
public void onSuccess(@Nullable Integer result) {
}
@ -85,7 +85,7 @@ public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsS
.map(kv -> new BasicTsKvEntry(ts, new LongDataEntry(kv.getKey(), (long) kv.getValue().get())))
.collect(Collectors.toList());
if (!tsList.isEmpty()) {
tsService.saveAndNotify(tenantId, serviceAssetId, tsList, CALLBACK);
tsService.saveAndNotifyInternal(tenantId, serviceAssetId, tsList, CALLBACK);
}
}
} catch (DataValidationException e) {
@ -97,7 +97,7 @@ public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsS
ruleEngineStats.getTenantExceptions().forEach((tenantId, e) -> {
TsKvEntry tsKv = new BasicTsKvEntry(ts, new JsonDataEntry("ruleEngineException", e.toJsonString()));
try {
tsService.saveAndNotify(tenantId, getServiceAssetId(tenantId, queueName), Collections.singletonList(tsKv), CALLBACK);
tsService.saveAndNotifyInternal(tenantId, getServiceAssetId(tenantId, queueName), Collections.singletonList(tsKv), CALLBACK);
} catch (DataValidationException e2) {
if (!e2.getMessage().equalsIgnoreCase("Asset is referencing to non-existent tenant!")) {
throw e2;

View File

@ -105,10 +105,10 @@ public abstract class AbstractSubscriptionService implements ApplicationListener
}
}
protected <T> void addWsCallback(ListenableFuture<List<T>> saveFuture, Consumer<T> callback) {
Futures.addCallback(saveFuture, new FutureCallback<List<T>>() {
protected <T> void addWsCallback(ListenableFuture<T> saveFuture, Consumer<T> callback) {
Futures.addCallback(saveFuture, new FutureCallback<T>() {
@Override
public void onSuccess(@Nullable List<T> result) {
public void onSuccess(@Nullable T result) {
callback.accept(null);
}

View File

@ -40,9 +40,11 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.usagerecord.ApiUsageStateService;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.queue.TbClusterService;
import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
@ -71,6 +73,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
private final TimeseriesService tsService;
private final EntityViewService entityViewService;
private final TbApiUsageClient apiUsageClient;
private final TbApiUsageStateService apiUsageStateService;
private ExecutorService tsCallBackExecutor;
@ -79,12 +82,14 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
EntityViewService entityViewService,
TbClusterService clusterService,
PartitionService partitionService,
TbApiUsageClient apiUsageClient) {
TbApiUsageClient apiUsageClient,
TbApiUsageStateService apiUsageStateService) {
super(clusterService, partitionService);
this.attrService = attrService;
this.tsService = tsService;
this.entityViewService = entityViewService;
this.apiUsageClient = apiUsageClient;
this.apiUsageStateService = apiUsageStateService;
}
@PostConstruct
@ -114,6 +119,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
@Override
public void saveAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback) {
checkInternalEntity(entityId);
if (apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) {
saveAndNotifyInternal(tenantId, entityId, ts, ttl, new FutureCallback<Integer>() {
@Override
public void onSuccess(Integer result) {
@ -128,11 +134,19 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
callback.onFailure(t);
}
});
} else{
callback.onFailure(new RuntimeException("DB storage writes are disabled due to API limits!"));
}
}
@Override
public void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Integer> callback) {
saveAndNotifyInternal(tenantId, entityId, ts, 0L, callback);
}
@Override
public void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Integer> callback) {
ListenableFuture<List<Integer>> saveFuture = tsService.save(tenantId, entityId, ts, ttl);
ListenableFuture<Integer> saveFuture = tsService.save(tenantId, entityId, ts, ttl);
addMainCallback(saveFuture, callback);
addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts));
if (EntityType.DEVICE.equals(entityId.getEntityType()) || EntityType.ASSET.equals(entityId.getEntityType())) {
@ -197,7 +211,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
@Override
public void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, FutureCallback<Void> callback) {
ListenableFuture<List<Void>> saveFuture = attrService.save(tenantId, entityId, scope, attributes);
addMainCallback(saveFuture, callback);
addVoidCallback(saveFuture, callback);
addWsCallback(saveFuture, success -> onAttributesUpdate(tenantId, entityId, scope, attributes, notifyDevice));
}
@ -210,7 +224,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
@Override
public void saveLatestAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback) {
ListenableFuture<List<Void>> saveFuture = tsService.saveLatest(tenantId, entityId, ts);
addMainCallback(saveFuture, callback);
addVoidCallback(saveFuture, callback);
addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts));
}
@ -223,7 +237,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
@Override
public void deleteAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<String> keys, FutureCallback<Void> callback) {
ListenableFuture<List<Void>> deleteFuture = attrService.removeAll(tenantId, entityId, scope, keys);
addMainCallback(deleteFuture, callback);
addVoidCallback(deleteFuture, callback);
addWsCallback(deleteFuture, success -> onAttributesDelete(tenantId, entityId, scope, keys));
}
@ -236,7 +250,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
@Override
public void deleteLatestInternal(TenantId tenantId, EntityId entityId, List<String> keys, FutureCallback<Void> callback) {
ListenableFuture<List<Void>> deleteFuture = tsService.removeLatest(tenantId, entityId, keys);
addMainCallback(deleteFuture, callback);
addVoidCallback(deleteFuture, callback);
}
@Override
@ -321,7 +335,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
}
}
private <S, R> void addMainCallback(ListenableFuture<S> saveFuture, final FutureCallback<R> callback) {
private <S> void addVoidCallback(ListenableFuture<S> saveFuture, final FutureCallback<Void> callback) {
Futures.addCallback(saveFuture, new FutureCallback<S>() {
@Override
public void onSuccess(@Nullable S result) {
@ -335,6 +349,20 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
}, tsCallBackExecutor);
}
private <S> void addMainCallback(ListenableFuture<S> saveFuture, final FutureCallback<S> callback) {
Futures.addCallback(saveFuture, new FutureCallback<S>() {
@Override
public void onSuccess(@Nullable S result) {
callback.onSuccess(result);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
}, tsCallBackExecutor);
}
private void checkInternalEntity(EntityId entityId) {
if (EntityType.API_USAGE_STATE.equals(entityId.getEntityType())) {
throw new RuntimeException("Can't update API Usage State!");

View File

@ -29,6 +29,8 @@ import java.util.List;
*/
public interface InternalTelemetryService extends RuleEngineTelemetryService {
void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Integer> callback);
void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Integer> callback);
void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, FutureCallback<Void> callback);

View File

@ -36,9 +36,9 @@ public interface TimeseriesService {
ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId);
ListenableFuture<List<Integer>> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry);
ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry);
ListenableFuture<List<Integer>> save(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
ListenableFuture<List<Void>> saveLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry);

View File

@ -19,7 +19,7 @@ import java.util.Objects;
import java.util.Optional;
public class BasicTsKvEntry implements TsKvEntry {
private static final int MAX_CHARS_PER_DATA_POINT = 512;
private final long ts;
private final KvEntry kv;
@ -99,4 +99,21 @@ public class BasicTsKvEntry implements TsKvEntry {
public String getValueAsString() {
return kv.getValueAsString();
}
@Override
public int getDataPoints() {
int length;
switch (getDataType()) {
case STRING:
length = getStrValue().get().length();
break;
case JSON:
length = getJsonValue().get().length();
break;
default:
return 1;
}
return Math.max(1, (length + MAX_CHARS_PER_DATA_POINT - 1) / MAX_CHARS_PER_DATA_POINT);
}
}

View File

@ -15,6 +15,8 @@
*/
package org.thingsboard.server.common.data.kv;
import com.fasterxml.jackson.annotation.JsonIgnore;
/**
* Represents time series KV data entry
*
@ -25,4 +27,7 @@ public interface TsKvEntry extends KvEntry {
long getTs();
@JsonIgnore
int getDataPoints();
}

View File

@ -100,7 +100,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
}
@Override
public ListenableFuture<Void> savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) {
public ListenableFuture<Integer> savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) {
return Futures.immediateFuture(null);
}

View File

@ -30,11 +30,14 @@ import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Slf4j
public abstract class AbstractSqlTimeseriesDao extends BaseAbstractSqlTimeseriesDao implements AggregationTimeseriesDao {
protected static final long SECONDS_IN_DAY = TimeUnit.DAYS.toSeconds(1);
@Autowired
protected ScheduledLogExecutorComponent logExecutor;
@ -56,6 +59,9 @@ public abstract class AbstractSqlTimeseriesDao extends BaseAbstractSqlTimeseries
@Value("${sql.batch_sort:false}")
protected boolean batchSortEnabled;
@Value("${sql.ttl.ts.ts_key_value_ttl:0}")
private long systemTtl;
protected ListenableFuture<List<TsKvEntry>> processFindAllAsync(TenantId tenantId, EntityId entityId, List<ReadTsKvQuery> queries) {
List<ListenableFuture<List<TsKvEntry>>> futures = queries
.stream()
@ -75,4 +81,19 @@ public abstract class AbstractSqlTimeseriesDao extends BaseAbstractSqlTimeseries
}
}, service);
}
protected long computeTtl(long ttl) {
if (systemTtl > 0) {
if (ttl == 0) {
ttl = systemTtl;
} else {
ttl = Math.min(systemTtl, ttl);
}
}
return ttl;
}
protected int getDataPointDays(TsKvEntry tsKvEntry, long ttl) {
return tsKvEntry.getDataPoints() * Math.max(1, (int) (ttl / SECONDS_IN_DAY));
}
}

View File

@ -15,7 +15,9 @@
*/
package org.thingsboard.server.dao.sqlts.hsql;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.id.EntityId;
@ -35,7 +37,8 @@ import org.thingsboard.server.dao.util.SqlTsDao;
public class JpaHsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDao implements TimeseriesDao {
@Override
public ListenableFuture<Void> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
public ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
int dataPointDays = getDataPointDays(tsKvEntry, computeTtl(ttl));
String strKey = tsKvEntry.getKey();
Integer keyId = getOrSaveKeyId(strKey);
TsKvEntity entity = new TsKvEntity();
@ -48,7 +51,7 @@ public class JpaHsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
entity.setBooleanValue(tsKvEntry.getBooleanValue().orElse(null));
entity.setJsonValue(tsKvEntry.getJsonValue().orElse(null));
log.trace("Saving entity: {}", entity);
return tsQueue.add(entity);
return Futures.transform(tsQueue.add(entity), v -> dataPointDays, MoreExecutors.directExecutor());
}
}

View File

@ -15,7 +15,9 @@
*/
package org.thingsboard.server.dao.sqlts.psql;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@ -71,7 +73,8 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
}
@Override
public ListenableFuture<Void> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
public ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
int dataPointDays = getDataPointDays(tsKvEntry, computeTtl(ttl));
savePartitionIfNotExist(tsKvEntry.getTs());
String strKey = tsKvEntry.getKey();
Integer keyId = getOrSaveKeyId(strKey);
@ -85,7 +88,7 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
entity.setBooleanValue(tsKvEntry.getBooleanValue().orElse(null));
entity.setJsonValue(tsKvEntry.getJsonValue().orElse(null));
log.trace("Saving entity: {}", entity);
return tsQueue.add(entity);
return Futures.transform(tsQueue.add(entity), v -> dataPointDays, MoreExecutors.directExecutor());
}
private void savePartitionIfNotExist(long ts) {

View File

@ -21,6 +21,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Component;
@ -101,7 +102,8 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
}
@Override
public ListenableFuture<Void> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
public ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
int dataPointDays = getDataPointDays(tsKvEntry, computeTtl(ttl));
String strKey = tsKvEntry.getKey();
Integer keyId = getOrSaveKeyId(strKey);
TimescaleTsKvEntity entity = new TimescaleTsKvEntity();
@ -113,14 +115,13 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
entity.setLongValue(tsKvEntry.getLongValue().orElse(null));
entity.setBooleanValue(tsKvEntry.getBooleanValue().orElse(null));
entity.setJsonValue(tsKvEntry.getJsonValue().orElse(null));
log.trace("Saving entity to timescale db: {}", entity);
return tsQueue.add(entity);
return Futures.transform(tsQueue.add(entity), v -> dataPointDays, MoreExecutors.directExecutor());
}
@Override
public ListenableFuture<Void> savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) {
return Futures.immediateFuture(null);
public ListenableFuture<Integer> savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) {
return Futures.immediateFuture(0);
}
@Override

View File

@ -15,11 +15,13 @@
*/
package org.thingsboard.server.dao.timeseries;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@ -55,6 +57,20 @@ public class BaseTimeseriesService implements TimeseriesService {
private static final int INSERTS_PER_ENTRY = 3;
private static final int DELETES_PER_ENTRY = INSERTS_PER_ENTRY;
public static final Function<List<Integer>, Integer> SUM_ALL_INTEGERS = new Function<List<Integer>, Integer>() {
@Override
public @Nullable Integer apply(@Nullable List<Integer> input) {
int result = 0;
if (input != null) {
for (Integer tmp : input) {
if (tmp != null) {
result += tmp;
}
}
}
return result;
}
};
@Value("${database.ts_max_intervals}")
private long maxTsIntervals;
@ -101,26 +117,26 @@ public class BaseTimeseriesService implements TimeseriesService {
}
@Override
public ListenableFuture<List<Void>> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {
public ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {
validate(entityId);
if (tsKvEntry == null) {
throw new IncorrectParameterException("Key value entry can't be null");
}
List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY);
List<ListenableFuture<Integer>> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY);
saveAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, 0L);
return Futures.allAsList(futures);
return Futures.transform(Futures.allAsList(futures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor());
}
@Override
public ListenableFuture<List<Void>> save(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) {
List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * INSERTS_PER_ENTRY);
public ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) {
List<ListenableFuture<Integer>> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * INSERTS_PER_ENTRY);
for (TsKvEntry tsKvEntry : tsKvEntries) {
if (tsKvEntry == null) {
throw new IncorrectParameterException("Key value entry can't be null");
}
saveAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, ttl);
}
return Futures.allAsList(futures);
return Futures.transform(Futures.allAsList(futures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor());
}
@Override
@ -135,12 +151,12 @@ public class BaseTimeseriesService implements TimeseriesService {
return Futures.allAsList(futures);
}
private void saveAndRegisterFutures(TenantId tenantId, List<ListenableFuture<Void>> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
private void saveAndRegisterFutures(TenantId tenantId, List<ListenableFuture<Integer>> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) {
throw new IncorrectParameterException("Telemetry data can't be stored for entity view. Read only");
}
futures.add(timeseriesDao.savePartition(tenantId, entityId, tsKvEntry.getTs(), tsKvEntry.getKey(), ttl));
futures.add(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry));
futures.add(Futures.transform(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry), v -> 0, MoreExecutors.directExecutor()));
futures.add(timeseriesDao.save(tenantId, entityId, tsKvEntry, ttl));
}
@ -216,7 +232,7 @@ public class BaseTimeseriesService implements TimeseriesService {
} else if (query.getAggregation() == null) {
throw new IncorrectParameterException("Incorrect ReadTsKvQuery. Aggregation can't be empty");
}
if(!Aggregation.NONE.equals(query.getAggregation())) {
if (!Aggregation.NONE.equals(query.getAggregation())) {
long step = Math.max(query.getInterval(), 1000);
long intervalCounts = (query.getEndTs() - query.getStartTs()) / step;
if (intervalCounts > maxTsIntervals || intervalCounts < 0) {

View File

@ -60,6 +60,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal;
@ -74,6 +75,8 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
protected static final int MIN_AGGREGATION_STEP_MS = 1000;
public static final String ASC_ORDER = "ASC";
public static final long SECONDS_IN_DAY = TimeUnit.DAYS.toSeconds(1);
protected static List<Long> FIXED_PARTITION = Arrays.asList(new Long[]{0L});
@Autowired
@ -141,9 +144,10 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
}
@Override
public ListenableFuture<Void> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
public ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
List<ListenableFuture<Void>> futures = new ArrayList<>();
ttl = computeTtl(ttl);
int dataPointDays = tsKvEntry.getDataPoints() * Math.max(1, (int) (ttl / SECONDS_IN_DAY));
long partition = toPartitionTs(tsKvEntry.getTs());
DataType type = tsKvEntry.getDataType();
if (setNullValuesEnabled) {
@ -161,11 +165,11 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
}
BoundStatement stmt = stmtBuilder.build();
futures.add(getFuture(executeAsyncWrite(tenantId, stmt), rs -> null));
return Futures.transform(Futures.allAsList(futures), result -> null, MoreExecutors.directExecutor());
return Futures.transform(Futures.allAsList(futures), result -> dataPointDays, MoreExecutors.directExecutor());
}
@Override
public ListenableFuture<Void> savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) {
public ListenableFuture<Integer> savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) {
if (isFixedPartitioning()) {
return Futures.immediateFuture(null);
}
@ -181,7 +185,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
stmtBuilder.setInt(4, (int) ttl);
}
BoundStatement stmt = stmtBuilder.build();
return getFuture(executeAsyncWrite(tenantId, stmt), rs -> null);
return getFuture(executeAsyncWrite(tenantId, stmt), rs -> 0);
}
@Override
@ -649,9 +653,10 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
}
/**
// * Select existing partitions from the table
// * <code>{@link ModelConstants#TS_KV_PARTITIONS_CF}</code> for the given entity
// */
* // * Select existing partitions from the table
* // * <code>{@link ModelConstants#TS_KV_PARTITIONS_CF}</code> for the given entity
* //
*/
private TbResultSetFuture fetchPartitions(TenantId tenantId, EntityId entityId, String key, long minPartition, long maxPartition) {
Select select = QueryBuilder.selectFrom(ModelConstants.TS_KV_PARTITIONS_CF).column(ModelConstants.PARTITION_COLUMN)
.whereColumn(ModelConstants.ENTITY_TYPE_COLUMN).isEqualTo(literal(entityId.getEntityType().name()))

View File

@ -31,9 +31,9 @@ public interface TimeseriesDao {
ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, List<ReadTsKvQuery> queries);
ListenableFuture<Void> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl);
ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl);
ListenableFuture<Void> savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl);
ListenableFuture<Integer> savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl);
ListenableFuture<Void> remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query);

View File

@ -793,7 +793,7 @@ public abstract class BaseEntityServiceTest extends AbstractServiceTest {
}
}
List<ListenableFuture<List<Void>>> timeseriesFutures = new ArrayList<>();
List<ListenableFuture<Integer>> timeseriesFutures = new ArrayList<>();
for (int i = 0; i < devices.size(); i++) {
Device device = devices.get(i);
timeseriesFutures.add(saveLongTimeseries(device.getId(), "temperature", temperatures.get(i)));
@ -1272,7 +1272,7 @@ public abstract class BaseEntityServiceTest extends AbstractServiceTest {
return attributesService.save(SYSTEM_TENANT_ID, entityId, scope, Collections.singletonList(attr));
}
private ListenableFuture<List<Void>> saveLongTimeseries(EntityId entityId, String key, Double value) {
private ListenableFuture<Integer> saveLongTimeseries(EntityId entityId, String key, Double value) {
TsKvEntity tsKv = new TsKvEntity();
tsKv.setStrKey(key);
tsKv.setDoubleValue(value);