diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java index b0ab4edc27..3dedea999c 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java @@ -54,10 +54,12 @@ import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.tools.SchedulerUtils; +import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.usagerecord.ApiUsageStateService; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -144,18 +146,40 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService } @Override - public void process(TbProtoQueueMsg msg, TbCallback callback) { - ToUsageStatsServiceMsg statsMsg = msg.getValue(); + public void process(TbProtoQueueMsg msgPack, TbCallback callback) { + ToUsageStatsServiceMsg serviceMsg = msgPack.getValue(); + String serviceId = serviceMsg.getServiceId(); - TenantId tenantId = TenantId.fromUUID(new UUID(statsMsg.getTenantIdMSB(), statsMsg.getTenantIdLSB())); - EntityId ownerId; - if (statsMsg.getCustomerIdMSB() != 0 && statsMsg.getCustomerIdLSB() != 0) { - ownerId = new CustomerId(new UUID(statsMsg.getCustomerIdMSB(), statsMsg.getCustomerIdLSB())); + List msgs; + + //For backward compatibility, remove after release + if (serviceMsg.getMsgsList().isEmpty()) { + TransportProtos.UsageStatsServiceMsg oldMsg = TransportProtos.UsageStatsServiceMsg.newBuilder() + .setTenantIdMSB(serviceMsg.getTenantIdMSB()) + .setTenantIdLSB(serviceMsg.getTenantIdLSB()) + .setCustomerIdMSB(serviceMsg.getCustomerIdMSB()) + .setCustomerIdLSB(serviceMsg.getCustomerIdLSB()) + .setEntityIdMSB(serviceMsg.getEntityIdMSB()) + .setEntityIdLSB(serviceMsg.getEntityIdLSB()) + .addAllValues(serviceMsg.getValuesList()) + .build(); + + msgs = List.of(oldMsg); } else { - ownerId = tenantId; + msgs = serviceMsg.getMsgsList(); } - processEntityUsageStats(tenantId, ownerId, statsMsg.getValuesList(), statsMsg.getServiceId()); + msgs.forEach(msg -> { + TenantId tenantId = TenantId.fromUUID(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB())); + EntityId ownerId; + if (msg.getCustomerIdMSB() != 0 && msg.getCustomerIdLSB() != 0) { + ownerId = new CustomerId(new UUID(msg.getCustomerIdMSB(), msg.getCustomerIdLSB())); + } else { + ownerId = tenantId; + } + + processEntityUsageStats(tenantId, ownerId, msg.getValuesList(), serviceId); + }); callback.onSuccess(); } @@ -181,7 +205,14 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService updatedEntries = new ArrayList<>(ApiUsageRecordKey.values().length); Set apiFeatures = new HashSet<>(); for (UsageStatsKVProto statsItem : values) { - ApiUsageRecordKey recordKey = ApiUsageRecordKey.valueOf(statsItem.getKey()); + ApiUsageRecordKey recordKey; + + //For backward compatibility, remove after release + if (StringUtils.isNotEmpty(statsItem.getKey())) { + recordKey = ApiUsageRecordKey.valueOf(statsItem.getKey()); + } else { + recordKey = ProtoUtils.fromProto(statsItem.getRecordKey()); + } StatsCalculationResult calculationResult = usageState.calculate(recordKey, statsItem.getValue(), serviceId); if (calculationResult.isValueChanged()) { diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index b3022fb9d3..721d23b700 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -187,7 +187,9 @@ usage: # Enable/Disable the collection of API usage statistics on a customer level enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}" # Statistics reporting interval, set to send summarized data every 10 seconds by default - interval: "${USAGE_STATS_REPORT_INTERVAL:10}" + interval: "${USAGE_STATS_REPORT_INTERVAL:60}" + # Amount of statistic messages in pack + pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}" check: # Interval of checking the start of the next cycle and re-enabling the blocked tenants/customers cycle: "${USAGE_STATS_CHECK_CYCLE:60000}" diff --git a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java index f1da00d943..1ebd753f3c 100644 --- a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.protobuf.ByteString; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.ApiUsageStateValue; import org.thingsboard.server.common.data.Device; @@ -93,6 +94,7 @@ import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto; +import org.thingsboard.server.gen.transport.TransportProtos.ApiUsageRecordKeyProto; import java.util.ArrayList; import java.util.Arrays; @@ -432,6 +434,39 @@ public class ProtoUtils { return builder.build(); } + public static ApiUsageRecordKeyProto toProto(ApiUsageRecordKey apiUsageRecordKey) { + return switch (apiUsageRecordKey) { + case TRANSPORT_MSG_COUNT -> ApiUsageRecordKeyProto.TRANSPORT_MSG_COUNT; + case TRANSPORT_DP_COUNT -> ApiUsageRecordKeyProto.TRANSPORT_DP_COUNT; + case STORAGE_DP_COUNT -> ApiUsageRecordKeyProto.STORAGE_DP_COUNT; + case RE_EXEC_COUNT -> ApiUsageRecordKeyProto.RE_EXEC_COUNT; + case JS_EXEC_COUNT -> ApiUsageRecordKeyProto.JS_EXEC_COUNT; + case TBEL_EXEC_COUNT -> ApiUsageRecordKeyProto.TBEL_EXEC_COUNT; + case EMAIL_EXEC_COUNT -> ApiUsageRecordKeyProto.EMAIL_EXEC_COUNT; + case SMS_EXEC_COUNT -> ApiUsageRecordKeyProto.SMS_EXEC_COUNT; + case CREATED_ALARMS_COUNT -> ApiUsageRecordKeyProto.CREATED_ALARMS_COUNT; + case ACTIVE_DEVICES -> ApiUsageRecordKeyProto.ACTIVE_DEVICES; + case INACTIVE_DEVICES -> ApiUsageRecordKeyProto.INACTIVE_DEVICES; + }; + } + + public static ApiUsageRecordKey fromProto(ApiUsageRecordKeyProto proto) { + return switch (proto) { + case UNRECOGNIZED -> null; + case TRANSPORT_MSG_COUNT -> ApiUsageRecordKey.TRANSPORT_MSG_COUNT; + case TRANSPORT_DP_COUNT -> ApiUsageRecordKey.TRANSPORT_DP_COUNT; + case STORAGE_DP_COUNT -> ApiUsageRecordKey.STORAGE_DP_COUNT; + case RE_EXEC_COUNT -> ApiUsageRecordKey.RE_EXEC_COUNT; + case JS_EXEC_COUNT -> ApiUsageRecordKey.JS_EXEC_COUNT; + case TBEL_EXEC_COUNT -> ApiUsageRecordKey.TBEL_EXEC_COUNT; + case EMAIL_EXEC_COUNT -> ApiUsageRecordKey.EMAIL_EXEC_COUNT; + case SMS_EXEC_COUNT -> ApiUsageRecordKey.SMS_EXEC_COUNT; + case CREATED_ALARMS_COUNT -> ApiUsageRecordKey.CREATED_ALARMS_COUNT; + case ACTIVE_DEVICES -> ApiUsageRecordKey.ACTIVE_DEVICES; + case INACTIVE_DEVICES -> ApiUsageRecordKey.INACTIVE_DEVICES; + }; + } + private static ToDeviceActorNotificationMsg fromProto(TransportProtos.DeviceAttributesEventMsgProto proto) { return new DeviceAttributesEventNotificationMsg( TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())), diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 6ecdf13981..f70e55dc84 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -63,6 +63,20 @@ enum EntityTypeProto { CALCULATED_FIELD_LINK = 40; } +enum ApiUsageRecordKeyProto { + TRANSPORT_MSG_COUNT = 0; + TRANSPORT_DP_COUNT = 1; + STORAGE_DP_COUNT = 2; + RE_EXEC_COUNT = 3; + JS_EXEC_COUNT = 4; + TBEL_EXEC_COUNT = 5; + EMAIL_EXEC_COUNT = 6; + SMS_EXEC_COUNT = 7; + CREATED_ALARMS_COUNT = 8; + ACTIVE_DEVICES = 9; + INACTIVE_DEVICES = 10; +} + /** * Service Discovery Data Structures; */ @@ -1720,12 +1734,25 @@ message ToTransportMsg { repeated QueueDeleteMsg queueDeleteMsgs = 16; } -message UsageStatsKVProto{ - string key = 1; +message UsageStatsKVProto { + string key = 1 [deprecated=true]; int64 value = 2; + ApiUsageRecordKeyProto recordKey = 3; } message ToUsageStatsServiceMsg { + int64 tenantIdMSB = 1 [deprecated=true]; + int64 tenantIdLSB = 2 [deprecated=true]; + int64 entityIdMSB = 3 [deprecated=true]; + int64 entityIdLSB = 4 [deprecated=true]; + repeated UsageStatsKVProto values = 5 [deprecated=true]; + int64 customerIdMSB = 6 [deprecated=true]; + int64 customerIdLSB = 7 [deprecated=true]; + string serviceId = 8; + repeated UsageStatsServiceMsg msgs = 9; +} + +message UsageStatsServiceMsg { int64 tenantIdMSB = 1; int64 tenantIdLSB = 2; int64 entityIdMSB = 3; @@ -1733,7 +1760,6 @@ message ToUsageStatsServiceMsg { repeated UsageStatsKVProto values = 5; int64 customerIdMSB = 6; int64 customerIdLSB = 7; - string serviceId = 8; } message ToOtaPackageStateServiceMsg { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/DefaultTbApiUsageReportClient.java b/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/DefaultTbApiUsageReportClient.java index bc90a8c392..3cea112327 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/DefaultTbApiUsageReportClient.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/DefaultTbApiUsageReportClient.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.queue.usagestats; +import com.google.common.collect.Lists; import jakarta.annotation.PostConstruct; import lombok.Data; import lombok.RequiredArgsConstructor; @@ -29,6 +30,9 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.stats.TbApiUsageReportClient; +import org.thingsboard.server.common.util.ProtoUtils; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto; import org.thingsboard.server.queue.TbQueueProducer; @@ -38,7 +42,11 @@ import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.scheduler.SchedulerComponent; +import java.util.ArrayList; import java.util.EnumMap; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Random; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -57,6 +65,8 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient { private boolean enabledPerCustomer; @Value("${usage.stats.report.interval:10}") private int interval; + @Value("${usage.stats.report.pack_size:1024}") + private int packSize; private final EnumMap> stats = new EnumMap<>(ApiUsageRecordKey.class); @@ -64,7 +74,7 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient { private final TbServiceInfoProvider serviceInfoProvider; private final SchedulerComponent scheduler; private final TbQueueProducerProvider producerProvider; - private TbQueueProducer> msgProducer; + private TbQueueProducer> msgProducer; @PostConstruct private void init() { @@ -84,7 +94,7 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient { } private void reportStats() { - ConcurrentMap report = new ConcurrentHashMap<>(); + ConcurrentMap report = new ConcurrentHashMap<>(); for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { ConcurrentMap statsForKey = stats.get(key); @@ -92,8 +102,8 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient { long value = statsValue.get(); if (value == 0 && key.isCounter()) return; - ToUsageStatsServiceMsg.Builder statsMsg = report.computeIfAbsent(reportLevel.getParentEntity(), parent -> { - ToUsageStatsServiceMsg.Builder newStatsMsg = ToUsageStatsServiceMsg.newBuilder(); + UsageStatsServiceMsg.Builder statsMsg = report.computeIfAbsent(reportLevel.getParentEntity(), parent -> { + UsageStatsServiceMsg.Builder newStatsMsg = UsageStatsServiceMsg.newBuilder(); TenantId tenantId = parent.getTenantId(); newStatsMsg.setTenantIdMSB(tenantId.getId().getMostSignificantBits()); @@ -105,36 +115,54 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient { newStatsMsg.setCustomerIdLSB(customerId.getId().getLeastSignificantBits()); } - newStatsMsg.setServiceId(serviceInfoProvider.getServiceId()); return newStatsMsg; }); UsageStatsKVProto.Builder statsItem = UsageStatsKVProto.newBuilder() - .setKey(key.name()) + .setRecordKey(ProtoUtils.toProto(key)) .setValue(value); statsMsg.addValues(statsItem.build()); }); - statsForKey.clear(); } - report.forEach(((parent, statsMsg) -> { - //TODO: figure out how to minimize messages into the queue. Maybe group by 100s of messages? + Map> reportStatsPerTpi = new HashMap<>(); + + report.forEach((parent, statsMsg) -> { try { TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, parent.getTenantId(), parent.getId()) .newByTopic(msgProducer.getDefaultTopic()); - msgProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), statsMsg.build()), null); + reportStatsPerTpi.computeIfAbsent(tpi, k -> new ArrayList<>()).add(statsMsg.build()); } catch (TenantNotFoundException e) { log.debug("Couldn't report usage stats for non-existing tenant: {}", e.getTenantId()); - } catch (Exception e) { - log.warn("Failed to report usage stats for tenant {}", parent.getTenantId(), e); } - })); + }); + + reportStatsPerTpi.forEach((tpi, statsList) -> { + toMsgPack(statsList).forEach(pack -> { + try { + msgProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), pack), null); + } catch (Exception e) { + log.warn("Failed to report usage stats pack to TPI {}", tpi, e); + } + }); + }); if (!report.isEmpty()) { log.debug("Reporting API usage statistics for {} tenants and customers", report.size()); } } + private List toMsgPack(List list) { + return Lists.partition(list, packSize) + .stream() + .map(partition -> + ToUsageStatsServiceMsg.newBuilder() + .addAllMsgs(partition) + .setServiceId(serviceInfoProvider.getServiceId()) + .build()) + .toList(); + } + @Override public void report(TenantId tenantId, CustomerId customerId, ApiUsageRecordKey key, long value) { if (!enabled) return; diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index f0b1426cc7..bb7f607ca0 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -207,7 +207,9 @@ usage: # Enable/Disable collection of statistics about API usage on a customer level enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}" # Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds - interval: "${USAGE_STATS_REPORT_INTERVAL:10}" + interval: "${USAGE_STATS_REPORT_INTERVAL:60}" + # Amount of statistic messages in pack + pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}" # Metrics parameters metrics: diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index f8df4bb55e..f60a6bd47e 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -411,7 +411,9 @@ usage: # Enable/Disable collection of statistics about API usage on a customer level enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}" # Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds - interval: "${USAGE_STATS_REPORT_INTERVAL:10}" + interval: "${USAGE_STATS_REPORT_INTERVAL:60}" + # Amount of statistic messages in pack + pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}" # Metrics parameters metrics: diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index d282b50ff3..c921f9f9ae 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -360,7 +360,9 @@ usage: # Enable/Disable collection of statistics about API usage on a customer level enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}" # Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds - interval: "${USAGE_STATS_REPORT_INTERVAL:10}" + interval: "${USAGE_STATS_REPORT_INTERVAL:60}" + # Amount of statistic messages in pack + pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}" # Metrics parameters metrics: diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index a198613e11..85e865e60e 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -461,7 +461,9 @@ usage: # Enable/Disable collection of statistics about API usage on a customer level enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}" # Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds - interval: "${USAGE_STATS_REPORT_INTERVAL:10}" + interval: "${USAGE_STATS_REPORT_INTERVAL:60}" + # Amount of statistic messages in pack + pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}" # Metrics parameters metrics: diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index fb75203499..a6ca2f1a6e 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -394,7 +394,9 @@ usage: # Enable/Disable collection of statistics about API usage on a customer level enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}" # Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds - interval: "${USAGE_STATS_REPORT_INTERVAL:10}" + interval: "${USAGE_STATS_REPORT_INTERVAL:60}" + # Amount of statistic messages in pack + pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}" # Metrics parameters metrics: diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index 281e221674..6848e8af26 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -347,7 +347,9 @@ usage: # Enable/Disable collection of statistics about API usage on a customer level enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}" # Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds - interval: "${USAGE_STATS_REPORT_INTERVAL:10}" + interval: "${USAGE_STATS_REPORT_INTERVAL:60}" + # Amount of statistic messages in pack + pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}" # Metrics parameters metrics: