Merge pull request #8335 from thingsboard/feature/devices-activity
Devices activity statistics
This commit is contained in:
commit
c683b57cba
@ -15,8 +15,9 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.apiusage;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.Getter;
|
||||
import org.springframework.data.util.Pair;
|
||||
import lombok.Setter;
|
||||
import org.thingsboard.server.common.data.ApiFeature;
|
||||
import org.thingsboard.server.common.data.ApiUsageRecordKey;
|
||||
import org.thingsboard.server.common.data.ApiUsageState;
|
||||
@ -26,17 +27,17 @@ import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.msg.tools.SchedulerUtils;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public abstract class BaseApiUsageState {
|
||||
private final Map<ApiUsageRecordKey, Long> currentCycleValues = new ConcurrentHashMap<>();
|
||||
private final Map<ApiUsageRecordKey, Long> currentHourValues = new ConcurrentHashMap<>();
|
||||
|
||||
private final Map<ApiUsageRecordKey, Map<String, Long>> lastGaugesByServiceId = new HashMap<>();
|
||||
private final Map<ApiUsageRecordKey, Long> gaugesReportCycles = new HashMap<>();
|
||||
|
||||
@Getter
|
||||
private final ApiUsageState apiUsageState;
|
||||
@Getter
|
||||
@ -46,6 +47,9 @@ public abstract class BaseApiUsageState {
|
||||
@Getter
|
||||
private volatile long currentHourTs;
|
||||
|
||||
@Setter
|
||||
private long gaugeReportInterval;
|
||||
|
||||
public BaseApiUsageState(ApiUsageState apiUsageState) {
|
||||
this.apiUsageState = apiUsageState;
|
||||
this.currentCycleTs = SchedulerUtils.getStartOfCurrentMonth();
|
||||
@ -53,43 +57,76 @@ public abstract class BaseApiUsageState {
|
||||
this.currentHourTs = SchedulerUtils.getStartOfCurrentHour();
|
||||
}
|
||||
|
||||
public void put(ApiUsageRecordKey key, Long value) {
|
||||
public StatsCalculationResult calculate(ApiUsageRecordKey key, long value, String serviceId) {
|
||||
long currentValue = get(key);
|
||||
long currentHourlyValue = getHourly(key);
|
||||
|
||||
long newValue;
|
||||
long newHourlyValue;
|
||||
if (key.isCounter()) {
|
||||
newValue = currentValue + value;
|
||||
newHourlyValue = currentHourlyValue + value;
|
||||
} else {
|
||||
Long newGaugeValue = calculateGauge(key, value, serviceId);
|
||||
newValue = newGaugeValue != null ? newGaugeValue : currentValue;
|
||||
newHourlyValue = newGaugeValue != null ? Math.max(newGaugeValue, currentHourlyValue) : currentHourlyValue;
|
||||
}
|
||||
set(key, newValue);
|
||||
setHourly(key, newHourlyValue);
|
||||
|
||||
return StatsCalculationResult.of(newValue, newHourlyValue);
|
||||
}
|
||||
|
||||
private Long calculateGauge(ApiUsageRecordKey key, long value, String serviceId) {
|
||||
Map<String, Long> lastByServiceId = lastGaugesByServiceId.computeIfAbsent(key, k -> {
|
||||
gaugesReportCycles.put(key, System.currentTimeMillis());
|
||||
return new HashMap<>();
|
||||
});
|
||||
lastByServiceId.put(serviceId, value);
|
||||
|
||||
Long gaugeReportCycle = gaugesReportCycles.get(key);
|
||||
if (gaugeReportCycle <= System.currentTimeMillis() - gaugeReportInterval) {
|
||||
long newValue = lastByServiceId.values().stream().mapToLong(Long::longValue).sum();
|
||||
lastGaugesByServiceId.remove(key);
|
||||
gaugesReportCycles.remove(key);
|
||||
return newValue;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public void set(ApiUsageRecordKey key, Long value) {
|
||||
currentCycleValues.put(key, value);
|
||||
}
|
||||
|
||||
public void putHourly(ApiUsageRecordKey key, Long value) {
|
||||
currentHourValues.put(key, value);
|
||||
}
|
||||
|
||||
public long add(ApiUsageRecordKey key, long value) {
|
||||
long result = currentCycleValues.getOrDefault(key, 0L) + value;
|
||||
currentCycleValues.put(key, result);
|
||||
return result;
|
||||
}
|
||||
|
||||
public long get(ApiUsageRecordKey key) {
|
||||
return currentCycleValues.getOrDefault(key, 0L);
|
||||
}
|
||||
|
||||
public long addToHourly(ApiUsageRecordKey key, long value) {
|
||||
long result = currentHourValues.getOrDefault(key, 0L) + value;
|
||||
currentHourValues.put(key, result);
|
||||
return result;
|
||||
public void setHourly(ApiUsageRecordKey key, Long value) {
|
||||
currentHourValues.put(key, value);
|
||||
}
|
||||
|
||||
public long getHourly(ApiUsageRecordKey key) {
|
||||
return currentHourValues.getOrDefault(key, 0L);
|
||||
}
|
||||
|
||||
public void setHour(long currentHourTs) {
|
||||
this.currentHourTs = currentHourTs;
|
||||
for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {
|
||||
currentHourValues.put(key, 0L);
|
||||
}
|
||||
currentHourValues.clear();
|
||||
lastGaugesByServiceId.clear();
|
||||
gaugesReportCycles.clear();
|
||||
}
|
||||
|
||||
public void setCycles(long currentCycleTs, long nextCycleTs) {
|
||||
this.currentCycleTs = currentCycleTs;
|
||||
this.nextCycleTs = nextCycleTs;
|
||||
for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {
|
||||
currentCycleValues.put(key, 0L);
|
||||
}
|
||||
currentCycleValues.clear();
|
||||
}
|
||||
|
||||
public void onRepartitionEvent() {
|
||||
lastGaugesByServiceId.clear();
|
||||
gaugesReportCycles.clear();
|
||||
}
|
||||
|
||||
public ApiUsageStateValue getFeatureValue(ApiFeature feature) {
|
||||
@ -150,4 +187,11 @@ public abstract class BaseApiUsageState {
|
||||
public EntityId getEntityId() {
|
||||
return getApiUsageState().getEntityId();
|
||||
}
|
||||
|
||||
@Data(staticConstructor = "of")
|
||||
public static class StatsCalculationResult {
|
||||
private final long newValue;
|
||||
private final long newHourlyValue;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -48,12 +48,12 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
import org.thingsboard.server.common.data.page.PageDataIterable;
|
||||
import org.thingsboard.server.common.data.tenant.profile.TenantProfileConfiguration;
|
||||
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
|
||||
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
|
||||
import org.thingsboard.server.common.msg.notification.trigger.ApiUsageLimitTrigger;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.common.msg.tools.SchedulerUtils;
|
||||
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
|
||||
import org.thingsboard.server.common.msg.notification.trigger.ApiUsageLimitTrigger;
|
||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
||||
import org.thingsboard.server.dao.tenant.TenantService;
|
||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||
@ -62,6 +62,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceM
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.service.apiusage.BaseApiUsageState.StatsCalculationResult;
|
||||
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
|
||||
import org.thingsboard.server.service.mail.MailExecutorService;
|
||||
import org.thingsboard.server.service.partition.AbstractPartitionBasedService;
|
||||
@ -128,6 +129,9 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
|
||||
@Value("${usage.stats.check.cycle:60000}")
|
||||
private long nextCycleCheckInterval;
|
||||
|
||||
@Value("${usage.stats.gauge_report_interval:180000}")
|
||||
private long gaugeReportInterval;
|
||||
|
||||
private final Lock updateLock = new ReentrantLock();
|
||||
|
||||
@PostConstruct
|
||||
@ -155,19 +159,19 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
|
||||
ToUsageStatsServiceMsg statsMsg = msg.getValue();
|
||||
|
||||
TenantId tenantId = TenantId.fromUUID(new UUID(statsMsg.getTenantIdMSB(), statsMsg.getTenantIdLSB()));
|
||||
EntityId entityId;
|
||||
EntityId ownerId;
|
||||
if (statsMsg.getCustomerIdMSB() != 0 && statsMsg.getCustomerIdLSB() != 0) {
|
||||
entityId = new CustomerId(new UUID(statsMsg.getCustomerIdMSB(), statsMsg.getCustomerIdLSB()));
|
||||
ownerId = new CustomerId(new UUID(statsMsg.getCustomerIdMSB(), statsMsg.getCustomerIdLSB()));
|
||||
} else {
|
||||
entityId = tenantId;
|
||||
ownerId = tenantId;
|
||||
}
|
||||
|
||||
processEntityUsageStats(tenantId, entityId, statsMsg.getValuesList());
|
||||
processEntityUsageStats(tenantId, ownerId, statsMsg.getValuesList(), statsMsg.getServiceId());
|
||||
callback.onSuccess();
|
||||
}
|
||||
|
||||
private void processEntityUsageStats(TenantId tenantId, EntityId entityId, List<UsageStatsKVProto> values) {
|
||||
if (deletedEntities.contains(entityId)) return;
|
||||
private void processEntityUsageStats(TenantId tenantId, EntityId ownerId, List<UsageStatsKVProto> values, String serviceId) {
|
||||
if (deletedEntities.contains(ownerId)) return;
|
||||
|
||||
BaseApiUsageState usageState;
|
||||
List<TsKvEntry> updatedEntries;
|
||||
@ -175,7 +179,7 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
|
||||
|
||||
updateLock.lock();
|
||||
try {
|
||||
usageState = getOrFetchState(tenantId, entityId);
|
||||
usageState = getOrFetchState(tenantId, ownerId);
|
||||
long ts = usageState.getCurrentCycleTs();
|
||||
long hourTs = usageState.getCurrentHourTs();
|
||||
long newHourTs = SchedulerUtils.getStartOfCurrentHour();
|
||||
@ -184,13 +188,18 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
|
||||
}
|
||||
updatedEntries = new ArrayList<>(ApiUsageRecordKey.values().length);
|
||||
Set<ApiFeature> apiFeatures = new HashSet<>();
|
||||
for (UsageStatsKVProto kvProto : values) {
|
||||
ApiUsageRecordKey recordKey = ApiUsageRecordKey.valueOf(kvProto.getKey());
|
||||
long newValue = usageState.add(recordKey, kvProto.getValue());
|
||||
for (UsageStatsKVProto statsItem : values) {
|
||||
ApiUsageRecordKey recordKey = ApiUsageRecordKey.valueOf(statsItem.getKey());
|
||||
|
||||
StatsCalculationResult calculationResult = usageState.calculate(recordKey, statsItem.getValue(), serviceId);
|
||||
long newValue = calculationResult.getNewValue();
|
||||
long newHourlyValue = calculationResult.getNewHourlyValue();
|
||||
|
||||
updatedEntries.add(new BasicTsKvEntry(ts, new LongDataEntry(recordKey.getApiCountKey(), newValue)));
|
||||
long newHourlyValue = usageState.addToHourly(recordKey, kvProto.getValue());
|
||||
updatedEntries.add(new BasicTsKvEntry(newHourTs, new LongDataEntry(recordKey.getApiCountKey() + HOURLY, newHourlyValue)));
|
||||
apiFeatures.add(recordKey.getApiFeature());
|
||||
if (recordKey.getApiFeature() != null) {
|
||||
apiFeatures.add(recordKey.getApiFeature());
|
||||
}
|
||||
}
|
||||
if (usageState.getEntityType() == EntityType.TENANT && !usageState.getEntityId().equals(TenantId.SYS_TENANT_ID)) {
|
||||
result = ((TenantApiUsageState) usageState).checkStateUpdatedDueToThreshold(apiFeatures);
|
||||
@ -418,26 +427,26 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
|
||||
tsWsService.saveAndNotifyInternal(state.getTenantId(), state.getApiUsageState().getId(), counts, VOID_CALLBACK);
|
||||
}
|
||||
|
||||
BaseApiUsageState getOrFetchState(TenantId tenantId, EntityId entityId) {
|
||||
if (entityId == null || entityId.isNullUid()) {
|
||||
entityId = tenantId;
|
||||
BaseApiUsageState getOrFetchState(TenantId tenantId, EntityId ownerId) {
|
||||
if (ownerId == null || ownerId.isNullUid()) {
|
||||
ownerId = tenantId;
|
||||
}
|
||||
BaseApiUsageState state = myUsageStates.get(entityId);
|
||||
BaseApiUsageState state = myUsageStates.get(ownerId);
|
||||
if (state != null) {
|
||||
return state;
|
||||
}
|
||||
|
||||
ApiUsageState storedState = apiUsageStateService.findApiUsageStateByEntityId(entityId);
|
||||
ApiUsageState storedState = apiUsageStateService.findApiUsageStateByEntityId(ownerId);
|
||||
if (storedState == null) {
|
||||
try {
|
||||
storedState = apiUsageStateService.createDefaultApiUsageState(tenantId, entityId);
|
||||
storedState = apiUsageStateService.createDefaultApiUsageState(tenantId, ownerId);
|
||||
} catch (Exception e) {
|
||||
storedState = apiUsageStateService.findApiUsageStateByEntityId(entityId);
|
||||
storedState = apiUsageStateService.findApiUsageStateByEntityId(ownerId);
|
||||
}
|
||||
}
|
||||
if (entityId.getEntityType() == EntityType.TENANT) {
|
||||
if (!entityId.equals(TenantId.SYS_TENANT_ID)) {
|
||||
state = new TenantApiUsageState(tenantProfileCache.get((TenantId) entityId), storedState);
|
||||
if (ownerId.getEntityType() == EntityType.TENANT) {
|
||||
if (!ownerId.equals(TenantId.SYS_TENANT_ID)) {
|
||||
state = new TenantApiUsageState(tenantProfileCache.get((TenantId) ownerId), storedState);
|
||||
} else {
|
||||
state = new TenantApiUsageState(storedState);
|
||||
}
|
||||
@ -456,26 +465,27 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
|
||||
cycleEntryFound = true;
|
||||
|
||||
boolean oldCount = tsKvEntry.getTs() == state.getCurrentCycleTs();
|
||||
state.put(key, oldCount ? tsKvEntry.getLongValue().get() : 0L);
|
||||
state.set(key, oldCount ? tsKvEntry.getLongValue().get() : 0L);
|
||||
|
||||
if (!oldCount) {
|
||||
newCounts.add(key);
|
||||
}
|
||||
} else if (tsKvEntry.getKey().equals(key.getApiCountKey() + HOURLY)) {
|
||||
hourlyEntryFound = true;
|
||||
state.putHourly(key, tsKvEntry.getTs() == state.getCurrentHourTs() ? tsKvEntry.getLongValue().get() : 0L);
|
||||
state.setHourly(key, tsKvEntry.getTs() == state.getCurrentHourTs() ? tsKvEntry.getLongValue().get() : 0L);
|
||||
}
|
||||
if (cycleEntryFound && hourlyEntryFound) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
log.debug("[{}] Initialized state: {}", entityId, storedState);
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
|
||||
state.setGaugeReportInterval(gaugeReportInterval);
|
||||
log.debug("[{}] Initialized state: {}", ownerId, storedState);
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, ownerId);
|
||||
if (tpi.isMyPartition()) {
|
||||
addEntityState(tpi, state);
|
||||
} else {
|
||||
otherUsageStates.put(entityId, state.getApiUsageState());
|
||||
otherUsageStates.put(ownerId, state.getApiUsageState());
|
||||
}
|
||||
saveNewCounts(state, newCounts);
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
@ -489,6 +499,12 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
|
||||
protected void onRepartitionEvent() {
|
||||
otherUsageStates.entrySet().removeIf(entry ->
|
||||
partitionService.resolve(ServiceType.TB_CORE, entry.getValue().getTenantId(), entry.getKey()).isMyPartition());
|
||||
updateLock.lock();
|
||||
try {
|
||||
myUsageStates.values().forEach(BaseApiUsageState::onRepartitionEvent);
|
||||
} finally {
|
||||
updateLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -26,6 +26,7 @@ import com.google.common.util.concurrent.MoreExecutors;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
@ -33,6 +34,7 @@ import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.common.util.ThingsBoardExecutors;
|
||||
import org.thingsboard.server.cluster.TbClusterService;
|
||||
import org.thingsboard.server.common.data.ApiUsageRecordKey;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.DeviceIdInfo;
|
||||
@ -61,6 +63,7 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.common.stats.TbApiUsageReportClient;
|
||||
import org.thingsboard.server.dao.attributes.AttributesService;
|
||||
import org.thingsboard.server.dao.device.DeviceService;
|
||||
import org.thingsboard.server.dao.sql.query.EntityQueryRepository;
|
||||
@ -85,6 +88,7 @@ import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
@ -151,6 +155,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
||||
private final TbServiceInfoProvider serviceInfoProvider;
|
||||
private final EntityQueryRepository entityQueryRepository;
|
||||
private final DbTypeInfoComponent dbTypeInfoComponent;
|
||||
private final TbApiUsageReportClient apiUsageReportClient;
|
||||
@Autowired @Lazy
|
||||
private TelemetrySubscriptionService tsSubService;
|
||||
|
||||
@ -186,7 +191,8 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
||||
TbClusterService clusterService, PartitionService partitionService,
|
||||
TbServiceInfoProvider serviceInfoProvider,
|
||||
EntityQueryRepository entityQueryRepository,
|
||||
DbTypeInfoComponent dbTypeInfoComponent) {
|
||||
DbTypeInfoComponent dbTypeInfoComponent,
|
||||
TbApiUsageReportClient apiUsageReportClient) {
|
||||
this.tenantService = tenantService;
|
||||
this.deviceService = deviceService;
|
||||
this.attributesService = attributesService;
|
||||
@ -196,6 +202,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
||||
this.serviceInfoProvider = serviceInfoProvider;
|
||||
this.entityQueryRepository = entityQueryRepository;
|
||||
this.dbTypeInfoComponent = dbTypeInfoComponent;
|
||||
this.apiUsageReportClient = apiUsageReportClient;
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
@ -203,7 +210,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
||||
super.init();
|
||||
deviceStateExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(
|
||||
Math.max(4, Runtime.getRuntime().availableProcessors()), "device-state"));
|
||||
scheduledExecutor.scheduleAtFixedRate(this::updateInactivityStateIfExpired, new Random().nextInt(defaultStateCheckIntervalInSec), defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
|
||||
scheduledExecutor.scheduleAtFixedRate(this::checkStates, new Random().nextInt(defaultStateCheckIntervalInSec), defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
@ -428,29 +435,48 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
||||
}
|
||||
}
|
||||
|
||||
void updateInactivityStateIfExpired() {
|
||||
void checkStates() {
|
||||
try {
|
||||
final long ts = System.currentTimeMillis();
|
||||
Map<TenantId, Pair<AtomicInteger, AtomicInteger>> devicesActivity = new HashMap<>();
|
||||
partitionedEntities.forEach((tpi, deviceIds) -> {
|
||||
log.debug("Calculating state updates. tpi {} for {} devices", tpi.getFullTopicName(), deviceIds.size());
|
||||
for (DeviceId deviceId : deviceIds) {
|
||||
DeviceStateData stateData;
|
||||
try {
|
||||
updateInactivityStateIfExpired(ts, deviceId);
|
||||
stateData = getOrFetchDeviceStateData(deviceId);
|
||||
} catch (Exception e) {
|
||||
log.error("[{}] Failed to get or fetch device state data", deviceId, e);
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
updateInactivityStateIfExpired(ts, deviceId, stateData);
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}] Failed to update inactivity state", deviceId, e);
|
||||
}
|
||||
Pair<AtomicInteger, AtomicInteger> tenantDevicesActivity = devicesActivity.computeIfAbsent(stateData.getTenantId(),
|
||||
tenantId -> Pair.of(new AtomicInteger(), new AtomicInteger()));
|
||||
if (stateData.getState().isActive()) {
|
||||
tenantDevicesActivity.getLeft().incrementAndGet();
|
||||
} else {
|
||||
tenantDevicesActivity.getRight().incrementAndGet();
|
||||
}
|
||||
}
|
||||
});
|
||||
devicesActivity.forEach((tenantId, tenantDevicesActivity) -> {
|
||||
int active = tenantDevicesActivity.getLeft().get();
|
||||
int inactive = tenantDevicesActivity.getRight().get();
|
||||
apiUsageReportClient.report(tenantId, null, ApiUsageRecordKey.ACTIVE_DEVICES, active);
|
||||
apiUsageReportClient.report(tenantId, null, ApiUsageRecordKey.INACTIVE_DEVICES, inactive);
|
||||
if (active > 0) {
|
||||
log.info("[{}] Active devices: {}, inactive devices: {}", tenantId, active, inactive);
|
||||
}
|
||||
});
|
||||
} catch (Throwable t) {
|
||||
log.warn("Failed to update inactivity states", t);
|
||||
log.warn("Failed to check devices states", t);
|
||||
}
|
||||
}
|
||||
|
||||
void updateInactivityStateIfExpired(long ts, DeviceId deviceId) {
|
||||
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
|
||||
updateInactivityStateIfExpired(ts, deviceId, stateData);
|
||||
}
|
||||
|
||||
void updateInactivityStateIfExpired(long ts, DeviceId deviceId, DeviceStateData stateData) {
|
||||
log.trace("Processing state {} for device {}", stateData, deviceId);
|
||||
if (stateData != null) {
|
||||
|
||||
@ -416,6 +416,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
||||
}
|
||||
|
||||
private <S> void addVoidCallback(ListenableFuture<S> saveFuture, final FutureCallback<Void> callback) {
|
||||
if (callback == null) return;
|
||||
Futures.addCallback(saveFuture, new FutureCallback<S>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable S result) {
|
||||
|
||||
@ -140,6 +140,7 @@ usage:
|
||||
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
|
||||
check:
|
||||
cycle: "${USAGE_STATS_CHECK_CYCLE:60000}"
|
||||
gauge_report_interval: "${USAGE_STATS_GAUGE_REPORT_INTERVAL:180000}"
|
||||
|
||||
# UI parameters
|
||||
ui:
|
||||
|
||||
@ -30,10 +30,8 @@ import org.awaitility.Awaitility;
|
||||
import org.hamcrest.Matcher;
|
||||
import org.hibernate.exception.ConstraintViolationException;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.rules.TestRule;
|
||||
import org.junit.rules.TestWatcher;
|
||||
@ -78,6 +76,9 @@ import org.thingsboard.server.common.data.StringUtils;
|
||||
import org.thingsboard.server.common.data.Tenant;
|
||||
import org.thingsboard.server.common.data.User;
|
||||
import org.thingsboard.server.common.data.asset.AssetProfile;
|
||||
import org.thingsboard.server.common.data.device.data.DefaultDeviceConfiguration;
|
||||
import org.thingsboard.server.common.data.device.data.DefaultDeviceTransportConfiguration;
|
||||
import org.thingsboard.server.common.data.device.data.DeviceData;
|
||||
import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileConfiguration;
|
||||
import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileTransportConfiguration;
|
||||
import org.thingsboard.server.common.data.device.profile.DeviceProfileData;
|
||||
@ -533,6 +534,17 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
|
||||
return assetProfile;
|
||||
}
|
||||
|
||||
protected Device createDevice(String name, String accessToken) throws Exception {
|
||||
Device device = new Device();
|
||||
device.setName(name);
|
||||
device.setType("default");
|
||||
DeviceData deviceData = new DeviceData();
|
||||
deviceData.setTransportConfiguration(new DefaultDeviceTransportConfiguration());
|
||||
deviceData.setConfiguration(new DefaultDeviceConfiguration());
|
||||
device.setDeviceData(deviceData);
|
||||
return doPost("/api/device?accessToken=" + accessToken, device, Device.class);
|
||||
}
|
||||
|
||||
protected MqttDeviceProfileTransportConfiguration createMqttDeviceProfileTransportConfiguration(TransportPayloadTypeConfiguration transportPayloadTypeConfiguration, boolean sendAckOnValidationException) {
|
||||
MqttDeviceProfileTransportConfiguration mqttDeviceProfileTransportConfiguration = new MqttDeviceProfileTransportConfiguration();
|
||||
mqttDeviceProfileTransportConfiguration.setDeviceTelemetryTopic(MqttTopics.DEVICE_TELEMETRY_TOPIC);
|
||||
@ -897,6 +909,12 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
|
||||
return (T) field.get(target);
|
||||
}
|
||||
|
||||
protected void setStaticFieldValue(Class<?> targetCls, String fieldName, Object value) throws Exception {
|
||||
Field field = targetCls.getDeclaredField(fieldName);
|
||||
field.setAccessible(true);
|
||||
field.set(null, value);
|
||||
}
|
||||
|
||||
protected int getDeviceActorSubscriptionCount(DeviceId deviceId, FeatureType featureType) {
|
||||
DeviceActorMessageProcessor processor = getDeviceActorProcessor(deviceId);
|
||||
Map<UUID, SessionInfo> subscriptions = (Map<UUID, SessionInfo>) ReflectionTestUtils.getField(processor, getMapName(featureType));
|
||||
|
||||
@ -0,0 +1,40 @@
|
||||
/**
|
||||
* Copyright © 2016-2023 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.notification;
|
||||
|
||||
import org.springframework.context.annotation.Primary;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.dao.notification.DefaultNotificationSettingsService;
|
||||
import org.thingsboard.server.dao.notification.NotificationRuleService;
|
||||
import org.thingsboard.server.dao.notification.NotificationTargetService;
|
||||
import org.thingsboard.server.dao.notification.NotificationTemplateService;
|
||||
import org.thingsboard.server.dao.settings.AdminSettingsService;
|
||||
|
||||
@Service
|
||||
@Primary
|
||||
public class MockNotificationSettingsService extends DefaultNotificationSettingsService {
|
||||
|
||||
public MockNotificationSettingsService(AdminSettingsService adminSettingsService, NotificationTargetService notificationTargetService, NotificationTemplateService notificationTemplateService, NotificationRuleService notificationRuleService) {
|
||||
super(adminSettingsService, notificationTargetService, notificationTemplateService, notificationRuleService);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createDefaultNotificationConfigs(TenantId tenantId) {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
}
|
||||
@ -73,7 +73,7 @@ public class DefaultDeviceStateServiceTest {
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
service = spy(new DefaultDeviceStateService(tenantService, deviceService, attributesService, tsService, clusterService, partitionService, serviceInfoProvider, null, null));
|
||||
service = spy(new DefaultDeviceStateService(tenantService, deviceService, attributesService, tsService, clusterService, partitionService, serviceInfoProvider, null, null, null));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@ -0,0 +1,110 @@
|
||||
/**
|
||||
* Copyright © 2016-2023 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.stats;
|
||||
|
||||
import lombok.SneakyThrows;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.test.context.TestPropertySource;
|
||||
import org.thingsboard.server.common.data.ApiUsageRecordKey;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.id.ApiUsageStateId;
|
||||
import org.thingsboard.server.common.data.kv.KvEntry;
|
||||
import org.thingsboard.server.controller.AbstractControllerTest;
|
||||
import org.thingsboard.server.dao.service.DaoSqlTest;
|
||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
|
||||
|
||||
@DaoSqlTest
|
||||
@TestPropertySource(properties = {
|
||||
"usage.stats.report.enabled=true",
|
||||
"transport.http.enabled=true",
|
||||
"usage.stats.report.interval=2",
|
||||
"usage.stats.gauge_report_interval=1",
|
||||
"state.defaultStateCheckIntervalInSec=3",
|
||||
"state.defaultInactivityTimeoutInSec=10"
|
||||
|
||||
})
|
||||
public class DevicesStatisticsTest extends AbstractControllerTest {
|
||||
|
||||
@Autowired
|
||||
private TbApiUsageStateService apiUsageStateService;
|
||||
@Autowired
|
||||
private TimeseriesService timeseriesService;
|
||||
|
||||
private ApiUsageStateId apiUsageStateId;
|
||||
|
||||
@Before
|
||||
public void beforeEach() throws Exception {
|
||||
loginTenantAdmin();
|
||||
apiUsageStateId = apiUsageStateService.getApiUsageState(tenantId).getId();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDevicesActivityStats() throws Exception {
|
||||
int activeDevicesCount = 5;
|
||||
List<Device> activeDevices = new ArrayList<>();
|
||||
for (int i = 1; i <= activeDevicesCount; i++) {
|
||||
String name = "active_device_" + i;
|
||||
Device device = createDevice(name, name);
|
||||
activeDevices.add(device);
|
||||
}
|
||||
int inactiveDevicesCount = 10;
|
||||
List<Device> inactiveDevices = new ArrayList<>();
|
||||
for (int i = 1; i <= inactiveDevicesCount; i++) {
|
||||
String name = "inactive_device_" + i;
|
||||
Device device = createDevice(name, name);
|
||||
inactiveDevices.add(device);
|
||||
}
|
||||
|
||||
await().atMost(15, TimeUnit.SECONDS)
|
||||
.untilAsserted(() -> {
|
||||
assertThat(getLatestStats(ApiUsageRecordKey.ACTIVE_DEVICES, false)).isZero();
|
||||
assertThat(getLatestStats(ApiUsageRecordKey.INACTIVE_DEVICES, false)).isEqualTo(activeDevicesCount + inactiveDevicesCount);
|
||||
});
|
||||
|
||||
for (Device device : activeDevices) {
|
||||
postTelemetry(device.getName(), "{\"dp\":1}");
|
||||
}
|
||||
|
||||
await().atMost(40, TimeUnit.SECONDS)
|
||||
.untilAsserted(() -> {
|
||||
assertThat(getLatestStats(ApiUsageRecordKey.ACTIVE_DEVICES, false)).isEqualTo(activeDevicesCount);
|
||||
assertThat(getLatestStats(ApiUsageRecordKey.INACTIVE_DEVICES, false)).isEqualTo(inactiveDevicesCount);
|
||||
});
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
private Long getLatestStats(ApiUsageRecordKey key, boolean hourly) {
|
||||
return timeseriesService.findLatest(tenantId, apiUsageStateId, List.of(key.getApiCountKey() + (hourly ? "Hourly" : "")))
|
||||
.get().stream().findFirst().flatMap(KvEntry::getLongValue).orElse(null);
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
private void postTelemetry(String accessToken, String json) {
|
||||
doPost("/api/v1/" + accessToken + "/telemetry", json, new String[0]).andExpect(status().isOk());
|
||||
}
|
||||
|
||||
}
|
||||
@ -190,7 +190,7 @@ public class BasicMqttCredentialsTest extends AbstractMqttIntegrationTest {
|
||||
return device;
|
||||
}
|
||||
|
||||
private Device createDevice(String deviceName, String accessToken) throws Exception {
|
||||
protected Device createDevice(String deviceName, String accessToken) throws Exception {
|
||||
Device device = new Device();
|
||||
device.setName(deviceName);
|
||||
device.setType("default");
|
||||
|
||||
@ -1034,6 +1034,7 @@ message ToUsageStatsServiceMsg {
|
||||
repeated UsageStatsKVProto values = 5;
|
||||
int64 customerIdMSB = 6;
|
||||
int64 customerIdLSB = 7;
|
||||
string serviceId = 8;
|
||||
}
|
||||
|
||||
message ToOtaPackageStateServiceMsg {
|
||||
|
||||
@ -26,7 +26,9 @@ public enum ApiUsageRecordKey {
|
||||
JS_EXEC_COUNT(ApiFeature.JS, "jsExecutionCount", "jsExecutionLimit", "JavaScript execution"),
|
||||
EMAIL_EXEC_COUNT(ApiFeature.EMAIL, "emailCount", "emailLimit", "email message"),
|
||||
SMS_EXEC_COUNT(ApiFeature.SMS, "smsCount", "smsLimit", "SMS message"),
|
||||
CREATED_ALARMS_COUNT(ApiFeature.ALARM, "createdAlarmsCount", "createdAlarmsLimit", "alarm");
|
||||
CREATED_ALARMS_COUNT(ApiFeature.ALARM, "createdAlarmsCount", "createdAlarmsLimit", "alarm"),
|
||||
ACTIVE_DEVICES("activeDevicesCount"),
|
||||
INACTIVE_DEVICES("inactiveDevicesCount");
|
||||
|
||||
private static final ApiUsageRecordKey[] JS_RECORD_KEYS = {JS_EXEC_COUNT};
|
||||
private static final ApiUsageRecordKey[] RE_RECORD_KEYS = {RE_EXEC_COUNT};
|
||||
@ -44,12 +46,23 @@ public enum ApiUsageRecordKey {
|
||||
private final String apiLimitKey;
|
||||
@Getter
|
||||
private final String unitLabel;
|
||||
@Getter
|
||||
private final boolean counter;
|
||||
|
||||
ApiUsageRecordKey(ApiFeature apiFeature, String apiCountKey, String apiLimitKey, String unitLabel) {
|
||||
this(apiFeature, apiCountKey, apiLimitKey, unitLabel, true);
|
||||
}
|
||||
|
||||
ApiUsageRecordKey(String apiCountKey) {
|
||||
this(null, apiCountKey, null, null, false);
|
||||
}
|
||||
|
||||
ApiUsageRecordKey(ApiFeature apiFeature, String apiCountKey, String apiLimitKey, String unitLabel, boolean counter) {
|
||||
this.apiFeature = apiFeature;
|
||||
this.apiCountKey = apiCountKey;
|
||||
this.apiLimitKey = apiLimitKey;
|
||||
this.unitLabel = unitLabel;
|
||||
this.counter = counter;
|
||||
}
|
||||
|
||||
public static ApiUsageRecordKey[] getKeys(ApiFeature feature) {
|
||||
|
||||
@ -16,11 +16,11 @@
|
||||
package org.thingsboard.server.queue.usagestats;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.data.ApiUsageRecordKey;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
@ -32,12 +32,12 @@ import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto;
|
||||
import org.thingsboard.server.queue.TbQueueProducer;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
||||
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
|
||||
import org.thingsboard.server.queue.scheduler.SchedulerComponent;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.EnumMap;
|
||||
import java.util.Optional;
|
||||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient {
|
||||
|
||||
@Value("${usage.stats.report.enabled:true}")
|
||||
@ -56,19 +57,14 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient {
|
||||
@Value("${usage.stats.report.interval:10}")
|
||||
private int interval;
|
||||
|
||||
private final EnumMap<ApiUsageRecordKey, ConcurrentMap<OwnerId, AtomicLong>> stats = new EnumMap<>(ApiUsageRecordKey.class);
|
||||
private final EnumMap<ApiUsageRecordKey, ConcurrentMap<ReportLevel, AtomicLong>> stats = new EnumMap<>(ApiUsageRecordKey.class);
|
||||
|
||||
private final PartitionService partitionService;
|
||||
private final TbServiceInfoProvider serviceInfoProvider;
|
||||
private final SchedulerComponent scheduler;
|
||||
private final TbQueueProducerProvider producerProvider;
|
||||
private TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> msgProducer;
|
||||
|
||||
public DefaultTbApiUsageReportClient(PartitionService partitionService, SchedulerComponent scheduler, TbQueueProducerProvider producerProvider) {
|
||||
this.partitionService = partitionService;
|
||||
this.scheduler = scheduler;
|
||||
this.producerProvider = producerProvider;
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
private void init() {
|
||||
if (enabled) {
|
||||
@ -87,42 +83,48 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient {
|
||||
}
|
||||
|
||||
private void reportStats() {
|
||||
ConcurrentMap<OwnerId, ToUsageStatsServiceMsg.Builder> report = new ConcurrentHashMap<>();
|
||||
ConcurrentMap<ParentEntity, ToUsageStatsServiceMsg.Builder> report = new ConcurrentHashMap<>();
|
||||
|
||||
for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {
|
||||
ConcurrentMap<OwnerId, AtomicLong> statsForKey = stats.get(key);
|
||||
statsForKey.forEach((ownerId, statsValue) -> {
|
||||
ConcurrentMap<ReportLevel, AtomicLong> statsForKey = stats.get(key);
|
||||
statsForKey.forEach((reportLevel, statsValue) -> {
|
||||
long value = statsValue.get();
|
||||
if (value == 0) return;
|
||||
if (value == 0 && key.isCounter()) return;
|
||||
|
||||
ToUsageStatsServiceMsg.Builder statsMsgBuilder = report.computeIfAbsent(ownerId, id -> {
|
||||
ToUsageStatsServiceMsg.Builder newStatsMsgBuilder = ToUsageStatsServiceMsg.newBuilder();
|
||||
ToUsageStatsServiceMsg.Builder statsMsg = report.computeIfAbsent(reportLevel.getParentEntity(), parent -> {
|
||||
ToUsageStatsServiceMsg.Builder newStatsMsg = ToUsageStatsServiceMsg.newBuilder();
|
||||
|
||||
TenantId tenantId = ownerId.getTenantId();
|
||||
newStatsMsgBuilder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
|
||||
newStatsMsgBuilder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
|
||||
TenantId tenantId = parent.getTenantId();
|
||||
newStatsMsg.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
|
||||
newStatsMsg.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
|
||||
|
||||
EntityId entityId = ownerId.getEntityId();
|
||||
if (entityId != null && entityId.getEntityType() == EntityType.CUSTOMER) {
|
||||
newStatsMsgBuilder.setCustomerIdMSB(entityId.getId().getMostSignificantBits());
|
||||
newStatsMsgBuilder.setCustomerIdLSB(entityId.getId().getLeastSignificantBits());
|
||||
CustomerId customerId = parent.getCustomerId();
|
||||
if (customerId != null) {
|
||||
newStatsMsg.setCustomerIdMSB(customerId.getId().getMostSignificantBits());
|
||||
newStatsMsg.setCustomerIdLSB(customerId.getId().getLeastSignificantBits());
|
||||
}
|
||||
|
||||
return newStatsMsgBuilder;
|
||||
newStatsMsg.setServiceId(serviceInfoProvider.getServiceId());
|
||||
return newStatsMsg;
|
||||
});
|
||||
|
||||
statsMsgBuilder.addValues(UsageStatsKVProto.newBuilder().setKey(key.name()).setValue(value).build());
|
||||
UsageStatsKVProto.Builder statsItem = UsageStatsKVProto.newBuilder()
|
||||
.setKey(key.name())
|
||||
.setValue(value);
|
||||
statsMsg.addValues(statsItem.build());
|
||||
});
|
||||
statsForKey.clear();
|
||||
}
|
||||
|
||||
report.forEach(((ownerId, statsMsg) -> {
|
||||
report.forEach(((parent, statsMsg) -> {
|
||||
//TODO: figure out how to minimize messages into the queue. Maybe group by 100s of messages?
|
||||
|
||||
TenantId tenantId = ownerId.getTenantId();
|
||||
EntityId entityId = Optional.ofNullable(ownerId.getEntityId()).orElse(tenantId);
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId).newByTopic(msgProducer.getDefaultTopic());
|
||||
msgProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), statsMsg.build()), null);
|
||||
try {
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, parent.getTenantId(), parent.getId())
|
||||
.newByTopic(msgProducer.getDefaultTopic());
|
||||
msgProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), statsMsg.build()), null);
|
||||
} catch (Exception e) {
|
||||
log.warn("Failed to report usage stats for tenant {}", parent.getTenantId(), e);
|
||||
}
|
||||
}));
|
||||
|
||||
if (!report.isEmpty()) {
|
||||
@ -132,16 +134,17 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient {
|
||||
|
||||
@Override
|
||||
public void report(TenantId tenantId, CustomerId customerId, ApiUsageRecordKey key, long value) {
|
||||
if (enabled) {
|
||||
ConcurrentMap<OwnerId, AtomicLong> statsForKey = stats.get(key);
|
||||
if (!enabled) return;
|
||||
|
||||
statsForKey.computeIfAbsent(new OwnerId(tenantId), id -> new AtomicLong()).addAndGet(value);
|
||||
statsForKey.computeIfAbsent(new OwnerId(TenantId.SYS_TENANT_ID), id -> new AtomicLong()).addAndGet(value);
|
||||
|
||||
if (enabledPerCustomer && customerId != null && !customerId.isNullUid()) {
|
||||
statsForKey.computeIfAbsent(new OwnerId(tenantId, customerId), id -> new AtomicLong()).addAndGet(value);
|
||||
}
|
||||
ReportLevel[] reportLevels = new ReportLevel[3];
|
||||
reportLevels[0] = ReportLevel.of(tenantId);
|
||||
if (key.isCounter()) {
|
||||
reportLevels[1] = ReportLevel.of(TenantId.SYS_TENANT_ID);
|
||||
}
|
||||
if (enabledPerCustomer && customerId != null && !customerId.isNullUid()) {
|
||||
reportLevels[2] = ReportLevel.of(tenantId, customerId);
|
||||
}
|
||||
report(key, value, reportLevels);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -149,18 +152,47 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient {
|
||||
report(tenantId, customerId, key, 1);
|
||||
}
|
||||
|
||||
@Data
|
||||
private static class OwnerId {
|
||||
private TenantId tenantId;
|
||||
private EntityId entityId;
|
||||
private void report(ApiUsageRecordKey key, long value, ReportLevel... levels) {
|
||||
ConcurrentMap<ReportLevel, AtomicLong> statsForKey = stats.get(key);
|
||||
for (ReportLevel level : levels) {
|
||||
if (level == null) continue;
|
||||
|
||||
public OwnerId(TenantId tenantId) {
|
||||
this.tenantId = tenantId;
|
||||
}
|
||||
|
||||
public OwnerId(TenantId tenantId, EntityId entityId) {
|
||||
this.tenantId = tenantId;
|
||||
this.entityId = entityId;
|
||||
AtomicLong n = statsForKey.computeIfAbsent(level, k -> new AtomicLong());
|
||||
if (key.isCounter()) {
|
||||
n.addAndGet(value);
|
||||
} else {
|
||||
n.set(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Data
|
||||
private static class ReportLevel {
|
||||
private final TenantId tenantId;
|
||||
private final CustomerId customerId;
|
||||
|
||||
public static ReportLevel of(TenantId tenantId) {
|
||||
return new ReportLevel(tenantId, null);
|
||||
}
|
||||
|
||||
public static ReportLevel of(TenantId tenantId, CustomerId customerId) {
|
||||
return new ReportLevel(tenantId, customerId);
|
||||
}
|
||||
|
||||
public ParentEntity getParentEntity() {
|
||||
return new ParentEntity(tenantId, customerId);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Data
|
||||
private static class ParentEntity {
|
||||
private final TenantId tenantId;
|
||||
private final CustomerId customerId;
|
||||
|
||||
public EntityId getId() {
|
||||
return customerId != null ? customerId : tenantId;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -126,6 +126,7 @@ public class ApiUsageStateServiceImpl extends AbstractEntityService implements A
|
||||
|
||||
List<TsKvEntry> profileThresholds = new ArrayList<>();
|
||||
for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {
|
||||
if (key.getApiLimitKey() == null) continue;
|
||||
profileThresholds.add(new BasicTsKvEntry(saved.getCreatedTime(), new LongDataEntry(key.getApiLimitKey(), configuration.getProfileThreshold(key))));
|
||||
}
|
||||
tsService.save(tenantId, saved.getId(), profileThresholds, 0L);
|
||||
|
||||
@ -167,6 +167,14 @@ vc:
|
||||
io_pool_size: "${TB_VC_GIT_POOL_SIZE:3}"
|
||||
repositories-folder: "${TB_VC_GIT_REPOSITORIES_FOLDER:${java.io.tmpdir}/repositories}"
|
||||
|
||||
# Usage statistics parameters
|
||||
usage:
|
||||
stats:
|
||||
report:
|
||||
enabled: "${USAGE_STATS_REPORT_ENABLED:true}"
|
||||
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
|
||||
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
|
||||
|
||||
metrics:
|
||||
# Enable/disable actuator metrics.
|
||||
enabled: "${METRICS_ENABLED:false}"
|
||||
|
||||
@ -270,6 +270,14 @@ service:
|
||||
# Unique id for this service (autogenerated if empty)
|
||||
id: "${TB_SERVICE_ID:}"
|
||||
|
||||
# Usage statistics parameters
|
||||
usage:
|
||||
stats:
|
||||
report:
|
||||
enabled: "${USAGE_STATS_REPORT_ENABLED:true}"
|
||||
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
|
||||
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
|
||||
|
||||
metrics:
|
||||
# Enable/disable actuator metrics.
|
||||
enabled: "${METRICS_ENABLED:false}"
|
||||
|
||||
@ -255,6 +255,14 @@ service:
|
||||
# Unique id for this service (autogenerated if empty)
|
||||
id: "${TB_SERVICE_ID:}"
|
||||
|
||||
# Usage statistics parameters
|
||||
usage:
|
||||
stats:
|
||||
report:
|
||||
enabled: "${USAGE_STATS_REPORT_ENABLED:true}"
|
||||
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
|
||||
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
|
||||
|
||||
metrics:
|
||||
# Enable/disable actuator metrics.
|
||||
enabled: "${METRICS_ENABLED:false}"
|
||||
|
||||
@ -337,6 +337,14 @@ service:
|
||||
# Unique id for this service (autogenerated if empty)
|
||||
id: "${TB_SERVICE_ID:}"
|
||||
|
||||
# Usage statistics parameters
|
||||
usage:
|
||||
stats:
|
||||
report:
|
||||
enabled: "${USAGE_STATS_REPORT_ENABLED:true}"
|
||||
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
|
||||
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
|
||||
|
||||
metrics:
|
||||
# Enable/disable actuator metrics.
|
||||
enabled: "${METRICS_ENABLED:false}"
|
||||
|
||||
@ -285,6 +285,14 @@ service:
|
||||
# Unique id for this service (autogenerated if empty)
|
||||
id: "${TB_SERVICE_ID:}"
|
||||
|
||||
# Usage statistics parameters
|
||||
usage:
|
||||
stats:
|
||||
report:
|
||||
enabled: "${USAGE_STATS_REPORT_ENABLED:true}"
|
||||
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
|
||||
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
|
||||
|
||||
metrics:
|
||||
# Enable/disable actuator metrics.
|
||||
enabled: "${METRICS_ENABLED:false}"
|
||||
|
||||
@ -235,6 +235,14 @@ service:
|
||||
# Unique id for this service (autogenerated if empty)
|
||||
id: "${TB_SERVICE_ID:}"
|
||||
|
||||
# Usage statistics parameters
|
||||
usage:
|
||||
stats:
|
||||
report:
|
||||
enabled: "${USAGE_STATS_REPORT_ENABLED:true}"
|
||||
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
|
||||
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
|
||||
|
||||
metrics:
|
||||
# Enable/disable actuator metrics.
|
||||
enabled: "${METRICS_ENABLED:false}"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user