Merge branch 'fix/devices-activity-stats'
This commit is contained in:
		
						commit
						9d217608b2
					
				@ -15,6 +15,7 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.apiusage;
 | 
			
		||||
 | 
			
		||||
import lombok.Builder;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import lombok.Setter;
 | 
			
		||||
@ -61,20 +62,25 @@ public abstract class BaseApiUsageState {
 | 
			
		||||
        long currentValue = get(key);
 | 
			
		||||
        long currentHourlyValue = getHourly(key);
 | 
			
		||||
 | 
			
		||||
        long newValue;
 | 
			
		||||
        long newHourlyValue;
 | 
			
		||||
        StatsCalculationResult result;
 | 
			
		||||
        if (key.isCounter()) {
 | 
			
		||||
            newValue = currentValue + value;
 | 
			
		||||
            newHourlyValue = currentHourlyValue + value;
 | 
			
		||||
            result = StatsCalculationResult.builder()
 | 
			
		||||
                    .newValue(currentValue + value).valueChanged(true)
 | 
			
		||||
                    .newHourlyValue(currentHourlyValue + value).hourlyValueChanged(true)
 | 
			
		||||
                    .build();
 | 
			
		||||
        } else {
 | 
			
		||||
            Long newGaugeValue = calculateGauge(key, value, serviceId);
 | 
			
		||||
            newValue = newGaugeValue != null ? newGaugeValue : currentValue;
 | 
			
		||||
            newHourlyValue = newGaugeValue != null ? Math.max(newGaugeValue, currentHourlyValue) : currentHourlyValue;
 | 
			
		||||
            long newValue = newGaugeValue != null ? newGaugeValue : currentValue;
 | 
			
		||||
            long newHourlyValue = newGaugeValue != null ? Math.max(newGaugeValue, currentHourlyValue) : currentHourlyValue;
 | 
			
		||||
            result = StatsCalculationResult.builder()
 | 
			
		||||
                    .newValue(newValue).valueChanged(newValue != currentValue || !currentCycleValues.containsKey(key))
 | 
			
		||||
                    .newHourlyValue(newHourlyValue).hourlyValueChanged(newHourlyValue != currentHourlyValue || !currentHourValues.containsKey(key))
 | 
			
		||||
                    .build();
 | 
			
		||||
        }
 | 
			
		||||
        set(key, newValue);
 | 
			
		||||
        setHourly(key, newHourlyValue);
 | 
			
		||||
 | 
			
		||||
        return StatsCalculationResult.of(newValue, newHourlyValue);
 | 
			
		||||
        set(key, result.getNewValue());
 | 
			
		||||
        setHourly(key, result.getNewHourlyValue());
 | 
			
		||||
        return result;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private Long calculateGauge(ApiUsageRecordKey key, long value, String serviceId) {
 | 
			
		||||
@ -188,10 +194,13 @@ public abstract class BaseApiUsageState {
 | 
			
		||||
        return getApiUsageState().getEntityId();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Data(staticConstructor = "of")
 | 
			
		||||
    @Data
 | 
			
		||||
    @Builder
 | 
			
		||||
    public static class StatsCalculationResult {
 | 
			
		||||
        private final long newValue;
 | 
			
		||||
        private final boolean valueChanged;
 | 
			
		||||
        private final long newHourlyValue;
 | 
			
		||||
        private final boolean hourlyValueChanged;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -192,11 +192,14 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
 | 
			
		||||
                ApiUsageRecordKey recordKey = ApiUsageRecordKey.valueOf(statsItem.getKey());
 | 
			
		||||
 | 
			
		||||
                StatsCalculationResult calculationResult = usageState.calculate(recordKey, statsItem.getValue(), serviceId);
 | 
			
		||||
                long newValue = calculationResult.getNewValue();
 | 
			
		||||
                long newHourlyValue = calculationResult.getNewHourlyValue();
 | 
			
		||||
 | 
			
		||||
                updatedEntries.add(new BasicTsKvEntry(ts, new LongDataEntry(recordKey.getApiCountKey(), newValue)));
 | 
			
		||||
                updatedEntries.add(new BasicTsKvEntry(newHourTs, new LongDataEntry(recordKey.getApiCountKey() + HOURLY, newHourlyValue)));
 | 
			
		||||
                if (calculationResult.isValueChanged()) {
 | 
			
		||||
                    long newValue = calculationResult.getNewValue();
 | 
			
		||||
                    updatedEntries.add(new BasicTsKvEntry(ts, new LongDataEntry(recordKey.getApiCountKey(), newValue)));
 | 
			
		||||
                }
 | 
			
		||||
                if (calculationResult.isHourlyValueChanged()) {
 | 
			
		||||
                    long newHourlyValue = calculationResult.getNewHourlyValue();
 | 
			
		||||
                    updatedEntries.add(new BasicTsKvEntry(newHourTs, new LongDataEntry(recordKey.getApiCountKey() + HOURLY, newHourlyValue)));
 | 
			
		||||
                }
 | 
			
		||||
                if (recordKey.getApiFeature() != null) {
 | 
			
		||||
                    apiFeatures.add(recordKey.getApiFeature());
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
@ -24,12 +24,14 @@ import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import com.google.common.util.concurrent.ListeningExecutorService;
 | 
			
		||||
import com.google.common.util.concurrent.MoreExecutors;
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import lombok.Setter;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.apache.commons.lang3.tuple.Pair;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.context.annotation.Lazy;
 | 
			
		||||
import org.springframework.scheduling.annotation.Scheduled;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardExecutors;
 | 
			
		||||
@ -113,6 +115,7 @@ import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
 | 
			
		||||
@Service
 | 
			
		||||
@TbCoreComponent
 | 
			
		||||
@Slf4j
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
public class DefaultDeviceStateService extends AbstractPartitionBasedService<DeviceId> implements DeviceStateService {
 | 
			
		||||
 | 
			
		||||
    public static final String ACTIVITY_STATE = "active";
 | 
			
		||||
@ -147,13 +150,11 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
 | 
			
		||||
            new EntityKey(EntityKeyType.ENTITY_FIELD, "label"),
 | 
			
		||||
            new EntityKey(EntityKeyType.ENTITY_FIELD, "createdTime"));
 | 
			
		||||
 | 
			
		||||
    private final TenantService tenantService;
 | 
			
		||||
    private final DeviceService deviceService;
 | 
			
		||||
    private final AttributesService attributesService;
 | 
			
		||||
    private final TimeseriesService tsService;
 | 
			
		||||
    private final TbClusterService clusterService;
 | 
			
		||||
    private final PartitionService partitionService;
 | 
			
		||||
    private final TbServiceInfoProvider serviceInfoProvider;
 | 
			
		||||
    private final EntityQueryRepository entityQueryRepository;
 | 
			
		||||
    private final DbTypeInfoComponent dbTypeInfoComponent;
 | 
			
		||||
    private final TbApiUsageReportClient apiUsageReportClient;
 | 
			
		||||
@ -174,6 +175,10 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
 | 
			
		||||
    @Getter
 | 
			
		||||
    private int defaultStateCheckIntervalInSec;
 | 
			
		||||
 | 
			
		||||
    @Value("${usage.stats.devices.report_interval:60}")
 | 
			
		||||
    @Getter
 | 
			
		||||
    private int defaultActivityStatsIntervalInSec;
 | 
			
		||||
 | 
			
		||||
    @Value("${state.persistToTelemetry:false}")
 | 
			
		||||
    @Getter
 | 
			
		||||
    @Setter
 | 
			
		||||
@ -187,31 +192,13 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
 | 
			
		||||
 | 
			
		||||
    final ConcurrentMap<DeviceId, DeviceStateData> deviceStates = new ConcurrentHashMap<>();
 | 
			
		||||
 | 
			
		||||
    public DefaultDeviceStateService(TenantService tenantService, DeviceService deviceService,
 | 
			
		||||
                                     AttributesService attributesService, TimeseriesService tsService,
 | 
			
		||||
                                     TbClusterService clusterService, PartitionService partitionService,
 | 
			
		||||
                                     TbServiceInfoProvider serviceInfoProvider,
 | 
			
		||||
                                     EntityQueryRepository entityQueryRepository,
 | 
			
		||||
                                     DbTypeInfoComponent dbTypeInfoComponent,
 | 
			
		||||
                                     TbApiUsageReportClient apiUsageReportClient) {
 | 
			
		||||
        this.tenantService = tenantService;
 | 
			
		||||
        this.deviceService = deviceService;
 | 
			
		||||
        this.attributesService = attributesService;
 | 
			
		||||
        this.tsService = tsService;
 | 
			
		||||
        this.clusterService = clusterService;
 | 
			
		||||
        this.partitionService = partitionService;
 | 
			
		||||
        this.serviceInfoProvider = serviceInfoProvider;
 | 
			
		||||
        this.entityQueryRepository = entityQueryRepository;
 | 
			
		||||
        this.dbTypeInfoComponent = dbTypeInfoComponent;
 | 
			
		||||
        this.apiUsageReportClient = apiUsageReportClient;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @PostConstruct
 | 
			
		||||
    public void init() {
 | 
			
		||||
        super.init();
 | 
			
		||||
        deviceStateExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(
 | 
			
		||||
                Math.max(4, Runtime.getRuntime().availableProcessors()), "device-state"));
 | 
			
		||||
        scheduledExecutor.scheduleAtFixedRate(this::checkStates, new Random().nextInt(defaultStateCheckIntervalInSec), defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
 | 
			
		||||
        scheduledExecutor.scheduleWithFixedDelay(this::checkStates, new Random().nextInt(defaultStateCheckIntervalInSec), defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
 | 
			
		||||
        scheduledExecutor.scheduleWithFixedDelay(this::reportActivityStats, defaultActivityStatsIntervalInSec, defaultActivityStatsIntervalInSec, TimeUnit.SECONDS);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @PreDestroy
 | 
			
		||||
@ -439,7 +426,6 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
 | 
			
		||||
    void checkStates() {
 | 
			
		||||
        try {
 | 
			
		||||
            final long ts = System.currentTimeMillis();
 | 
			
		||||
            Map<TenantId, Pair<AtomicInteger, AtomicInteger>> devicesActivity = new HashMap<>();
 | 
			
		||||
            partitionedEntities.forEach((tpi, deviceIds) -> {
 | 
			
		||||
                log.debug("Calculating state updates. tpi {} for {} devices", tpi.getFullTopicName(), deviceIds.size());
 | 
			
		||||
                Set<DeviceId> idsFromRemovedTenant = new HashSet<>();
 | 
			
		||||
@ -456,22 +442,32 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
 | 
			
		||||
                    } catch (Exception e) {
 | 
			
		||||
                        if (e instanceof TenantNotFoundException) {
 | 
			
		||||
                            idsFromRemovedTenant.add(deviceId);
 | 
			
		||||
                            continue;
 | 
			
		||||
                        } else {
 | 
			
		||||
                            log.warn("[{}] Failed to update inactivity state [{}]", deviceId, e.getMessage());
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                    Pair<AtomicInteger, AtomicInteger> tenantDevicesActivity = devicesActivity.computeIfAbsent(stateData.getTenantId(),
 | 
			
		||||
                            tenantId -> Pair.of(new AtomicInteger(), new AtomicInteger()));
 | 
			
		||||
                    if (stateData.getState().isActive()) {
 | 
			
		||||
                        tenantDevicesActivity.getLeft().incrementAndGet();
 | 
			
		||||
                    } else {
 | 
			
		||||
                        tenantDevicesActivity.getRight().incrementAndGet();
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
                deviceIds.removeAll(idsFromRemovedTenant);
 | 
			
		||||
            });
 | 
			
		||||
            devicesActivity.forEach((tenantId, tenantDevicesActivity) -> {
 | 
			
		||||
        } catch (Throwable t) {
 | 
			
		||||
            log.warn("Failed to check devices states", t);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void reportActivityStats() {
 | 
			
		||||
        try{
 | 
			
		||||
            Map<TenantId, Pair<AtomicInteger, AtomicInteger>> stats = new HashMap<>();
 | 
			
		||||
            for (DeviceStateData stateData : deviceStates.values()) {
 | 
			
		||||
                Pair<AtomicInteger, AtomicInteger> tenantDevicesActivity = stats.computeIfAbsent(stateData.getTenantId(),
 | 
			
		||||
                        tenantId -> Pair.of(new AtomicInteger(), new AtomicInteger()));
 | 
			
		||||
                if (stateData.getState().isActive()) {
 | 
			
		||||
                    tenantDevicesActivity.getLeft().incrementAndGet();
 | 
			
		||||
                } else {
 | 
			
		||||
                    tenantDevicesActivity.getRight().incrementAndGet();
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            stats.forEach((tenantId, tenantDevicesActivity) -> {
 | 
			
		||||
                int active = tenantDevicesActivity.getLeft().get();
 | 
			
		||||
                int inactive = tenantDevicesActivity.getRight().get();
 | 
			
		||||
                apiUsageReportClient.report(tenantId, null, ApiUsageRecordKey.ACTIVE_DEVICES, active);
 | 
			
		||||
@ -481,7 +477,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
 | 
			
		||||
                }
 | 
			
		||||
            });
 | 
			
		||||
        } catch (Throwable t) {
 | 
			
		||||
            log.warn("Failed to check devices states", t);
 | 
			
		||||
            log.warn("Failed to report activity states", t);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -144,7 +144,11 @@ usage:
 | 
			
		||||
      interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
 | 
			
		||||
    check:
 | 
			
		||||
      cycle: "${USAGE_STATS_CHECK_CYCLE:60000}"
 | 
			
		||||
    # In milliseconds. Default value is 3 minutes
 | 
			
		||||
    gauge_report_interval: "${USAGE_STATS_GAUGE_REPORT_INTERVAL:180000}"
 | 
			
		||||
    devices:
 | 
			
		||||
      # In seconds, default value is 1 minutes. When changing, in cluster mode, make sure usage.stats.gauge_report_interval is set to x2-x3 of this value
 | 
			
		||||
      report_interval: "${DEVICES_STATS_REPORT_INTERVAL:60}"
 | 
			
		||||
 | 
			
		||||
# UI parameters
 | 
			
		||||
ui:
 | 
			
		||||
 | 
			
		||||
@ -50,8 +50,6 @@ import static org.thingsboard.server.service.state.DefaultDeviceStateService.INA
 | 
			
		||||
@RunWith(MockitoJUnitRunner.class)
 | 
			
		||||
public class DefaultDeviceStateServiceTest {
 | 
			
		||||
 | 
			
		||||
    @Mock
 | 
			
		||||
    TenantService tenantService;
 | 
			
		||||
    @Mock
 | 
			
		||||
    DeviceService deviceService;
 | 
			
		||||
    @Mock
 | 
			
		||||
@ -64,8 +62,6 @@ public class DefaultDeviceStateServiceTest {
 | 
			
		||||
    PartitionService partitionService;
 | 
			
		||||
    @Mock
 | 
			
		||||
    DeviceStateData deviceStateDataMock;
 | 
			
		||||
    @Mock
 | 
			
		||||
    TbServiceInfoProvider serviceInfoProvider;
 | 
			
		||||
 | 
			
		||||
    DeviceId deviceId = DeviceId.fromString("00797a3b-7aeb-4b5b-b57a-c2a810d0f112");
 | 
			
		||||
 | 
			
		||||
@ -73,7 +69,7 @@ public class DefaultDeviceStateServiceTest {
 | 
			
		||||
 | 
			
		||||
    @Before
 | 
			
		||||
    public void setUp() {
 | 
			
		||||
        service = spy(new DefaultDeviceStateService(tenantService, deviceService, attributesService, tsService, clusterService, partitionService, serviceInfoProvider, null, null, null));
 | 
			
		||||
        service = spy(new DefaultDeviceStateService(deviceService, attributesService, tsService, clusterService, partitionService, null, null, null));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
 | 
			
		||||
@ -42,8 +42,9 @@ import static org.awaitility.Awaitility.await;
 | 
			
		||||
        "usage.stats.report.enabled=true",
 | 
			
		||||
        "usage.stats.report.interval=2",
 | 
			
		||||
        "usage.stats.gauge_report_interval=1",
 | 
			
		||||
        "usage.stats.devices.report_interval=3",
 | 
			
		||||
        "state.defaultStateCheckIntervalInSec=3",
 | 
			
		||||
        "state.defaultInactivityTimeoutInSec=10"
 | 
			
		||||
        "state.defaultInactivityTimeoutInSec=10",
 | 
			
		||||
})
 | 
			
		||||
public class DevicesStatisticsTest extends AbstractControllerTest {
 | 
			
		||||
 | 
			
		||||
@ -82,7 +83,9 @@ public class DevicesStatisticsTest extends AbstractControllerTest {
 | 
			
		||||
        await().atMost(15, TimeUnit.SECONDS)
 | 
			
		||||
                .untilAsserted(() -> {
 | 
			
		||||
                    assertThat(getLatestStats(ApiUsageRecordKey.ACTIVE_DEVICES, false)).isZero();
 | 
			
		||||
                    assertThat(getLatestStats(ApiUsageRecordKey.ACTIVE_DEVICES, true)).isZero();
 | 
			
		||||
                    assertThat(getLatestStats(ApiUsageRecordKey.INACTIVE_DEVICES, false)).isEqualTo(activeDevicesCount + inactiveDevicesCount);
 | 
			
		||||
                    assertThat(getLatestStats(ApiUsageRecordKey.INACTIVE_DEVICES, true)).isEqualTo(activeDevicesCount + inactiveDevicesCount);
 | 
			
		||||
                });
 | 
			
		||||
 | 
			
		||||
        for (Device device : activeDevices) {
 | 
			
		||||
@ -92,6 +95,7 @@ public class DevicesStatisticsTest extends AbstractControllerTest {
 | 
			
		||||
        await().atMost(40, TimeUnit.SECONDS)
 | 
			
		||||
                .untilAsserted(() -> {
 | 
			
		||||
                    assertThat(getLatestStats(ApiUsageRecordKey.ACTIVE_DEVICES, false)).isEqualTo(activeDevicesCount);
 | 
			
		||||
                    assertThat(getLatestStats(ApiUsageRecordKey.ACTIVE_DEVICES, true)).isEqualTo(activeDevicesCount);
 | 
			
		||||
                    assertThat(getLatestStats(ApiUsageRecordKey.INACTIVE_DEVICES, false)).isEqualTo(inactiveDevicesCount);
 | 
			
		||||
                });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user