implemented sending api usages in pack

This commit is contained in:
YevhenBondarenko 2024-12-04 17:05:37 +01:00
parent ab2e788057
commit eaafdf1f87
47 changed files with 216 additions and 116 deletions

View File

@ -55,11 +55,13 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.tools.SchedulerUtils;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.usagerecord.ApiUsageStateService;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
@ -153,9 +155,12 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
}
@Override
public void process(TbProtoQueueMsg<ToUsageStatsServiceMsg> msg, TbCallback callback) {
ToUsageStatsServiceMsg statsMsg = msg.getValue();
public void process(TbProtoQueueMsg<ToUsageStatsServiceMsgPack> msgPack, TbCallback callback) {
msgPack.getValue().getMsgsList().forEach(this::process);
callback.onSuccess();
}
private void process(ToUsageStatsServiceMsg statsMsg) {
TenantId tenantId = TenantId.fromUUID(new UUID(statsMsg.getTenantIdMSB(), statsMsg.getTenantIdLSB()));
EntityId ownerId;
if (statsMsg.getCustomerIdMSB() != 0 && statsMsg.getCustomerIdLSB() != 0) {
@ -165,7 +170,6 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
}
processEntityUsageStats(tenantId, ownerId, statsMsg.getValuesList(), statsMsg.getServiceId());
callback.onSuccess();
}
private void processEntityUsageStats(TenantId tenantId, EntityId ownerId, List<UsageStatsKVProto> values, String serviceId) {
@ -190,7 +194,7 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
updatedEntries = new ArrayList<>(ApiUsageRecordKey.values().length);
Set<ApiFeature> apiFeatures = new HashSet<>();
for (UsageStatsKVProto statsItem : values) {
ApiUsageRecordKey recordKey = ApiUsageRecordKey.valueOf(statsItem.getKey());
ApiUsageRecordKey recordKey = ProtoUtils.fromProto(statsItem.getKey());
StatsCalculationResult calculationResult = usageState.calculate(recordKey, statsItem.getValue(), serviceId);
if (calculationResult.isValueChanged()) {

View File

@ -22,13 +22,13 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.stats.TbApiUsageStateClient;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
public interface TbApiUsageStateService extends TbApiUsageStateClient, RuleEngineApiUsageStateService, ApplicationListener<PartitionChangeEvent> {
void process(TbProtoQueueMsg<ToUsageStatsServiceMsg> msg, TbCallback callback);
void process(TbProtoQueueMsg<ToUsageStatsServiceMsgPack> msg, TbCallback callback);
void onTenantProfileUpdate(TenantProfileId tenantProfileId);

View File

@ -74,7 +74,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdatePr
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -150,7 +150,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
private final TbCoreConsumerStats stats;
private MainQueueConsumerManager<TbProtoQueueMsg<ToCoreMsg>, CoreQueueConfig> mainConsumer;
private QueueConsumerManager<TbProtoQueueMsg<ToUsageStatsServiceMsg>> usageStatsConsumer;
private QueueConsumerManager<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> usageStatsConsumer;
private QueueConsumerManager<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> firmwareStatesConsumer;
private volatile ListeningExecutorService deviceActivityEventsExecutor;
@ -207,7 +207,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
.scheduler(scheduler)
.taskExecutor(mgmtExecutor)
.build();
this.usageStatsConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToUsageStatsServiceMsg>>builder()
this.usageStatsConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>>builder()
.name("TB Usage Stats")
.msgPackProcessor(this::processUsageStatsMsg)
.pollInterval(pollInterval)
@ -402,11 +402,11 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
}
}
private void processUsageStatsMsg(List<TbProtoQueueMsg<ToUsageStatsServiceMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> consumer) throws Exception {
ConcurrentMap<UUID, TbProtoQueueMsg<ToUsageStatsServiceMsg>> pendingMap = msgs.stream().collect(
private void processUsageStatsMsg(List<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> consumer) throws Exception {
ConcurrentMap<UUID, TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> pendingMap = msgs.stream().collect(
Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity()));
CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
TbPackProcessingContext<TbProtoQueueMsg<ToUsageStatsServiceMsg>> ctx = new TbPackProcessingContext<>(
TbPackProcessingContext<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> ctx = new TbPackProcessingContext<>(
processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>());
pendingMap.forEach((id, msg) -> {
log.trace("[{}] Creating usage stats callback for message: {}", id, msg.getValue());
@ -453,7 +453,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
consumer.commit();
}
private void handleUsageStats(TbProtoQueueMsg<ToUsageStatsServiceMsg> msg, TbCallback callback) {
private void handleUsageStats(TbProtoQueueMsg<ToUsageStatsServiceMsgPack> msg, TbCallback callback) {
statsService.process(msg, callback);
}

View File

@ -188,6 +188,8 @@ usage:
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
# Statistics reporting interval, set to send summarized data every 10 seconds by default
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
# Amount of statistic messages in pack
pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}"
check:
# Interval of checking the start of the next cycle and re-enabling the blocked tenants/customers
cycle: "${USAGE_STATS_CHECK_CYCLE:60000}"

View File

@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.ApiUsageStateValue;
import org.thingsboard.server.common.data.Device;
@ -88,6 +89,7 @@ import org.thingsboard.server.common.msg.rule.engine.DeviceDeleteMsg;
import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg;
import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ApiUsageRecordKeyProto;
import java.util.ArrayList;
import java.util.Arrays;
@ -399,6 +401,39 @@ public class ProtoUtils {
return builder.build();
}
public static ApiUsageRecordKeyProto toProto(ApiUsageRecordKey apiUsageRecordKey) {
return switch (apiUsageRecordKey) {
case TRANSPORT_MSG_COUNT -> ApiUsageRecordKeyProto.TRANSPORT_MSG_COUNT;
case TRANSPORT_DP_COUNT -> ApiUsageRecordKeyProto.TRANSPORT_DP_COUNT;
case STORAGE_DP_COUNT -> ApiUsageRecordKeyProto.STORAGE_DP_COUNT;
case RE_EXEC_COUNT -> ApiUsageRecordKeyProto.RE_EXEC_COUNT;
case JS_EXEC_COUNT -> ApiUsageRecordKeyProto.JS_EXEC_COUNT;
case TBEL_EXEC_COUNT -> ApiUsageRecordKeyProto.TBEL_EXEC_COUNT;
case EMAIL_EXEC_COUNT -> ApiUsageRecordKeyProto.EMAIL_EXEC_COUNT;
case SMS_EXEC_COUNT -> ApiUsageRecordKeyProto.SMS_EXEC_COUNT;
case CREATED_ALARMS_COUNT -> ApiUsageRecordKeyProto.CREATED_ALARMS_COUNT;
case ACTIVE_DEVICES -> ApiUsageRecordKeyProto.ACTIVE_DEVICES;
case INACTIVE_DEVICES -> ApiUsageRecordKeyProto.INACTIVE_DEVICES;
};
}
public static ApiUsageRecordKey fromProto(ApiUsageRecordKeyProto proto) {
return switch (proto) {
case UNRECOGNIZED -> null;
case TRANSPORT_MSG_COUNT -> ApiUsageRecordKey.TRANSPORT_MSG_COUNT;
case TRANSPORT_DP_COUNT -> ApiUsageRecordKey.TRANSPORT_DP_COUNT;
case STORAGE_DP_COUNT -> ApiUsageRecordKey.STORAGE_DP_COUNT;
case RE_EXEC_COUNT -> ApiUsageRecordKey.RE_EXEC_COUNT;
case JS_EXEC_COUNT -> ApiUsageRecordKey.JS_EXEC_COUNT;
case TBEL_EXEC_COUNT -> ApiUsageRecordKey.TBEL_EXEC_COUNT;
case EMAIL_EXEC_COUNT -> ApiUsageRecordKey.EMAIL_EXEC_COUNT;
case SMS_EXEC_COUNT -> ApiUsageRecordKey.SMS_EXEC_COUNT;
case CREATED_ALARMS_COUNT -> ApiUsageRecordKey.CREATED_ALARMS_COUNT;
case ACTIVE_DEVICES -> ApiUsageRecordKey.ACTIVE_DEVICES;
case INACTIVE_DEVICES -> ApiUsageRecordKey.INACTIVE_DEVICES;
};
}
private static ToDeviceActorNotificationMsg fromProto(TransportProtos.DeviceAttributesEventMsgProto proto) {
return new DeviceAttributesEventNotificationMsg(
TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),

View File

@ -61,6 +61,20 @@ enum EntityTypeProto {
MOBILE_APP_BUNDLE = 38;
}
enum ApiUsageRecordKeyProto {
TRANSPORT_MSG_COUNT = 0;
TRANSPORT_DP_COUNT = 1;
STORAGE_DP_COUNT = 2;
RE_EXEC_COUNT = 3;
JS_EXEC_COUNT = 4;
TBEL_EXEC_COUNT = 5;
EMAIL_EXEC_COUNT = 6;
SMS_EXEC_COUNT = 7;
CREATED_ALARMS_COUNT = 8;
ACTIVE_DEVICES = 9;
INACTIVE_DEVICES = 10;
}
/**
* Service Discovery Data Structures;
*/
@ -1588,8 +1602,8 @@ message ToTransportMsg {
repeated QueueDeleteMsg queueDeleteMsgs = 16;
}
message UsageStatsKVProto{
string key = 1;
message UsageStatsKVProto {
ApiUsageRecordKeyProto key = 1;
int64 value = 2;
}
@ -1604,6 +1618,10 @@ message ToUsageStatsServiceMsg {
string serviceId = 8;
}
message ToUsageStatsServiceMsgPack {
repeated ToUsageStatsServiceMsg msgs = 1;
}
message ToOtaPackageStateServiceMsg {
int64 ts = 1;
int64 tenantIdMSB = 2;

View File

@ -34,7 +34,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
@ -211,14 +211,14 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgConsumer() {
return new TbAwsSqsConsumerTemplate<>(coreAdmin, sqsSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsgPack.parseFrom(msg.getData()), msg.getHeaders()));
}
@Override

View File

@ -31,7 +31,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
@ -189,14 +189,14 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgConsumer() {
return new TbAwsSqsConsumerTemplate<>(coreAdmin, sqsSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsgPack.parseFrom(msg.getData()), msg.getHeaders()));
}
@Override

View File

@ -32,7 +32,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
@ -172,7 +172,7 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
}

View File

@ -62,7 +62,7 @@ public class AwsSqsTbVersionControlQueueFactory implements TbVersionControlQueue
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
}

View File

@ -126,7 +126,7 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory {
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
}

View File

@ -140,7 +140,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgConsumer() {
return new InMemoryTbQueueConsumer<>(storage, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
}
@ -155,7 +155,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
return new InMemoryTbQueueProducer<>(storage, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
}

View File

@ -116,7 +116,7 @@ public class InMemoryTbTransportQueueFactory implements TbTransportQueueFactory
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
return new InMemoryTbQueueProducer<>(storage, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
}

View File

@ -35,7 +35,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
@ -320,13 +320,13 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgConsumer() {
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
consumerBuilder.clientId("monolith-us-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId(topicService.buildTopicName("monolith-us-consumer"));
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsgPack.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(coreAdmin);
consumerBuilder.statsService(consumerStatsService);
return consumerBuilder.build();
@ -356,8 +356,8 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("monolith-us-producer-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(topicService.buildTopicName(coreSettings.getUsageStatsTopic()));

View File

@ -34,7 +34,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
@ -269,13 +269,13 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgConsumer() {
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
consumerBuilder.clientId("tb-core-us-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId(topicService.buildTopicName("tb-core-us-consumer"));
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsgPack.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(coreAdmin);
consumerBuilder.statsService(consumerStatsService);
return consumerBuilder.build();
@ -305,8 +305,8 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-core-us-producer-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(topicService.buildTopicName(coreSettings.getUsageStatsTopic()));

View File

@ -33,7 +33,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
@ -274,8 +274,8 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-rule-engine-us-producer-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(topicService.buildTopicName(coreSettings.getUsageStatsTopic()));

View File

@ -24,7 +24,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMs
import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
@ -165,8 +165,8 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("transport-node-us-producer-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(topicService.buildTopicName(coreSettings.getUsageStatsTopic()));

View File

@ -20,7 +20,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
@ -98,8 +98,8 @@ public class KafkaTbVersionControlQueueFactory implements TbVersionControlQueueF
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-vc-us-producer-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(topicService.buildTopicName(coreSettings.getUsageStatsTopic()));

View File

@ -34,7 +34,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
@ -211,9 +211,9 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgConsumer() {
return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsgPack.parseFrom(msg.getData()), msg.getHeaders()));
}
@Override
@ -228,7 +228,7 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
}

View File

@ -31,7 +31,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
@ -181,9 +181,9 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgConsumer() {
return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsgPack.parseFrom(msg.getData()), msg.getHeaders()));
}
@Override
@ -198,7 +198,7 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
}

View File

@ -33,7 +33,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
@ -177,7 +177,7 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
}

View File

@ -62,7 +62,7 @@ public class PubSubTbVersionControlQueueFactory implements TbVersionControlQueue
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
}

View File

@ -24,7 +24,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMs
import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
@ -126,7 +126,7 @@ public class PubSubTransportQueueFactory implements TbTransportQueueFactory {
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
}

View File

@ -34,7 +34,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
@ -209,9 +209,9 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgConsumer() {
return new TbRabbitMqConsumerTemplate<>(coreAdmin, rabbitMqSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsgPack.parseFrom(msg.getData()), msg.getHeaders()));
}
@Override
@ -226,7 +226,7 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
}

View File

@ -31,7 +31,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
@ -211,9 +211,9 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgConsumer() {
return new TbRabbitMqConsumerTemplate<>(coreAdmin, rabbitMqSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsgPack.parseFrom(msg.getData()), msg.getHeaders()));
}
@Override
@ -228,7 +228,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
}

View File

@ -32,7 +32,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
@ -170,7 +170,7 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
}

View File

@ -62,7 +62,7 @@ public class RabbitMqTbVersionControlQueueFactory implements TbVersionControlQue
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
}

View File

@ -24,7 +24,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMs
import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
@ -127,7 +127,7 @@ public class RabbitMqTransportQueueFactory implements TbTransportQueueFactory {
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
}

View File

@ -33,7 +33,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
@ -208,9 +208,9 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgConsumer() {
return new TbServiceBusConsumerTemplate<>(coreAdmin, serviceBusSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsgPack.parseFrom(msg.getData()), msg.getHeaders()));
}
@Override
@ -225,7 +225,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
}

View File

@ -31,7 +31,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
@ -181,9 +181,9 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgConsumer() {
return new TbServiceBusConsumerTemplate<>(coreAdmin, serviceBusSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsgPack.parseFrom(msg.getData()), msg.getHeaders()));
}
@Override
@ -198,7 +198,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
}

View File

@ -32,7 +32,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
@ -170,7 +170,7 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
}

View File

@ -62,7 +62,7 @@ public class ServiceBusTbVersionControlQueueFactory implements TbVersionControlQ
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
}

View File

@ -24,7 +24,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMs
import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
@ -128,7 +128,7 @@ public class ServiceBusTransportQueueFactory implements TbTransportQueueFactory
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() {
return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
}

View File

@ -28,7 +28,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
@ -91,7 +91,7 @@ public interface TbCoreQueueFactory extends TbUsageStatsClientQueueFactory, Hous
*
* @return
*/
TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer();
TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgConsumer();
/**
* Used to consume messages about firmware update notifications by TB Core Service

View File

@ -26,7 +26,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperService
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -45,7 +45,7 @@ public class TbCoreQueueProducerProvider implements TbQueueProducerProvider {
private TbQueueProducer<TbProtoQueueMsg<ToEdgeMsg>> toEdge;
private TbQueueProducer<TbProtoQueueMsg<ToEdgeNotificationMsg>> toEdgeNotifications;
private TbQueueProducer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> toEdgeEvents;
private TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> toUsageStats;
private TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> toUsageStats;
private TbQueueProducer<TbProtoQueueMsg<ToVersionControlServiceMsg>> toVersionControl;
private TbQueueProducer<TbProtoQueueMsg<ToHousekeeperServiceMsg>> toHousekeeper;
@ -94,7 +94,7 @@ public class TbCoreQueueProducerProvider implements TbQueueProducerProvider {
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> getTbUsageStatsMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> getTbUsageStatsMsgProducer() {
return toUsageStats;
}

View File

@ -24,7 +24,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperService
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -74,7 +74,7 @@ public interface TbQueueProducerProvider {
*
* @return
*/
TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> getTbUsageStatsMsgProducer();
TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> getTbUsageStatsMsgProducer();
/**
* Used to push messages to other instances of TB Core Service

View File

@ -27,7 +27,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperService
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -42,7 +42,7 @@ public class TbRuleEngineProducerProvider implements TbQueueProducerProvider {
private TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> toTbCore;
private TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> toRuleEngineNotifications;
private TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> toTbCoreNotifications;
private TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> toUsageStats;
private TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> toUsageStats;
private TbQueueProducer<TbProtoQueueMsg<ToHousekeeperServiceMsg>> toHousekeeper;
private TbQueueProducer<TbProtoQueueMsg<ToEdgeMsg>> toEdge;
private TbQueueProducer<TbProtoQueueMsg<ToEdgeNotificationMsg>> toEdgeNotifications;
@ -107,7 +107,7 @@ public class TbRuleEngineProducerProvider implements TbQueueProducerProvider {
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> getTbUsageStatsMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> getTbUsageStatsMsgProducer() {
return toUsageStats;
}

View File

@ -27,7 +27,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperService
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -40,7 +40,7 @@ public class TbTransportQueueProducerProvider implements TbQueueProducerProvider
private TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> toRuleEngine;
private TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> toTbCore;
private TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> toTbCoreNotifications;
private TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> toUsageStats;
private TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> toUsageStats;
private TbQueueProducer<TbProtoQueueMsg<ToHousekeeperServiceMsg>> toHousekeeper;
public TbTransportQueueProducerProvider(TbTransportQueueFactory tbQueueProvider) {
@ -102,7 +102,7 @@ public class TbTransportQueueProducerProvider implements TbQueueProducerProvider
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> getTbUsageStatsMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> getTbUsageStatsMsgProducer() {
return toUsageStats;
}

View File

@ -15,12 +15,12 @@
*/
package org.thingsboard.server.queue.provider;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
public interface TbUsageStatsClientQueueFactory {
TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer();
TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer();
}

View File

@ -27,7 +27,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperService
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -38,7 +38,7 @@ public class TbVersionControlProducerProvider implements TbQueueProducerProvider
private final TbVersionControlQueueFactory tbQueueProvider;
private TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> toTbCoreNotifications;
private TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> toUsageStats;
private TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> toUsageStats;
private TbQueueProducer<TbProtoQueueMsg<ToHousekeeperServiceMsg>> toHousekeeper;
public TbVersionControlProducerProvider(TbVersionControlQueueFactory tbQueueProvider) {
@ -98,7 +98,7 @@ public class TbVersionControlProducerProvider implements TbQueueProducerProvider
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> getTbUsageStatsMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> getTbUsageStatsMsgProducer() {
return toUsageStats;
}

View File

@ -29,7 +29,10 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.stats.TbApiUsageReportClient;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -38,13 +41,18 @@ import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.scheduler.SchedulerComponent;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
@Component
@Slf4j
@ -57,6 +65,8 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient {
private boolean enabledPerCustomer;
@Value("${usage.stats.report.interval:10}")
private int interval;
@Value("${usage.stats.report.pack_size:1024}")
private int packSize;
private final EnumMap<ApiUsageRecordKey, ConcurrentMap<ReportLevel, AtomicLong>> stats = new EnumMap<>(ApiUsageRecordKey.class);
@ -64,7 +74,7 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient {
private final TbServiceInfoProvider serviceInfoProvider;
private final SchedulerComponent scheduler;
private final TbQueueProducerProvider producerProvider;
private TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> msgProducer;
private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsgPack>> msgProducer;
@PostConstruct
private void init() {
@ -110,31 +120,50 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient {
});
UsageStatsKVProto.Builder statsItem = UsageStatsKVProto.newBuilder()
.setKey(key.name())
.setKey(ProtoUtils.toProto(key))
.setValue(value);
statsMsg.addValues(statsItem.build());
});
statsForKey.clear();
}
report.forEach(((parent, statsMsg) -> {
//TODO: figure out how to minimize messages into the queue. Maybe group by 100s of messages?
Map<TopicPartitionInfo, List<ToUsageStatsServiceMsg>> reportStatsPerTpi = new HashMap<>();
report.forEach((parent, statsMsg) -> {
try {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, parent.getTenantId(), parent.getId())
.newByTopic(msgProducer.getDefaultTopic());
msgProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), statsMsg.build()), null);
reportStatsPerTpi.computeIfAbsent(tpi, k -> new ArrayList<>()).add(statsMsg.build());
} catch (TenantNotFoundException e) {
log.debug("Couldn't report usage stats for non-existing tenant: {}", e.getTenantId());
} catch (Exception e) {
log.warn("Failed to report usage stats for tenant {}", parent.getTenantId(), e);
}
}));
});
reportStatsPerTpi.forEach((tpi, statsList) -> {
toMsgPack(statsList).forEach(pack -> {
try {
msgProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), pack), null);
} catch (Exception e) {
log.warn("Failed to report usage stats pack to TPI {}", tpi, e);
}
});
});
if (!report.isEmpty()) {
log.debug("Reporting API usage statistics for {} tenants and customers", report.size());
}
}
private List<ToUsageStatsServiceMsgPack> toMsgPack(List<ToUsageStatsServiceMsg> list) {
return IntStream.range(0, (list.size() + packSize - 1) / packSize)
.mapToObj(i -> {
var packList = list.subList(i * packSize, Math.min((i + 1) * packSize, list.size()));
var pack = ToUsageStatsServiceMsgPack.newBuilder();
pack.addAllMsgs(packList);
return pack.build();
}).toList();
}
@Override
public void report(TenantId tenantId, CustomerId customerId, ApiUsageRecordKey key, long value) {
if (!enabled) return;

View File

@ -289,6 +289,8 @@ usage:
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
# Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
# Amount of statistic messages in pack
pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}"
# Metrics parameters
metrics:

View File

@ -478,6 +478,8 @@ usage:
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
# Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
# Amount of statistic messages in pack
pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}"
# Metrics parameters
metrics:

View File

@ -448,6 +448,8 @@ usage:
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
# Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
# Amount of statistic messages in pack
pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}"
# Metrics parameters
metrics:

View File

@ -548,6 +548,8 @@ usage:
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
# Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
# Amount of statistic messages in pack
pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}"
# Metrics parameters
metrics:

View File

@ -481,6 +481,8 @@ usage:
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
# Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
# Amount of statistic messages in pack
pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}"
# Metrics parameters
metrics:

View File

@ -434,6 +434,8 @@ usage:
enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}"
# Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds
interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
# Amount of statistic messages in pack
pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}"
# Metrics parameters
metrics: