save ts without latest
This commit is contained in:
		
							parent
							
								
									77b9a8c1af
								
							
						
					
					
						commit
						c43e8950a2
					
				@ -140,6 +140,37 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void saveWithoutLatestAndNotify(TenantId tenantId, CustomerId customerId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback) {
 | 
			
		||||
        checkInternalEntity(entityId);
 | 
			
		||||
        boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null;
 | 
			
		||||
        if (sysTenant || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) {
 | 
			
		||||
            saveWithoutLatestAndNotifyInternal(tenantId, entityId, ts, ttl, new FutureCallback<Integer>() {
 | 
			
		||||
                @Override
 | 
			
		||||
                public void onSuccess(Integer result) {
 | 
			
		||||
                    if (!sysTenant && result != null && result > 0) {
 | 
			
		||||
                        apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.STORAGE_DP_COUNT, result);
 | 
			
		||||
                    }
 | 
			
		||||
                    callback.onSuccess(null);
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                @Override
 | 
			
		||||
                public void onFailure(Throwable t) {
 | 
			
		||||
                    callback.onFailure(t);
 | 
			
		||||
                }
 | 
			
		||||
            });
 | 
			
		||||
        } else {
 | 
			
		||||
            callback.onFailure(new RuntimeException("DB storage writes are disabled due to API limits!"));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void saveWithoutLatestAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Integer> callback) {
 | 
			
		||||
        ListenableFuture<Integer> saveFuture = tsService.saveWithoutLatest(tenantId, entityId, ts, ttl);
 | 
			
		||||
        addMainCallback(saveFuture, callback);
 | 
			
		||||
        addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts));
 | 
			
		||||
        // TODO: 12/11/2021 do we need entityView searching here?
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Integer> callback) {
 | 
			
		||||
        saveAndNotifyInternal(tenantId, entityId, ts, 0L, callback);
 | 
			
		||||
 | 
			
		||||
@ -41,6 +41,8 @@ public interface TimeseriesService {
 | 
			
		||||
 | 
			
		||||
    ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
 | 
			
		||||
 | 
			
		||||
    ListenableFuture<Integer> saveWithoutLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
 | 
			
		||||
 | 
			
		||||
    ListenableFuture<List<Void>> saveLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry);
 | 
			
		||||
 | 
			
		||||
    ListenableFuture<List<Void>> remove(TenantId tenantId, EntityId entityId, List<DeleteTsKvQuery> queries);
 | 
			
		||||
 | 
			
		||||
@ -56,6 +56,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
 | 
			
		||||
public class BaseTimeseriesService implements TimeseriesService {
 | 
			
		||||
 | 
			
		||||
    private static final int INSERTS_PER_ENTRY = 3;
 | 
			
		||||
    private static final int INSERTS_PER_ENTRY_WITHOUT_LATEST = 2;
 | 
			
		||||
    private static final int DELETES_PER_ENTRY = INSERTS_PER_ENTRY;
 | 
			
		||||
    public static final Function<List<Integer>, Integer> SUM_ALL_INTEGERS = new Function<List<Integer>, Integer>() {
 | 
			
		||||
        @Override
 | 
			
		||||
@ -154,6 +155,18 @@ public class BaseTimeseriesService implements TimeseriesService {
 | 
			
		||||
        return Futures.transform(Futures.allAsList(futures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Integer> saveWithoutLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) {
 | 
			
		||||
        List<ListenableFuture<Integer>> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * INSERTS_PER_ENTRY_WITHOUT_LATEST);
 | 
			
		||||
        for (TsKvEntry tsKvEntry : tsKvEntries) {
 | 
			
		||||
            if (tsKvEntry == null) {
 | 
			
		||||
                throw new IncorrectParameterException("Key value entry can't be null");
 | 
			
		||||
            }
 | 
			
		||||
            saveWithoutLatestAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, ttl);
 | 
			
		||||
        }
 | 
			
		||||
        return Futures.transform(Futures.allAsList(futures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<List<Void>> saveLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries) {
 | 
			
		||||
        List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size());
 | 
			
		||||
@ -175,6 +188,14 @@ public class BaseTimeseriesService implements TimeseriesService {
 | 
			
		||||
        futures.add(timeseriesDao.save(tenantId, entityId, tsKvEntry, ttl));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void saveWithoutLatestAndRegisterFutures(TenantId tenantId, List<ListenableFuture<Integer>> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
 | 
			
		||||
        if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) {
 | 
			
		||||
            throw new IncorrectParameterException("Telemetry data can't be stored for entity view. Read only");
 | 
			
		||||
        }
 | 
			
		||||
        futures.add(timeseriesDao.savePartition(tenantId, entityId, tsKvEntry.getTs(), tsKvEntry.getKey()));
 | 
			
		||||
        futures.add(timeseriesDao.save(tenantId, entityId, tsKvEntry, ttl));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private List<ReadTsKvQuery> updateQueriesForEntityView(EntityView entityView, List<ReadTsKvQuery> queries) {
 | 
			
		||||
        return queries.stream().map(query -> {
 | 
			
		||||
            long startTs;
 | 
			
		||||
 | 
			
		||||
@ -34,6 +34,8 @@ public interface RuleEngineTelemetryService {
 | 
			
		||||
 | 
			
		||||
    void saveAndNotify(TenantId tenantId, CustomerId id, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback);
 | 
			
		||||
 | 
			
		||||
    void saveWithoutLatestAndNotify(TenantId tenantId, CustomerId id, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback);
 | 
			
		||||
 | 
			
		||||
    void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback);
 | 
			
		||||
 | 
			
		||||
    void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, FutureCallback<Void> callback);
 | 
			
		||||
 | 
			
		||||
@ -48,7 +48,7 @@ import java.util.concurrent.TimeUnit;
 | 
			
		||||
        nodeDescription = "Saves timeseries data",
 | 
			
		||||
        nodeDetails = "Saves timeseries telemetry data based on configurable TTL parameter. Expects messages with 'POST_TELEMETRY_REQUEST' message type",
 | 
			
		||||
        uiResources = {"static/rulenode/rulenode-core-config.js"},
 | 
			
		||||
        configDirective = "tbActionNodeTimeseriesConfig",
 | 
			
		||||
        configDirective = "tbActionNodeCustomTimeseriesConfig",
 | 
			
		||||
        icon = "file_upload"
 | 
			
		||||
)
 | 
			
		||||
public class TbMsgTimeseriesNode implements TbNode {
 | 
			
		||||
@ -94,7 +94,11 @@ public class TbMsgTimeseriesNode implements TbNode {
 | 
			
		||||
        if (ttl == 0L) {
 | 
			
		||||
            ttl = tenantProfileDefaultStorageTtl;
 | 
			
		||||
        }
 | 
			
		||||
        ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getCustomerId(), msg.getOriginator(), tsKvEntryList, ttl, new TelemetryNodeCallback(ctx, msg));
 | 
			
		||||
        if (config.isSkipLatestPersistence()) {
 | 
			
		||||
            ctx.getTelemetryService().saveWithoutLatestAndNotify(ctx.getTenantId(), msg.getCustomerId(), msg.getOriginator(), tsKvEntryList, ttl, new TelemetryNodeCallback(ctx, msg));
 | 
			
		||||
        } else {
 | 
			
		||||
            ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getCustomerId(), msg.getOriginator(), tsKvEntryList, ttl, new TelemetryNodeCallback(ctx, msg));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static long getTs(TbMsg msg) {
 | 
			
		||||
 | 
			
		||||
@ -22,11 +22,13 @@ import org.thingsboard.rule.engine.api.NodeConfiguration;
 | 
			
		||||
public class TbMsgTimeseriesNodeConfiguration implements NodeConfiguration<TbMsgTimeseriesNodeConfiguration> {
 | 
			
		||||
 | 
			
		||||
    private long defaultTTL;
 | 
			
		||||
    private boolean skipLatestPersistence;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbMsgTimeseriesNodeConfiguration defaultConfiguration() {
 | 
			
		||||
        TbMsgTimeseriesNodeConfiguration configuration = new TbMsgTimeseriesNodeConfiguration();
 | 
			
		||||
        configuration.setDefaultTTL(0L);
 | 
			
		||||
        configuration.setSkipLatestPersistence(false);
 | 
			
		||||
        return configuration;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user