Data Points storage

This commit is contained in:
Andrii Shvaika 2020-11-03 13:43:27 +02:00
parent a64c4e3e20
commit 35dbe509f0
5 changed files with 32 additions and 24 deletions

View File

@ -71,9 +71,9 @@ import java.util.concurrent.locks.ReentrantLock;
public class DefaultTbApiUsageStateService implements TbApiUsageStateService { public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
public static final String HOURLY = "Hourly"; public static final String HOURLY = "Hourly";
public static final FutureCallback<Void> VOID_CALLBACK = new FutureCallback<Void>() { public static final FutureCallback<Integer> VOID_CALLBACK = new FutureCallback<Integer>() {
@Override @Override
public void onSuccess(@Nullable Void result) {} public void onSuccess(@Nullable Integer result) {}
@Override @Override
public void onFailure(Throwable t) {} public void onFailure(Throwable t) {}

View File

@ -105,10 +105,10 @@ public abstract class AbstractSubscriptionService implements ApplicationListener
} }
} }
protected void addWsCallback(ListenableFuture<List<Void>> saveFuture, Consumer<Void> callback) { protected <T> void addWsCallback(ListenableFuture<List<T>> saveFuture, Consumer<T> callback) {
Futures.addCallback(saveFuture, new FutureCallback<List<Void>>() { Futures.addCallback(saveFuture, new FutureCallback<List<T>>() {
@Override @Override
public void onSuccess(@Nullable List<Void> result) { public void onSuccess(@Nullable List<T> result) {
callback.accept(null); callback.accept(null);
} }

View File

@ -20,10 +20,9 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
@ -42,10 +41,9 @@ import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
import org.thingsboard.server.service.queue.TbClusterService; import org.thingsboard.server.service.queue.TbClusterService;
import org.thingsboard.server.service.subscription.SubscriptionManagerService;
import org.thingsboard.server.service.subscription.TbSubscriptionUtils; import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -59,12 +57,8 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/** /**
* Created by ashvayka on 27.03.18. * Created by ashvayka on 27.03.18.
@ -76,6 +70,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
private final AttributesService attrService; private final AttributesService attrService;
private final TimeseriesService tsService; private final TimeseriesService tsService;
private final EntityViewService entityViewService; private final EntityViewService entityViewService;
private final TbApiUsageClient apiUsageClient;
private ExecutorService tsCallBackExecutor; private ExecutorService tsCallBackExecutor;
@ -83,11 +78,13 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
TimeseriesService tsService, TimeseriesService tsService,
EntityViewService entityViewService, EntityViewService entityViewService,
TbClusterService clusterService, TbClusterService clusterService,
PartitionService partitionService) { PartitionService partitionService,
TbApiUsageClient apiUsageClient) {
super(clusterService, partitionService); super(clusterService, partitionService);
this.attrService = attrService; this.attrService = attrService;
this.tsService = tsService; this.tsService = tsService;
this.entityViewService = entityViewService; this.entityViewService = entityViewService;
this.apiUsageClient = apiUsageClient;
} }
@PostConstruct @PostConstruct
@ -117,12 +114,25 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
@Override @Override
public void saveAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback) { public void saveAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback) {
checkInternalEntity(entityId); checkInternalEntity(entityId);
saveAndNotifyInternal(tenantId, entityId, ts, ttl, callback); saveAndNotifyInternal(tenantId, entityId, ts, ttl, new FutureCallback<Integer>() {
@Override
public void onSuccess(Integer result) {
if (result != null && result > 0) {
apiUsageClient.report(tenantId, ApiUsageRecordKey.STORAGE_DP_COUNT, result);
}
callback.onSuccess(null);
} }
@Override @Override
public void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback) { public void onFailure(Throwable t) {
ListenableFuture<List<Void>> saveFuture = tsService.save(tenantId, entityId, ts, ttl); callback.onFailure(t);
}
});
}
@Override
public void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Integer> callback) {
ListenableFuture<List<Integer>> saveFuture = tsService.save(tenantId, entityId, ts, ttl);
addMainCallback(saveFuture, callback); addMainCallback(saveFuture, callback);
addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts)); addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts));
if (EntityType.DEVICE.equals(entityId.getEntityType()) || EntityType.ASSET.equals(entityId.getEntityType())) { if (EntityType.DEVICE.equals(entityId.getEntityType()) || EntityType.ASSET.equals(entityId.getEntityType())) {
@ -147,9 +157,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
Optional<TsKvEntry> tsKvEntry = entries.stream() Optional<TsKvEntry> tsKvEntry = entries.stream()
.filter(entry -> entry.getTs() > startTs && entry.getTs() <= endTs) .filter(entry -> entry.getTs() > startTs && entry.getTs() <= endTs)
.max(Comparator.comparingLong(TsKvEntry::getTs)); .max(Comparator.comparingLong(TsKvEntry::getTs));
if (tsKvEntry.isPresent()) { tsKvEntry.ifPresent(entityViewLatest::add);
entityViewLatest.add(tsKvEntry.get());
}
} }
} }
if (!entityViewLatest.isEmpty()) { if (!entityViewLatest.isEmpty()) {

View File

@ -29,7 +29,7 @@ import java.util.List;
*/ */
public interface InternalTelemetryService extends RuleEngineTelemetryService { public interface InternalTelemetryService extends RuleEngineTelemetryService {
void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback); void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Integer> callback);
void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, FutureCallback<Void> callback); void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, FutureCallback<Void> callback);

View File

@ -36,9 +36,9 @@ public interface TimeseriesService {
ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId); ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId);
ListenableFuture<List<Void>> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry); ListenableFuture<List<Integer>> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry);
ListenableFuture<List<Void>> save(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl); ListenableFuture<List<Integer>> save(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
ListenableFuture<List<Void>> saveLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry); ListenableFuture<List<Void>> saveLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry);