diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/BaseApiUsageState.java b/application/src/main/java/org/thingsboard/server/service/apiusage/BaseApiUsageState.java index db8247bb93..1215bef0bf 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/BaseApiUsageState.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/BaseApiUsageState.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.apiusage; +import lombok.Builder; import lombok.Data; import lombok.Getter; import lombok.Setter; @@ -61,20 +62,25 @@ public abstract class BaseApiUsageState { long currentValue = get(key); long currentHourlyValue = getHourly(key); - long newValue; - long newHourlyValue; + StatsCalculationResult result; if (key.isCounter()) { - newValue = currentValue + value; - newHourlyValue = currentHourlyValue + value; + result = StatsCalculationResult.builder() + .newValue(currentValue + value).valueChanged(true) + .newHourlyValue(currentHourlyValue + value).hourlyValueChanged(true) + .build(); } else { Long newGaugeValue = calculateGauge(key, value, serviceId); - newValue = newGaugeValue != null ? newGaugeValue : currentValue; - newHourlyValue = newGaugeValue != null ? Math.max(newGaugeValue, currentHourlyValue) : currentHourlyValue; + long newValue = newGaugeValue != null ? newGaugeValue : currentValue; + long newHourlyValue = newGaugeValue != null ? Math.max(newGaugeValue, currentHourlyValue) : currentHourlyValue; + result = StatsCalculationResult.builder() + .newValue(newValue).valueChanged(newValue != currentValue || !currentCycleValues.containsKey(key)) + .newHourlyValue(newHourlyValue).hourlyValueChanged(newHourlyValue != currentHourlyValue || !currentHourValues.containsKey(key)) + .build(); } - set(key, newValue); - setHourly(key, newHourlyValue); - return StatsCalculationResult.of(newValue, newHourlyValue); + set(key, result.getNewValue()); + setHourly(key, result.getNewHourlyValue()); + return result; } private Long calculateGauge(ApiUsageRecordKey key, long value, String serviceId) { @@ -188,10 +194,13 @@ public abstract class BaseApiUsageState { return getApiUsageState().getEntityId(); } - @Data(staticConstructor = "of") + @Data + @Builder public static class StatsCalculationResult { private final long newValue; + private final boolean valueChanged; private final long newHourlyValue; + private final boolean hourlyValueChanged; } } diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java index 3024040537..9e17641c69 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java @@ -192,11 +192,14 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService ApiUsageRecordKey recordKey = ApiUsageRecordKey.valueOf(statsItem.getKey()); StatsCalculationResult calculationResult = usageState.calculate(recordKey, statsItem.getValue(), serviceId); - long newValue = calculationResult.getNewValue(); - long newHourlyValue = calculationResult.getNewHourlyValue(); - - updatedEntries.add(new BasicTsKvEntry(ts, new LongDataEntry(recordKey.getApiCountKey(), newValue))); - updatedEntries.add(new BasicTsKvEntry(newHourTs, new LongDataEntry(recordKey.getApiCountKey() + HOURLY, newHourlyValue))); + if (calculationResult.isValueChanged()) { + long newValue = calculationResult.getNewValue(); + updatedEntries.add(new BasicTsKvEntry(ts, new LongDataEntry(recordKey.getApiCountKey(), newValue))); + } + if (calculationResult.isHourlyValueChanged()) { + long newHourlyValue = calculationResult.getNewHourlyValue(); + updatedEntries.add(new BasicTsKvEntry(newHourTs, new LongDataEntry(recordKey.getApiCountKey() + HOURLY, newHourlyValue))); + } if (recordKey.getApiFeature() != null) { apiFeatures.add(recordKey.getApiFeature()); } diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index 8640ee82a4..9d928e622a 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -24,12 +24,14 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import lombok.Getter; +import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardExecutors; @@ -113,6 +115,7 @@ import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE; @Service @TbCoreComponent @Slf4j +@RequiredArgsConstructor public class DefaultDeviceStateService extends AbstractPartitionBasedService implements DeviceStateService { public static final String ACTIVITY_STATE = "active"; @@ -147,13 +150,11 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService deviceStates = new ConcurrentHashMap<>(); - public DefaultDeviceStateService(TenantService tenantService, DeviceService deviceService, - AttributesService attributesService, TimeseriesService tsService, - TbClusterService clusterService, PartitionService partitionService, - TbServiceInfoProvider serviceInfoProvider, - EntityQueryRepository entityQueryRepository, - DbTypeInfoComponent dbTypeInfoComponent, - TbApiUsageReportClient apiUsageReportClient) { - this.tenantService = tenantService; - this.deviceService = deviceService; - this.attributesService = attributesService; - this.tsService = tsService; - this.clusterService = clusterService; - this.partitionService = partitionService; - this.serviceInfoProvider = serviceInfoProvider; - this.entityQueryRepository = entityQueryRepository; - this.dbTypeInfoComponent = dbTypeInfoComponent; - this.apiUsageReportClient = apiUsageReportClient; - } - @PostConstruct public void init() { super.init(); deviceStateExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool( Math.max(4, Runtime.getRuntime().availableProcessors()), "device-state")); - scheduledExecutor.scheduleAtFixedRate(this::checkStates, new Random().nextInt(defaultStateCheckIntervalInSec), defaultStateCheckIntervalInSec, TimeUnit.SECONDS); + scheduledExecutor.scheduleWithFixedDelay(this::checkStates, new Random().nextInt(defaultStateCheckIntervalInSec), defaultStateCheckIntervalInSec, TimeUnit.SECONDS); + scheduledExecutor.scheduleWithFixedDelay(this::reportActivityStats, defaultActivityStatsIntervalInSec, defaultActivityStatsIntervalInSec, TimeUnit.SECONDS); } @PreDestroy @@ -439,7 +426,6 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService> devicesActivity = new HashMap<>(); partitionedEntities.forEach((tpi, deviceIds) -> { log.debug("Calculating state updates. tpi {} for {} devices", tpi.getFullTopicName(), deviceIds.size()); Set idsFromRemovedTenant = new HashSet<>(); @@ -456,22 +442,32 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService tenantDevicesActivity = devicesActivity.computeIfAbsent(stateData.getTenantId(), - tenantId -> Pair.of(new AtomicInteger(), new AtomicInteger())); - if (stateData.getState().isActive()) { - tenantDevicesActivity.getLeft().incrementAndGet(); - } else { - tenantDevicesActivity.getRight().incrementAndGet(); - } } deviceIds.removeAll(idsFromRemovedTenant); }); - devicesActivity.forEach((tenantId, tenantDevicesActivity) -> { + } catch (Throwable t) { + log.warn("Failed to check devices states", t); + } + } + + void reportActivityStats() { + try{ + Map> stats = new HashMap<>(); + for (DeviceStateData stateData : deviceStates.values()) { + Pair tenantDevicesActivity = stats.computeIfAbsent(stateData.getTenantId(), + tenantId -> Pair.of(new AtomicInteger(), new AtomicInteger())); + if (stateData.getState().isActive()) { + tenantDevicesActivity.getLeft().incrementAndGet(); + } else { + tenantDevicesActivity.getRight().incrementAndGet(); + } + } + + stats.forEach((tenantId, tenantDevicesActivity) -> { int active = tenantDevicesActivity.getLeft().get(); int inactive = tenantDevicesActivity.getRight().get(); apiUsageReportClient.report(tenantId, null, ApiUsageRecordKey.ACTIVE_DEVICES, active); @@ -481,7 +477,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService { assertThat(getLatestStats(ApiUsageRecordKey.ACTIVE_DEVICES, false)).isZero(); + assertThat(getLatestStats(ApiUsageRecordKey.ACTIVE_DEVICES, true)).isZero(); assertThat(getLatestStats(ApiUsageRecordKey.INACTIVE_DEVICES, false)).isEqualTo(activeDevicesCount + inactiveDevicesCount); + assertThat(getLatestStats(ApiUsageRecordKey.INACTIVE_DEVICES, true)).isEqualTo(activeDevicesCount + inactiveDevicesCount); }); for (Device device : activeDevices) { @@ -92,6 +95,7 @@ public class DevicesStatisticsTest extends AbstractControllerTest { await().atMost(40, TimeUnit.SECONDS) .untilAsserted(() -> { assertThat(getLatestStats(ApiUsageRecordKey.ACTIVE_DEVICES, false)).isEqualTo(activeDevicesCount); + assertThat(getLatestStats(ApiUsageRecordKey.ACTIVE_DEVICES, true)).isEqualTo(activeDevicesCount); assertThat(getLatestStats(ApiUsageRecordKey.INACTIVE_DEVICES, false)).isEqualTo(inactiveDevicesCount); }); }