Merge pull request #12191 from YevhenBondarenko/feature/api-usage-improvements
Reduce API usage traffic
This commit is contained in:
commit
9c3858755c
@ -54,10 +54,12 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
|
|||||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||||
import org.thingsboard.server.common.msg.tools.SchedulerUtils;
|
import org.thingsboard.server.common.msg.tools.SchedulerUtils;
|
||||||
|
import org.thingsboard.server.common.util.ProtoUtils;
|
||||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
||||||
import org.thingsboard.server.dao.tenant.TenantService;
|
import org.thingsboard.server.dao.tenant.TenantService;
|
||||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||||
import org.thingsboard.server.dao.usagerecord.ApiUsageStateService;
|
import org.thingsboard.server.dao.usagerecord.ApiUsageStateService;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
|
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto;
|
import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto;
|
||||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||||
@ -144,18 +146,40 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void process(TbProtoQueueMsg<ToUsageStatsServiceMsg> msg, TbCallback callback) {
|
public void process(TbProtoQueueMsg<ToUsageStatsServiceMsg> msgPack, TbCallback callback) {
|
||||||
ToUsageStatsServiceMsg statsMsg = msg.getValue();
|
ToUsageStatsServiceMsg serviceMsg = msgPack.getValue();
|
||||||
|
String serviceId = serviceMsg.getServiceId();
|
||||||
|
|
||||||
TenantId tenantId = TenantId.fromUUID(new UUID(statsMsg.getTenantIdMSB(), statsMsg.getTenantIdLSB()));
|
List<TransportProtos.UsageStatsServiceMsg> msgs;
|
||||||
|
|
||||||
|
//For backward compatibility, remove after release
|
||||||
|
if (serviceMsg.getMsgsList().isEmpty()) {
|
||||||
|
TransportProtos.UsageStatsServiceMsg oldMsg = TransportProtos.UsageStatsServiceMsg.newBuilder()
|
||||||
|
.setTenantIdMSB(serviceMsg.getTenantIdMSB())
|
||||||
|
.setTenantIdLSB(serviceMsg.getTenantIdLSB())
|
||||||
|
.setCustomerIdMSB(serviceMsg.getCustomerIdMSB())
|
||||||
|
.setCustomerIdLSB(serviceMsg.getCustomerIdLSB())
|
||||||
|
.setEntityIdMSB(serviceMsg.getEntityIdMSB())
|
||||||
|
.setEntityIdLSB(serviceMsg.getEntityIdLSB())
|
||||||
|
.addAllValues(serviceMsg.getValuesList())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
msgs = List.of(oldMsg);
|
||||||
|
} else {
|
||||||
|
msgs = serviceMsg.getMsgsList();
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs.forEach(msg -> {
|
||||||
|
TenantId tenantId = TenantId.fromUUID(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB()));
|
||||||
EntityId ownerId;
|
EntityId ownerId;
|
||||||
if (statsMsg.getCustomerIdMSB() != 0 && statsMsg.getCustomerIdLSB() != 0) {
|
if (msg.getCustomerIdMSB() != 0 && msg.getCustomerIdLSB() != 0) {
|
||||||
ownerId = new CustomerId(new UUID(statsMsg.getCustomerIdMSB(), statsMsg.getCustomerIdLSB()));
|
ownerId = new CustomerId(new UUID(msg.getCustomerIdMSB(), msg.getCustomerIdLSB()));
|
||||||
} else {
|
} else {
|
||||||
ownerId = tenantId;
|
ownerId = tenantId;
|
||||||
}
|
}
|
||||||
|
|
||||||
processEntityUsageStats(tenantId, ownerId, statsMsg.getValuesList(), statsMsg.getServiceId());
|
processEntityUsageStats(tenantId, ownerId, msg.getValuesList(), serviceId);
|
||||||
|
});
|
||||||
callback.onSuccess();
|
callback.onSuccess();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -181,7 +205,14 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
|
|||||||
updatedEntries = new ArrayList<>(ApiUsageRecordKey.values().length);
|
updatedEntries = new ArrayList<>(ApiUsageRecordKey.values().length);
|
||||||
Set<ApiFeature> apiFeatures = new HashSet<>();
|
Set<ApiFeature> apiFeatures = new HashSet<>();
|
||||||
for (UsageStatsKVProto statsItem : values) {
|
for (UsageStatsKVProto statsItem : values) {
|
||||||
ApiUsageRecordKey recordKey = ApiUsageRecordKey.valueOf(statsItem.getKey());
|
ApiUsageRecordKey recordKey;
|
||||||
|
|
||||||
|
//For backward compatibility, remove after release
|
||||||
|
if (StringUtils.isNotEmpty(statsItem.getKey())) {
|
||||||
|
recordKey = ApiUsageRecordKey.valueOf(statsItem.getKey());
|
||||||
|
} else {
|
||||||
|
recordKey = ProtoUtils.fromProto(statsItem.getRecordKey());
|
||||||
|
}
|
||||||
|
|
||||||
StatsCalculationResult calculationResult = usageState.calculate(recordKey, statsItem.getValue(), serviceId);
|
StatsCalculationResult calculationResult = usageState.calculate(recordKey, statsItem.getValue(), serviceId);
|
||||||
if (calculationResult.isValueChanged()) {
|
if (calculationResult.isValueChanged()) {
|
||||||
|
|||||||
@ -187,7 +187,9 @@ usage:
|
|||||||
# Enable/Disable the collection of API usage statistics on a customer level
|
# Enable/Disable the collection of API usage statistics on a customer level
|
||||||
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
|
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
|
||||||
# Statistics reporting interval, set to send summarized data every 10 seconds by default
|
# Statistics reporting interval, set to send summarized data every 10 seconds by default
|
||||||
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
|
interval: "${USAGE_STATS_REPORT_INTERVAL:60}"
|
||||||
|
# Amount of statistic messages in pack
|
||||||
|
pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}"
|
||||||
check:
|
check:
|
||||||
# Interval of checking the start of the next cycle and re-enabling the blocked tenants/customers
|
# Interval of checking the start of the next cycle and re-enabling the blocked tenants/customers
|
||||||
cycle: "${USAGE_STATS_CHECK_CYCLE:60000}"
|
cycle: "${USAGE_STATS_CHECK_CYCLE:60000}"
|
||||||
|
|||||||
@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.JsonNode;
|
|||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.thingsboard.common.util.JacksonUtil;
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
|
import org.thingsboard.server.common.data.ApiUsageRecordKey;
|
||||||
import org.thingsboard.server.common.data.ApiUsageState;
|
import org.thingsboard.server.common.data.ApiUsageState;
|
||||||
import org.thingsboard.server.common.data.ApiUsageStateValue;
|
import org.thingsboard.server.common.data.ApiUsageStateValue;
|
||||||
import org.thingsboard.server.common.data.Device;
|
import org.thingsboard.server.common.data.Device;
|
||||||
@ -93,6 +94,7 @@ import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg;
|
|||||||
import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg;
|
import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto;
|
import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos.ApiUsageRecordKeyProto;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
@ -432,6 +434,39 @@ public class ProtoUtils {
|
|||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static ApiUsageRecordKeyProto toProto(ApiUsageRecordKey apiUsageRecordKey) {
|
||||||
|
return switch (apiUsageRecordKey) {
|
||||||
|
case TRANSPORT_MSG_COUNT -> ApiUsageRecordKeyProto.TRANSPORT_MSG_COUNT;
|
||||||
|
case TRANSPORT_DP_COUNT -> ApiUsageRecordKeyProto.TRANSPORT_DP_COUNT;
|
||||||
|
case STORAGE_DP_COUNT -> ApiUsageRecordKeyProto.STORAGE_DP_COUNT;
|
||||||
|
case RE_EXEC_COUNT -> ApiUsageRecordKeyProto.RE_EXEC_COUNT;
|
||||||
|
case JS_EXEC_COUNT -> ApiUsageRecordKeyProto.JS_EXEC_COUNT;
|
||||||
|
case TBEL_EXEC_COUNT -> ApiUsageRecordKeyProto.TBEL_EXEC_COUNT;
|
||||||
|
case EMAIL_EXEC_COUNT -> ApiUsageRecordKeyProto.EMAIL_EXEC_COUNT;
|
||||||
|
case SMS_EXEC_COUNT -> ApiUsageRecordKeyProto.SMS_EXEC_COUNT;
|
||||||
|
case CREATED_ALARMS_COUNT -> ApiUsageRecordKeyProto.CREATED_ALARMS_COUNT;
|
||||||
|
case ACTIVE_DEVICES -> ApiUsageRecordKeyProto.ACTIVE_DEVICES;
|
||||||
|
case INACTIVE_DEVICES -> ApiUsageRecordKeyProto.INACTIVE_DEVICES;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ApiUsageRecordKey fromProto(ApiUsageRecordKeyProto proto) {
|
||||||
|
return switch (proto) {
|
||||||
|
case UNRECOGNIZED -> null;
|
||||||
|
case TRANSPORT_MSG_COUNT -> ApiUsageRecordKey.TRANSPORT_MSG_COUNT;
|
||||||
|
case TRANSPORT_DP_COUNT -> ApiUsageRecordKey.TRANSPORT_DP_COUNT;
|
||||||
|
case STORAGE_DP_COUNT -> ApiUsageRecordKey.STORAGE_DP_COUNT;
|
||||||
|
case RE_EXEC_COUNT -> ApiUsageRecordKey.RE_EXEC_COUNT;
|
||||||
|
case JS_EXEC_COUNT -> ApiUsageRecordKey.JS_EXEC_COUNT;
|
||||||
|
case TBEL_EXEC_COUNT -> ApiUsageRecordKey.TBEL_EXEC_COUNT;
|
||||||
|
case EMAIL_EXEC_COUNT -> ApiUsageRecordKey.EMAIL_EXEC_COUNT;
|
||||||
|
case SMS_EXEC_COUNT -> ApiUsageRecordKey.SMS_EXEC_COUNT;
|
||||||
|
case CREATED_ALARMS_COUNT -> ApiUsageRecordKey.CREATED_ALARMS_COUNT;
|
||||||
|
case ACTIVE_DEVICES -> ApiUsageRecordKey.ACTIVE_DEVICES;
|
||||||
|
case INACTIVE_DEVICES -> ApiUsageRecordKey.INACTIVE_DEVICES;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
private static ToDeviceActorNotificationMsg fromProto(TransportProtos.DeviceAttributesEventMsgProto proto) {
|
private static ToDeviceActorNotificationMsg fromProto(TransportProtos.DeviceAttributesEventMsgProto proto) {
|
||||||
return new DeviceAttributesEventNotificationMsg(
|
return new DeviceAttributesEventNotificationMsg(
|
||||||
TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
|
TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
|
||||||
|
|||||||
@ -63,6 +63,20 @@ enum EntityTypeProto {
|
|||||||
CALCULATED_FIELD_LINK = 40;
|
CALCULATED_FIELD_LINK = 40;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
enum ApiUsageRecordKeyProto {
|
||||||
|
TRANSPORT_MSG_COUNT = 0;
|
||||||
|
TRANSPORT_DP_COUNT = 1;
|
||||||
|
STORAGE_DP_COUNT = 2;
|
||||||
|
RE_EXEC_COUNT = 3;
|
||||||
|
JS_EXEC_COUNT = 4;
|
||||||
|
TBEL_EXEC_COUNT = 5;
|
||||||
|
EMAIL_EXEC_COUNT = 6;
|
||||||
|
SMS_EXEC_COUNT = 7;
|
||||||
|
CREATED_ALARMS_COUNT = 8;
|
||||||
|
ACTIVE_DEVICES = 9;
|
||||||
|
INACTIVE_DEVICES = 10;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Service Discovery Data Structures;
|
* Service Discovery Data Structures;
|
||||||
*/
|
*/
|
||||||
@ -1720,12 +1734,25 @@ message ToTransportMsg {
|
|||||||
repeated QueueDeleteMsg queueDeleteMsgs = 16;
|
repeated QueueDeleteMsg queueDeleteMsgs = 16;
|
||||||
}
|
}
|
||||||
|
|
||||||
message UsageStatsKVProto{
|
message UsageStatsKVProto {
|
||||||
string key = 1;
|
string key = 1 [deprecated=true];
|
||||||
int64 value = 2;
|
int64 value = 2;
|
||||||
|
ApiUsageRecordKeyProto recordKey = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
message ToUsageStatsServiceMsg {
|
message ToUsageStatsServiceMsg {
|
||||||
|
int64 tenantIdMSB = 1 [deprecated=true];
|
||||||
|
int64 tenantIdLSB = 2 [deprecated=true];
|
||||||
|
int64 entityIdMSB = 3 [deprecated=true];
|
||||||
|
int64 entityIdLSB = 4 [deprecated=true];
|
||||||
|
repeated UsageStatsKVProto values = 5 [deprecated=true];
|
||||||
|
int64 customerIdMSB = 6 [deprecated=true];
|
||||||
|
int64 customerIdLSB = 7 [deprecated=true];
|
||||||
|
string serviceId = 8;
|
||||||
|
repeated UsageStatsServiceMsg msgs = 9;
|
||||||
|
}
|
||||||
|
|
||||||
|
message UsageStatsServiceMsg {
|
||||||
int64 tenantIdMSB = 1;
|
int64 tenantIdMSB = 1;
|
||||||
int64 tenantIdLSB = 2;
|
int64 tenantIdLSB = 2;
|
||||||
int64 entityIdMSB = 3;
|
int64 entityIdMSB = 3;
|
||||||
@ -1733,7 +1760,6 @@ message ToUsageStatsServiceMsg {
|
|||||||
repeated UsageStatsKVProto values = 5;
|
repeated UsageStatsKVProto values = 5;
|
||||||
int64 customerIdMSB = 6;
|
int64 customerIdMSB = 6;
|
||||||
int64 customerIdLSB = 7;
|
int64 customerIdLSB = 7;
|
||||||
string serviceId = 8;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message ToOtaPackageStateServiceMsg {
|
message ToOtaPackageStateServiceMsg {
|
||||||
|
|||||||
@ -15,6 +15,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.queue.usagestats;
|
package org.thingsboard.server.queue.usagestats;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
import jakarta.annotation.PostConstruct;
|
import jakarta.annotation.PostConstruct;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
@ -29,6 +30,9 @@ import org.thingsboard.server.common.data.id.TenantId;
|
|||||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||||
import org.thingsboard.server.common.stats.TbApiUsageReportClient;
|
import org.thingsboard.server.common.stats.TbApiUsageReportClient;
|
||||||
|
import org.thingsboard.server.common.util.ProtoUtils;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsServiceMsg;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
|
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto;
|
import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto;
|
||||||
import org.thingsboard.server.queue.TbQueueProducer;
|
import org.thingsboard.server.queue.TbQueueProducer;
|
||||||
@ -38,7 +42,11 @@ import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
|||||||
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
|
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
|
||||||
import org.thingsboard.server.queue.scheduler.SchedulerComponent;
|
import org.thingsboard.server.queue.scheduler.SchedulerComponent;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.EnumMap;
|
import java.util.EnumMap;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
@ -57,6 +65,8 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient {
|
|||||||
private boolean enabledPerCustomer;
|
private boolean enabledPerCustomer;
|
||||||
@Value("${usage.stats.report.interval:10}")
|
@Value("${usage.stats.report.interval:10}")
|
||||||
private int interval;
|
private int interval;
|
||||||
|
@Value("${usage.stats.report.pack_size:1024}")
|
||||||
|
private int packSize;
|
||||||
|
|
||||||
private final EnumMap<ApiUsageRecordKey, ConcurrentMap<ReportLevel, AtomicLong>> stats = new EnumMap<>(ApiUsageRecordKey.class);
|
private final EnumMap<ApiUsageRecordKey, ConcurrentMap<ReportLevel, AtomicLong>> stats = new EnumMap<>(ApiUsageRecordKey.class);
|
||||||
|
|
||||||
@ -64,7 +74,7 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient {
|
|||||||
private final TbServiceInfoProvider serviceInfoProvider;
|
private final TbServiceInfoProvider serviceInfoProvider;
|
||||||
private final SchedulerComponent scheduler;
|
private final SchedulerComponent scheduler;
|
||||||
private final TbQueueProducerProvider producerProvider;
|
private final TbQueueProducerProvider producerProvider;
|
||||||
private TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> msgProducer;
|
private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> msgProducer;
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
private void init() {
|
private void init() {
|
||||||
@ -84,7 +94,7 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void reportStats() {
|
private void reportStats() {
|
||||||
ConcurrentMap<ParentEntity, ToUsageStatsServiceMsg.Builder> report = new ConcurrentHashMap<>();
|
ConcurrentMap<ParentEntity, UsageStatsServiceMsg.Builder> report = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {
|
for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {
|
||||||
ConcurrentMap<ReportLevel, AtomicLong> statsForKey = stats.get(key);
|
ConcurrentMap<ReportLevel, AtomicLong> statsForKey = stats.get(key);
|
||||||
@ -92,8 +102,8 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient {
|
|||||||
long value = statsValue.get();
|
long value = statsValue.get();
|
||||||
if (value == 0 && key.isCounter()) return;
|
if (value == 0 && key.isCounter()) return;
|
||||||
|
|
||||||
ToUsageStatsServiceMsg.Builder statsMsg = report.computeIfAbsent(reportLevel.getParentEntity(), parent -> {
|
UsageStatsServiceMsg.Builder statsMsg = report.computeIfAbsent(reportLevel.getParentEntity(), parent -> {
|
||||||
ToUsageStatsServiceMsg.Builder newStatsMsg = ToUsageStatsServiceMsg.newBuilder();
|
UsageStatsServiceMsg.Builder newStatsMsg = UsageStatsServiceMsg.newBuilder();
|
||||||
|
|
||||||
TenantId tenantId = parent.getTenantId();
|
TenantId tenantId = parent.getTenantId();
|
||||||
newStatsMsg.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
|
newStatsMsg.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
|
||||||
@ -105,36 +115,54 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient {
|
|||||||
newStatsMsg.setCustomerIdLSB(customerId.getId().getLeastSignificantBits());
|
newStatsMsg.setCustomerIdLSB(customerId.getId().getLeastSignificantBits());
|
||||||
}
|
}
|
||||||
|
|
||||||
newStatsMsg.setServiceId(serviceInfoProvider.getServiceId());
|
|
||||||
return newStatsMsg;
|
return newStatsMsg;
|
||||||
});
|
});
|
||||||
|
|
||||||
UsageStatsKVProto.Builder statsItem = UsageStatsKVProto.newBuilder()
|
UsageStatsKVProto.Builder statsItem = UsageStatsKVProto.newBuilder()
|
||||||
.setKey(key.name())
|
.setRecordKey(ProtoUtils.toProto(key))
|
||||||
.setValue(value);
|
.setValue(value);
|
||||||
statsMsg.addValues(statsItem.build());
|
statsMsg.addValues(statsItem.build());
|
||||||
});
|
});
|
||||||
statsForKey.clear();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
report.forEach(((parent, statsMsg) -> {
|
Map<TopicPartitionInfo, List<UsageStatsServiceMsg>> reportStatsPerTpi = new HashMap<>();
|
||||||
//TODO: figure out how to minimize messages into the queue. Maybe group by 100s of messages?
|
|
||||||
|
report.forEach((parent, statsMsg) -> {
|
||||||
try {
|
try {
|
||||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, parent.getTenantId(), parent.getId())
|
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, parent.getTenantId(), parent.getId())
|
||||||
.newByTopic(msgProducer.getDefaultTopic());
|
.newByTopic(msgProducer.getDefaultTopic());
|
||||||
msgProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), statsMsg.build()), null);
|
reportStatsPerTpi.computeIfAbsent(tpi, k -> new ArrayList<>()).add(statsMsg.build());
|
||||||
} catch (TenantNotFoundException e) {
|
} catch (TenantNotFoundException e) {
|
||||||
log.debug("Couldn't report usage stats for non-existing tenant: {}", e.getTenantId());
|
log.debug("Couldn't report usage stats for non-existing tenant: {}", e.getTenantId());
|
||||||
} catch (Exception e) {
|
|
||||||
log.warn("Failed to report usage stats for tenant {}", parent.getTenantId(), e);
|
|
||||||
}
|
}
|
||||||
}));
|
});
|
||||||
|
|
||||||
|
reportStatsPerTpi.forEach((tpi, statsList) -> {
|
||||||
|
toMsgPack(statsList).forEach(pack -> {
|
||||||
|
try {
|
||||||
|
msgProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), pack), null);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("Failed to report usage stats pack to TPI {}", tpi, e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
if (!report.isEmpty()) {
|
if (!report.isEmpty()) {
|
||||||
log.debug("Reporting API usage statistics for {} tenants and customers", report.size());
|
log.debug("Reporting API usage statistics for {} tenants and customers", report.size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private List<ToUsageStatsServiceMsg> toMsgPack(List<UsageStatsServiceMsg> list) {
|
||||||
|
return Lists.partition(list, packSize)
|
||||||
|
.stream()
|
||||||
|
.map(partition ->
|
||||||
|
ToUsageStatsServiceMsg.newBuilder()
|
||||||
|
.addAllMsgs(partition)
|
||||||
|
.setServiceId(serviceInfoProvider.getServiceId())
|
||||||
|
.build())
|
||||||
|
.toList();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void report(TenantId tenantId, CustomerId customerId, ApiUsageRecordKey key, long value) {
|
public void report(TenantId tenantId, CustomerId customerId, ApiUsageRecordKey key, long value) {
|
||||||
if (!enabled) return;
|
if (!enabled) return;
|
||||||
|
|||||||
@ -207,7 +207,9 @@ usage:
|
|||||||
# Enable/Disable collection of statistics about API usage on a customer level
|
# Enable/Disable collection of statistics about API usage on a customer level
|
||||||
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
|
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
|
||||||
# Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds
|
# Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds
|
||||||
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
|
interval: "${USAGE_STATS_REPORT_INTERVAL:60}"
|
||||||
|
# Amount of statistic messages in pack
|
||||||
|
pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}"
|
||||||
|
|
||||||
# Metrics parameters
|
# Metrics parameters
|
||||||
metrics:
|
metrics:
|
||||||
|
|||||||
@ -411,7 +411,9 @@ usage:
|
|||||||
# Enable/Disable collection of statistics about API usage on a customer level
|
# Enable/Disable collection of statistics about API usage on a customer level
|
||||||
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
|
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
|
||||||
# Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds
|
# Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds
|
||||||
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
|
interval: "${USAGE_STATS_REPORT_INTERVAL:60}"
|
||||||
|
# Amount of statistic messages in pack
|
||||||
|
pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}"
|
||||||
|
|
||||||
# Metrics parameters
|
# Metrics parameters
|
||||||
metrics:
|
metrics:
|
||||||
|
|||||||
@ -360,7 +360,9 @@ usage:
|
|||||||
# Enable/Disable collection of statistics about API usage on a customer level
|
# Enable/Disable collection of statistics about API usage on a customer level
|
||||||
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
|
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
|
||||||
# Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds
|
# Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds
|
||||||
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
|
interval: "${USAGE_STATS_REPORT_INTERVAL:60}"
|
||||||
|
# Amount of statistic messages in pack
|
||||||
|
pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}"
|
||||||
|
|
||||||
# Metrics parameters
|
# Metrics parameters
|
||||||
metrics:
|
metrics:
|
||||||
|
|||||||
@ -461,7 +461,9 @@ usage:
|
|||||||
# Enable/Disable collection of statistics about API usage on a customer level
|
# Enable/Disable collection of statistics about API usage on a customer level
|
||||||
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
|
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
|
||||||
# Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds
|
# Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds
|
||||||
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
|
interval: "${USAGE_STATS_REPORT_INTERVAL:60}"
|
||||||
|
# Amount of statistic messages in pack
|
||||||
|
pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}"
|
||||||
|
|
||||||
# Metrics parameters
|
# Metrics parameters
|
||||||
metrics:
|
metrics:
|
||||||
|
|||||||
@ -394,7 +394,9 @@ usage:
|
|||||||
# Enable/Disable collection of statistics about API usage on a customer level
|
# Enable/Disable collection of statistics about API usage on a customer level
|
||||||
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
|
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
|
||||||
# Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds
|
# Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds
|
||||||
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
|
interval: "${USAGE_STATS_REPORT_INTERVAL:60}"
|
||||||
|
# Amount of statistic messages in pack
|
||||||
|
pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}"
|
||||||
|
|
||||||
# Metrics parameters
|
# Metrics parameters
|
||||||
metrics:
|
metrics:
|
||||||
|
|||||||
@ -347,7 +347,9 @@ usage:
|
|||||||
# Enable/Disable collection of statistics about API usage on a customer level
|
# Enable/Disable collection of statistics about API usage on a customer level
|
||||||
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
|
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
|
||||||
# Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds
|
# Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds
|
||||||
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
|
interval: "${USAGE_STATS_REPORT_INTERVAL:60}"
|
||||||
|
# Amount of statistic messages in pack
|
||||||
|
pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}"
|
||||||
|
|
||||||
# Metrics parameters
|
# Metrics parameters
|
||||||
metrics:
|
metrics:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user