Refactoring for devices activity stats
This commit is contained in:
parent
932cbc6e9f
commit
a58dff1ffc
@ -17,6 +17,7 @@ package org.thingsboard.server.service.apiusage;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import org.thingsboard.server.common.data.ApiFeature;
|
||||
import org.thingsboard.server.common.data.ApiUsageRecordKey;
|
||||
import org.thingsboard.server.common.data.ApiUsageState;
|
||||
@ -29,7 +30,6 @@ import org.thingsboard.server.common.msg.tools.SchedulerUtils;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public abstract class BaseApiUsageState {
|
||||
private final Map<ApiUsageRecordKey, Long> currentCycleValues = new ConcurrentHashMap<>();
|
||||
@ -37,7 +37,6 @@ public abstract class BaseApiUsageState {
|
||||
|
||||
private final Map<ApiUsageRecordKey, Map<String, Long>> lastGaugesByServiceId = new HashMap<>();
|
||||
private final Map<ApiUsageRecordKey, Long> gaugesReportCycles = new HashMap<>();
|
||||
private static long gaugeReportInterval = TimeUnit.MINUTES.toMillis(3);
|
||||
|
||||
@Getter
|
||||
private final ApiUsageState apiUsageState;
|
||||
@ -48,6 +47,9 @@ public abstract class BaseApiUsageState {
|
||||
@Getter
|
||||
private volatile long currentHourTs;
|
||||
|
||||
@Setter
|
||||
private long gaugeReportInterval;
|
||||
|
||||
public BaseApiUsageState(ApiUsageState apiUsageState) {
|
||||
this.apiUsageState = apiUsageState;
|
||||
this.currentCycleTs = SchedulerUtils.getStartOfCurrentMonth();
|
||||
|
||||
@ -48,12 +48,12 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
import org.thingsboard.server.common.data.page.PageDataIterable;
|
||||
import org.thingsboard.server.common.data.tenant.profile.TenantProfileConfiguration;
|
||||
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
|
||||
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
|
||||
import org.thingsboard.server.common.msg.notification.trigger.ApiUsageLimitTrigger;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.common.msg.tools.SchedulerUtils;
|
||||
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
|
||||
import org.thingsboard.server.common.msg.notification.trigger.ApiUsageLimitTrigger;
|
||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
||||
import org.thingsboard.server.dao.tenant.TenantService;
|
||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||
@ -129,6 +129,9 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
|
||||
@Value("${usage.stats.check.cycle:60000}")
|
||||
private long nextCycleCheckInterval;
|
||||
|
||||
@Value("${usage.stats.gauge_report_interval:180000}")
|
||||
private long gaugeReportInterval;
|
||||
|
||||
private final Lock updateLock = new ReentrantLock();
|
||||
|
||||
@PostConstruct
|
||||
@ -476,6 +479,7 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
|
||||
}
|
||||
}
|
||||
}
|
||||
state.setGaugeReportInterval(gaugeReportInterval);
|
||||
log.debug("[{}] Initialized state: {}", ownerId, storedState);
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, ownerId);
|
||||
if (tpi.isMyPartition()) {
|
||||
|
||||
@ -26,6 +26,7 @@ import com.google.common.util.concurrent.MoreExecutors;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
@ -437,7 +438,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
||||
void checkStates() {
|
||||
try {
|
||||
final long ts = System.currentTimeMillis();
|
||||
Map<TenantId, Map<Boolean, AtomicInteger>> devicesActivity = new HashMap<>();
|
||||
Map<TenantId, Pair<AtomicInteger, AtomicInteger>> devicesActivity = new HashMap<>();
|
||||
partitionedEntities.forEach((tpi, deviceIds) -> {
|
||||
log.debug("Calculating state updates. tpi {} for {} devices", tpi.getFullTopicName(), deviceIds.size());
|
||||
for (DeviceId deviceId : deviceIds) {
|
||||
@ -453,14 +454,18 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}] Failed to update inactivity state", deviceId, e);
|
||||
}
|
||||
devicesActivity.computeIfAbsent(stateData.getTenantId(), k -> new HashMap<>())
|
||||
.computeIfAbsent(stateData.getState().isActive(), k -> new AtomicInteger())
|
||||
.incrementAndGet();
|
||||
Pair<AtomicInteger, AtomicInteger> tenantDevicesActivity = devicesActivity.computeIfAbsent(stateData.getTenantId(),
|
||||
tenantId -> Pair.of(new AtomicInteger(), new AtomicInteger()));
|
||||
if (stateData.getState().isActive()) {
|
||||
tenantDevicesActivity.getLeft().incrementAndGet();
|
||||
} else {
|
||||
tenantDevicesActivity.getRight().incrementAndGet();
|
||||
}
|
||||
}
|
||||
});
|
||||
devicesActivity.forEach((tenantId, countByActivityStatus) -> {
|
||||
int active = Optional.ofNullable(countByActivityStatus.get(true)).map(AtomicInteger::get).orElse(0);
|
||||
int inactive = Optional.ofNullable(countByActivityStatus.get(false)).map(AtomicInteger::get).orElse(0);
|
||||
devicesActivity.forEach((tenantId, tenantDevicesActivity) -> {
|
||||
int active = tenantDevicesActivity.getLeft().get();
|
||||
int inactive = tenantDevicesActivity.getRight().get();
|
||||
apiUsageReportClient.report(tenantId, null, ApiUsageRecordKey.ACTIVE_DEVICES, active);
|
||||
apiUsageReportClient.report(tenantId, null, ApiUsageRecordKey.INACTIVE_DEVICES, inactive);
|
||||
if (active > 0) {
|
||||
|
||||
@ -140,6 +140,7 @@ usage:
|
||||
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
|
||||
check:
|
||||
cycle: "${USAGE_STATS_CHECK_CYCLE:60000}"
|
||||
gauge_report_interval: "${USAGE_STATS_GAUGE_REPORT_INTERVAL:180000}"
|
||||
|
||||
# UI parameters
|
||||
ui:
|
||||
|
||||
@ -27,7 +27,6 @@ import org.thingsboard.server.common.data.kv.KvEntry;
|
||||
import org.thingsboard.server.controller.AbstractControllerTest;
|
||||
import org.thingsboard.server.dao.service.DaoSqlTest;
|
||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||
import org.thingsboard.server.service.apiusage.BaseApiUsageState;
|
||||
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@ -43,6 +42,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
|
||||
"usage.stats.report.enabled=true",
|
||||
"transport.http.enabled=true",
|
||||
"usage.stats.report.interval=2",
|
||||
"usage.stats.gauge_report_interval=1",
|
||||
"state.defaultStateCheckIntervalInSec=3",
|
||||
"state.defaultInactivityTimeoutInSec=10"
|
||||
|
||||
@ -64,8 +64,6 @@ public class DevicesStatisticsTest extends AbstractControllerTest {
|
||||
|
||||
@Test
|
||||
public void testDevicesActivityStats() throws Exception {
|
||||
setStaticFieldValue(BaseApiUsageState.class, "gaugeReportInterval", 1);
|
||||
|
||||
int activeDevicesCount = 5;
|
||||
List<Device> activeDevices = new ArrayList<>();
|
||||
for (int i = 1; i <= activeDevicesCount; i++) {
|
||||
|
||||
@ -174,8 +174,6 @@ usage:
|
||||
enabled: "${USAGE_STATS_REPORT_ENABLED:true}"
|
||||
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
|
||||
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
|
||||
check:
|
||||
cycle: "${USAGE_STATS_CHECK_CYCLE:60000}"
|
||||
|
||||
metrics:
|
||||
# Enable/disable actuator metrics.
|
||||
|
||||
@ -277,8 +277,6 @@ usage:
|
||||
enabled: "${USAGE_STATS_REPORT_ENABLED:true}"
|
||||
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
|
||||
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
|
||||
check:
|
||||
cycle: "${USAGE_STATS_CHECK_CYCLE:60000}"
|
||||
|
||||
metrics:
|
||||
# Enable/disable actuator metrics.
|
||||
|
||||
@ -262,8 +262,6 @@ usage:
|
||||
enabled: "${USAGE_STATS_REPORT_ENABLED:true}"
|
||||
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
|
||||
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
|
||||
check:
|
||||
cycle: "${USAGE_STATS_CHECK_CYCLE:60000}"
|
||||
|
||||
metrics:
|
||||
# Enable/disable actuator metrics.
|
||||
|
||||
@ -344,8 +344,6 @@ usage:
|
||||
enabled: "${USAGE_STATS_REPORT_ENABLED:true}"
|
||||
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
|
||||
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
|
||||
check:
|
||||
cycle: "${USAGE_STATS_CHECK_CYCLE:60000}"
|
||||
|
||||
metrics:
|
||||
# Enable/disable actuator metrics.
|
||||
|
||||
@ -292,8 +292,6 @@ usage:
|
||||
enabled: "${USAGE_STATS_REPORT_ENABLED:true}"
|
||||
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
|
||||
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
|
||||
check:
|
||||
cycle: "${USAGE_STATS_CHECK_CYCLE:60000}"
|
||||
|
||||
metrics:
|
||||
# Enable/disable actuator metrics.
|
||||
|
||||
@ -242,8 +242,6 @@ usage:
|
||||
enabled: "${USAGE_STATS_REPORT_ENABLED:true}"
|
||||
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
|
||||
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
|
||||
check:
|
||||
cycle: "${USAGE_STATS_CHECK_CYCLE:60000}"
|
||||
|
||||
metrics:
|
||||
# Enable/disable actuator metrics.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user