diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java index f7e00b7242..88868d2258 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java @@ -71,9 +71,9 @@ import java.util.concurrent.locks.ReentrantLock; public class DefaultTbApiUsageStateService implements TbApiUsageStateService { public static final String HOURLY = "Hourly"; - public static final FutureCallback VOID_CALLBACK = new FutureCallback() { + public static final FutureCallback VOID_CALLBACK = new FutureCallback() { @Override - public void onSuccess(@Nullable Void result) {} + public void onSuccess(@Nullable Integer result) {} @Override public void onFailure(Throwable t) {} diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/AbstractSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/AbstractSubscriptionService.java index bc4099e81b..4178502c77 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/AbstractSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/AbstractSubscriptionService.java @@ -105,10 +105,10 @@ public abstract class AbstractSubscriptionService implements ApplicationListener } } - protected void addWsCallback(ListenableFuture> saveFuture, Consumer callback) { - Futures.addCallback(saveFuture, new FutureCallback>() { + protected void addWsCallback(ListenableFuture> saveFuture, Consumer callback) { + Futures.addCallback(saveFuture, new FutureCallback>() { @Override - public void onSuccess(@Nullable List result) { + public void onSuccess(@Nullable List result) { callback.accept(null); } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index d4786a6f54..53d0e08da2 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,10 +20,9 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.id.EntityId; @@ -42,10 +41,9 @@ import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.queue.discovery.PartitionChangeEvent; import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.usagestats.TbApiUsageClient; import org.thingsboard.server.service.queue.TbClusterService; -import org.thingsboard.server.service.subscription.SubscriptionManagerService; import org.thingsboard.server.service.subscription.TbSubscriptionUtils; import javax.annotation.Nullable; @@ -59,12 +57,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.function.Consumer; -import java.util.stream.Collectors; /** * Created by ashvayka on 27.03.18. @@ -76,6 +70,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer private final AttributesService attrService; private final TimeseriesService tsService; private final EntityViewService entityViewService; + private final TbApiUsageClient apiUsageClient; private ExecutorService tsCallBackExecutor; @@ -83,11 +78,13 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer TimeseriesService tsService, EntityViewService entityViewService, TbClusterService clusterService, - PartitionService partitionService) { + PartitionService partitionService, + TbApiUsageClient apiUsageClient) { super(clusterService, partitionService); this.attrService = attrService; this.tsService = tsService; this.entityViewService = entityViewService; + this.apiUsageClient = apiUsageClient; } @PostConstruct @@ -117,12 +114,25 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @Override public void saveAndNotify(TenantId tenantId, EntityId entityId, List ts, long ttl, FutureCallback callback) { checkInternalEntity(entityId); - saveAndNotifyInternal(tenantId, entityId, ts, ttl, callback); + saveAndNotifyInternal(tenantId, entityId, ts, ttl, new FutureCallback() { + @Override + public void onSuccess(Integer result) { + if (result != null && result > 0) { + apiUsageClient.report(tenantId, ApiUsageRecordKey.STORAGE_DP_COUNT, result); + } + callback.onSuccess(null); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + }); } @Override - public void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, List ts, long ttl, FutureCallback callback) { - ListenableFuture> saveFuture = tsService.save(tenantId, entityId, ts, ttl); + public void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, List ts, long ttl, FutureCallback callback) { + ListenableFuture> saveFuture = tsService.save(tenantId, entityId, ts, ttl); addMainCallback(saveFuture, callback); addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts)); if (EntityType.DEVICE.equals(entityId.getEntityType()) || EntityType.ASSET.equals(entityId.getEntityType())) { @@ -147,9 +157,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer Optional tsKvEntry = entries.stream() .filter(entry -> entry.getTs() > startTs && entry.getTs() <= endTs) .max(Comparator.comparingLong(TsKvEntry::getTs)); - if (tsKvEntry.isPresent()) { - entityViewLatest.add(tsKvEntry.get()); - } + tsKvEntry.ifPresent(entityViewLatest::add); } } if (!entityViewLatest.isEmpty()) { diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java index 275630a161..53cc23f23a 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java @@ -29,7 +29,7 @@ import java.util.List; */ public interface InternalTelemetryService extends RuleEngineTelemetryService { - void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, List ts, long ttl, FutureCallback callback); + void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, List ts, long ttl, FutureCallback callback); void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List attributes, boolean notifyDevice, FutureCallback callback); diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java index 2a5a701f9e..81dedff132 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java @@ -36,9 +36,9 @@ public interface TimeseriesService { ListenableFuture> findAllLatest(TenantId tenantId, EntityId entityId); - ListenableFuture> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry); + ListenableFuture> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry); - ListenableFuture> save(TenantId tenantId, EntityId entityId, List tsKvEntry, long ttl); + ListenableFuture> save(TenantId tenantId, EntityId entityId, List tsKvEntry, long ttl); ListenableFuture> saveLatest(TenantId tenantId, EntityId entityId, List tsKvEntry);