Performance improvements for usage stats
This commit is contained in:
parent
ccd9af264c
commit
5f7ff55f01
@ -15,6 +15,7 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.apiusage;
|
||||
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
@ -61,20 +62,25 @@ public abstract class BaseApiUsageState {
|
||||
long currentValue = get(key);
|
||||
long currentHourlyValue = getHourly(key);
|
||||
|
||||
long newValue;
|
||||
long newHourlyValue;
|
||||
StatsCalculationResult result;
|
||||
if (key.isCounter()) {
|
||||
newValue = currentValue + value;
|
||||
newHourlyValue = currentHourlyValue + value;
|
||||
result = StatsCalculationResult.builder()
|
||||
.newValue(currentValue + value).valueChanged(true)
|
||||
.newHourlyValue(currentHourlyValue + value).hourlyValueChanged(true)
|
||||
.build();
|
||||
} else {
|
||||
Long newGaugeValue = calculateGauge(key, value, serviceId);
|
||||
newValue = newGaugeValue != null ? newGaugeValue : currentValue;
|
||||
newHourlyValue = newGaugeValue != null ? Math.max(newGaugeValue, currentHourlyValue) : currentHourlyValue;
|
||||
long newValue = newGaugeValue != null ? newGaugeValue : currentValue;
|
||||
long newHourlyValue = newGaugeValue != null ? Math.max(newGaugeValue, currentHourlyValue) : currentHourlyValue;
|
||||
result = StatsCalculationResult.builder()
|
||||
.newValue(newValue).valueChanged(newValue != currentValue || !currentCycleValues.containsKey(key))
|
||||
.newHourlyValue(newHourlyValue).hourlyValueChanged(newHourlyValue != currentHourlyValue || !currentHourValues.containsKey(key))
|
||||
.build();
|
||||
}
|
||||
set(key, newValue);
|
||||
setHourly(key, newHourlyValue);
|
||||
|
||||
return StatsCalculationResult.of(newValue, newHourlyValue);
|
||||
set(key, result.getNewValue());
|
||||
setHourly(key, result.getNewHourlyValue());
|
||||
return result;
|
||||
}
|
||||
|
||||
private Long calculateGauge(ApiUsageRecordKey key, long value, String serviceId) {
|
||||
@ -188,10 +194,13 @@ public abstract class BaseApiUsageState {
|
||||
return getApiUsageState().getEntityId();
|
||||
}
|
||||
|
||||
@Data(staticConstructor = "of")
|
||||
@Data
|
||||
@Builder
|
||||
public static class StatsCalculationResult {
|
||||
private final long newValue;
|
||||
private final boolean valueChanged;
|
||||
private final long newHourlyValue;
|
||||
private final boolean hourlyValueChanged;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -129,7 +129,7 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
|
||||
@Value("${usage.stats.check.cycle:60000}")
|
||||
private long nextCycleCheckInterval;
|
||||
|
||||
@Value("${usage.stats.gauge_report_interval:180000}")
|
||||
@Value("${usage.stats.gauge_report_interval:420000}")
|
||||
private long gaugeReportInterval;
|
||||
|
||||
private final Lock updateLock = new ReentrantLock();
|
||||
@ -192,11 +192,14 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
|
||||
ApiUsageRecordKey recordKey = ApiUsageRecordKey.valueOf(statsItem.getKey());
|
||||
|
||||
StatsCalculationResult calculationResult = usageState.calculate(recordKey, statsItem.getValue(), serviceId);
|
||||
long newValue = calculationResult.getNewValue();
|
||||
long newHourlyValue = calculationResult.getNewHourlyValue();
|
||||
|
||||
updatedEntries.add(new BasicTsKvEntry(ts, new LongDataEntry(recordKey.getApiCountKey(), newValue)));
|
||||
updatedEntries.add(new BasicTsKvEntry(newHourTs, new LongDataEntry(recordKey.getApiCountKey() + HOURLY, newHourlyValue)));
|
||||
if (calculationResult.isValueChanged()) {
|
||||
long newValue = calculationResult.getNewValue();
|
||||
updatedEntries.add(new BasicTsKvEntry(ts, new LongDataEntry(recordKey.getApiCountKey(), newValue)));
|
||||
}
|
||||
if (calculationResult.isHourlyValueChanged()) {
|
||||
long newHourlyValue = calculationResult.getNewHourlyValue();
|
||||
updatedEntries.add(new BasicTsKvEntry(newHourTs, new LongDataEntry(recordKey.getApiCountKey() + HOURLY, newHourlyValue)));
|
||||
}
|
||||
if (recordKey.getApiFeature() != null) {
|
||||
apiFeatures.add(recordKey.getApiFeature());
|
||||
}
|
||||
|
||||
@ -30,6 +30,7 @@ import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.common.util.ThingsBoardExecutors;
|
||||
@ -439,7 +440,6 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
||||
void checkStates() {
|
||||
try {
|
||||
final long ts = System.currentTimeMillis();
|
||||
Map<TenantId, Pair<AtomicInteger, AtomicInteger>> devicesActivity = new HashMap<>();
|
||||
partitionedEntities.forEach((tpi, deviceIds) -> {
|
||||
log.debug("Calculating state updates. tpi {} for {} devices", tpi.getFullTopicName(), deviceIds.size());
|
||||
Set<DeviceId> idsFromRemovedTenant = new HashSet<>();
|
||||
@ -456,35 +456,43 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
||||
} catch (Exception e) {
|
||||
if (e instanceof TenantNotFoundException) {
|
||||
idsFromRemovedTenant.add(deviceId);
|
||||
continue;
|
||||
} else {
|
||||
log.warn("[{}] Failed to update inactivity state [{}]", deviceId, e.getMessage());
|
||||
}
|
||||
}
|
||||
Pair<AtomicInteger, AtomicInteger> tenantDevicesActivity = devicesActivity.computeIfAbsent(stateData.getTenantId(),
|
||||
tenantId -> Pair.of(new AtomicInteger(), new AtomicInteger()));
|
||||
if (stateData.getState().isActive()) {
|
||||
tenantDevicesActivity.getLeft().incrementAndGet();
|
||||
} else {
|
||||
tenantDevicesActivity.getRight().incrementAndGet();
|
||||
}
|
||||
}
|
||||
deviceIds.removeAll(idsFromRemovedTenant);
|
||||
});
|
||||
devicesActivity.forEach((tenantId, tenantDevicesActivity) -> {
|
||||
int active = tenantDevicesActivity.getLeft().get();
|
||||
int inactive = tenantDevicesActivity.getRight().get();
|
||||
apiUsageReportClient.report(tenantId, null, ApiUsageRecordKey.ACTIVE_DEVICES, active);
|
||||
apiUsageReportClient.report(tenantId, null, ApiUsageRecordKey.INACTIVE_DEVICES, inactive);
|
||||
if (active > 0) {
|
||||
log.info("[{}] Active devices: {}, inactive devices: {}", tenantId, active, inactive);
|
||||
}
|
||||
});
|
||||
} catch (Throwable t) {
|
||||
log.warn("Failed to check devices states", t);
|
||||
}
|
||||
}
|
||||
|
||||
@Scheduled(initialDelayString = "${usage.stats.devices.report_interval:180}",
|
||||
fixedDelayString = "${usage.stats.devices.report_interval:180}", timeUnit = TimeUnit.SECONDS)
|
||||
public void reportActivityStats() {
|
||||
Map<TenantId, Pair<AtomicInteger, AtomicInteger>> stats = new HashMap<>();
|
||||
for (DeviceStateData stateData : deviceStates.values()) {
|
||||
Pair<AtomicInteger, AtomicInteger> tenantDevicesActivity = stats.computeIfAbsent(stateData.getTenantId(),
|
||||
tenantId -> Pair.of(new AtomicInteger(), new AtomicInteger()));
|
||||
if (stateData.getState().isActive()) {
|
||||
tenantDevicesActivity.getLeft().incrementAndGet();
|
||||
} else {
|
||||
tenantDevicesActivity.getRight().incrementAndGet();
|
||||
}
|
||||
}
|
||||
|
||||
stats.forEach((tenantId, tenantDevicesActivity) -> {
|
||||
int active = tenantDevicesActivity.getLeft().get();
|
||||
int inactive = tenantDevicesActivity.getRight().get();
|
||||
apiUsageReportClient.report(tenantId, null, ApiUsageRecordKey.ACTIVE_DEVICES, active);
|
||||
apiUsageReportClient.report(tenantId, null, ApiUsageRecordKey.INACTIVE_DEVICES, inactive);
|
||||
if (active > 0) {
|
||||
log.info("[{}] Active devices: {}, inactive devices: {}", tenantId, active, inactive);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void updateInactivityStateIfExpired(long ts, DeviceId deviceId, DeviceStateData stateData) {
|
||||
log.trace("Processing state {} for device {}", stateData, deviceId);
|
||||
if (stateData != null) {
|
||||
|
||||
@ -144,7 +144,11 @@ usage:
|
||||
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
|
||||
check:
|
||||
cycle: "${USAGE_STATS_CHECK_CYCLE:60000}"
|
||||
gauge_report_interval: "${USAGE_STATS_GAUGE_REPORT_INTERVAL:180000}"
|
||||
# In milliseconds. Default value is 7 minutes
|
||||
gauge_report_interval: "${USAGE_STATS_GAUGE_REPORT_INTERVAL:420000}"
|
||||
devices:
|
||||
# In seconds, default value is 3 minutes. When changing, in cluster mode, make sure usage.stats.gauge_report_interval is set to x2-x3 of this value
|
||||
report_interval: "${DEVICES_STATS_REPORT_INTERVAL:180}"
|
||||
|
||||
# UI parameters
|
||||
ui:
|
||||
|
||||
@ -42,8 +42,9 @@ import static org.awaitility.Awaitility.await;
|
||||
"usage.stats.report.enabled=true",
|
||||
"usage.stats.report.interval=2",
|
||||
"usage.stats.gauge_report_interval=1",
|
||||
"usage.stats.devices.report_interval=3",
|
||||
"state.defaultStateCheckIntervalInSec=3",
|
||||
"state.defaultInactivityTimeoutInSec=10"
|
||||
"state.defaultInactivityTimeoutInSec=10",
|
||||
})
|
||||
public class DevicesStatisticsTest extends AbstractControllerTest {
|
||||
|
||||
@ -82,7 +83,9 @@ public class DevicesStatisticsTest extends AbstractControllerTest {
|
||||
await().atMost(15, TimeUnit.SECONDS)
|
||||
.untilAsserted(() -> {
|
||||
assertThat(getLatestStats(ApiUsageRecordKey.ACTIVE_DEVICES, false)).isZero();
|
||||
assertThat(getLatestStats(ApiUsageRecordKey.ACTIVE_DEVICES, true)).isZero();
|
||||
assertThat(getLatestStats(ApiUsageRecordKey.INACTIVE_DEVICES, false)).isEqualTo(activeDevicesCount + inactiveDevicesCount);
|
||||
assertThat(getLatestStats(ApiUsageRecordKey.INACTIVE_DEVICES, true)).isEqualTo(activeDevicesCount + inactiveDevicesCount);
|
||||
});
|
||||
|
||||
for (Device device : activeDevices) {
|
||||
@ -92,6 +95,7 @@ public class DevicesStatisticsTest extends AbstractControllerTest {
|
||||
await().atMost(40, TimeUnit.SECONDS)
|
||||
.untilAsserted(() -> {
|
||||
assertThat(getLatestStats(ApiUsageRecordKey.ACTIVE_DEVICES, false)).isEqualTo(activeDevicesCount);
|
||||
assertThat(getLatestStats(ApiUsageRecordKey.ACTIVE_DEVICES, true)).isEqualTo(activeDevicesCount);
|
||||
assertThat(getLatestStats(ApiUsageRecordKey.INACTIVE_DEVICES, false)).isEqualTo(inactiveDevicesCount);
|
||||
});
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user