diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java index af4da51a45..74926297aa 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java @@ -61,7 +61,8 @@ import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.usagerecord.ApiUsageStateService; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; @@ -155,9 +156,30 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService } @Override - public void process(TbProtoQueueMsg msgPack, TbCallback callback) { - String serviceId = msgPack.getValue().getServiceId(); - msgPack.getValue().getMsgsList().forEach(msg -> { + public void process(TbProtoQueueMsg msgPack, TbCallback callback) { + ToUsageStatsServiceMsg serviceMsg = msgPack.getValue(); + String serviceId = serviceMsg.getServiceId(); + + List msgs; + + //For backward compatibility, remove after release + if (serviceMsg.getTenantIdMSB() != 0) { + TransportProtos.UsageStatsServiceMsg oldMsg = TransportProtos.UsageStatsServiceMsg.newBuilder() + .setTenantIdMSB(serviceMsg.getTenantIdMSB()) + .setTenantIdLSB(serviceMsg.getTenantIdLSB()) + .setCustomerIdMSB(serviceMsg.getCustomerIdMSB()) + .setCustomerIdLSB(serviceMsg.getCustomerIdLSB()) + .setEntityIdMSB(serviceMsg.getEntityIdMSB()) + .setEntityIdLSB(serviceMsg.getEntityIdLSB()) + .addAllValues(serviceMsg.getValuesList()) + .build(); + + msgs = List.of(oldMsg); + } else { + msgs = serviceMsg.getMsgsList(); + } + + msgs.forEach(msg -> { TenantId tenantId = TenantId.fromUUID(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB())); EntityId ownerId; if (msg.getCustomerIdMSB() != 0 && msg.getCustomerIdLSB() != 0) { @@ -193,7 +215,14 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService updatedEntries = new ArrayList<>(ApiUsageRecordKey.values().length); Set apiFeatures = new HashSet<>(); for (UsageStatsKVProto statsItem : values) { - ApiUsageRecordKey recordKey = ProtoUtils.fromProto(statsItem.getKey()); + ApiUsageRecordKey recordKey; + + //For backward compatibility, remove after release + if (StringUtils.isNotEmpty(statsItem.getKey())) { + recordKey = ApiUsageRecordKey.valueOf(statsItem.getKey()); + } else { + recordKey = ProtoUtils.fromProto(statsItem.getKeyProto()); + } StatsCalculationResult calculationResult = usageState.calculate(recordKey, statsItem.getValue(), serviceId); if (calculationResult.isValueChanged()) { diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java index 27673ccc92..5b5b1b5225 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java @@ -22,13 +22,13 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.stats.TbApiUsageStateClient; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; public interface TbApiUsageStateService extends TbApiUsageStateClient, RuleEngineApiUsageStateService, ApplicationListener { - void process(TbProtoQueueMsg msg, TbCallback callback); + void process(TbProtoQueueMsg msg, TbCallback callback); void onTenantProfileUpdate(TenantProfileId tenantProfileId); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index 6d4a108107..3aaa36b068 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -74,7 +74,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdatePr import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateServiceMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -150,7 +150,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService, CoreQueueConfig> mainConsumer; - private QueueConsumerManager> usageStatsConsumer; + private QueueConsumerManager> usageStatsConsumer; private QueueConsumerManager> firmwareStatesConsumer; private volatile ListeningExecutorService deviceActivityEventsExecutor; @@ -207,7 +207,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService>builder() + this.usageStatsConsumer = QueueConsumerManager.>builder() .name("TB Usage Stats") .msgPackProcessor(this::processUsageStatsMsg) .pollInterval(pollInterval) @@ -402,11 +402,11 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService> msgs, TbQueueConsumer> consumer) throws Exception { - ConcurrentMap> pendingMap = msgs.stream().collect( + private void processUsageStatsMsg(List> msgs, TbQueueConsumer> consumer) throws Exception { + ConcurrentMap> pendingMap = msgs.stream().collect( Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity())); CountDownLatch processingTimeoutLatch = new CountDownLatch(1); - TbPackProcessingContext> ctx = new TbPackProcessingContext<>( + TbPackProcessingContext> ctx = new TbPackProcessingContext<>( processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>()); pendingMap.forEach((id, msg) -> { log.trace("[{}] Creating usage stats callback for message: {}", id, msg.getValue()); @@ -453,7 +453,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService msg, TbCallback callback) { + private void handleUsageStats(TbProtoQueueMsg msg, TbCallback callback) { statsService.process(msg, callback); } diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 0e318c3b30..51796d388e 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -1603,11 +1603,24 @@ message ToTransportMsg { } message UsageStatsKVProto { - ApiUsageRecordKeyProto key = 1; + string key = 1 [deprecated=true]; int64 value = 2; + ApiUsageRecordKeyProto keyProto = 3; } message ToUsageStatsServiceMsg { + int64 tenantIdMSB = 1 [deprecated=true]; + int64 tenantIdLSB = 2 [deprecated=true]; + int64 entityIdMSB = 3 [deprecated=true]; + int64 entityIdLSB = 4 [deprecated=true]; + repeated UsageStatsKVProto values = 5 [deprecated=true]; + int64 customerIdMSB = 6 [deprecated=true]; + int64 customerIdLSB = 7 [deprecated=true]; + string serviceId = 8; + repeated UsageStatsServiceMsg msgs = 9; +} + +message UsageStatsServiceMsg { int64 tenantIdMSB = 1; int64 tenantIdLSB = 2; int64 entityIdMSB = 3; @@ -1617,11 +1630,6 @@ message ToUsageStatsServiceMsg { int64 customerIdLSB = 7; } -message ToUsageStatsServiceMsgPack { - repeated ToUsageStatsServiceMsg msgs = 1; - string serviceId = 2; -} - message ToOtaPackageStateServiceMsg { int64 ts = 1; int64 tenantIdMSB = 2; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java index d4ca701b51..d70cad159b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java @@ -140,7 +140,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE } @Override - public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { + public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { return new InMemoryTbQueueConsumer<>(storage, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } @@ -155,7 +155,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new InMemoryTbQueueProducer<>(storage, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java index 2d8e124172..4ee79aef60 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java @@ -116,7 +116,7 @@ public class InMemoryTbTransportQueueFactory implements TbTransportQueueFactory } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new InMemoryTbQueueProducer<>(storage, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index 18d1a3af00..dd5d61e834 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -35,7 +35,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; @@ -320,13 +320,13 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi } @Override - public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { - TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); + public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(topicService.buildTopicName(coreSettings.getUsageStatsTopic())); consumerBuilder.clientId("monolith-us-consumer-" + serviceInfoProvider.getServiceId()); consumerBuilder.groupId(topicService.buildTopicName("monolith-us-consumer")); - consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsgPack.parseFrom(msg.getData()), msg.getHeaders())); + consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(coreAdmin); consumerBuilder.statsService(consumerStatsService); return consumerBuilder.build(); @@ -356,8 +356,8 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { - TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); requestBuilder.settings(kafkaSettings); requestBuilder.clientId("monolith-us-producer-" + serviceInfoProvider.getServiceId()); requestBuilder.defaultTopic(topicService.buildTopicName(coreSettings.getUsageStatsTopic())); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java index b6a2e252ff..cc0e044917 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java @@ -34,7 +34,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; @@ -269,13 +269,13 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { } @Override - public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { - TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); + public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(topicService.buildTopicName(coreSettings.getUsageStatsTopic())); consumerBuilder.clientId("tb-core-us-consumer-" + serviceInfoProvider.getServiceId()); consumerBuilder.groupId(topicService.buildTopicName("tb-core-us-consumer")); - consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsgPack.parseFrom(msg.getData()), msg.getHeaders())); + consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(coreAdmin); consumerBuilder.statsService(consumerStatsService); return consumerBuilder.build(); @@ -305,8 +305,8 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { - TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); requestBuilder.settings(kafkaSettings); requestBuilder.clientId("tb-core-us-producer-" + serviceInfoProvider.getServiceId()); requestBuilder.defaultTopic(topicService.buildTopicName(coreSettings.getUsageStatsTopic())); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index 68f5fee9c2..87a1a69c2e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -33,7 +33,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; @@ -274,8 +274,8 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { - TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); requestBuilder.settings(kafkaSettings); requestBuilder.clientId("tb-rule-engine-us-producer-" + serviceInfoProvider.getServiceId()); requestBuilder.defaultTopic(topicService.buildTopicName(coreSettings.getUsageStatsTopic())); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java index daa78a08f6..dd260840f5 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java @@ -24,7 +24,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMs import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.queue.TbQueueAdmin; @@ -165,8 +165,8 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory { } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { - TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); requestBuilder.settings(kafkaSettings); requestBuilder.clientId("transport-node-us-producer-" + serviceInfoProvider.getServiceId()); requestBuilder.defaultTopic(topicService.buildTopicName(coreSettings.getUsageStatsTopic())); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbVersionControlQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbVersionControlQueueFactory.java index 7e6c05dcae..aacebb8579 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbVersionControlQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbVersionControlQueueFactory.java @@ -20,7 +20,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; @@ -98,8 +98,8 @@ public class KafkaTbVersionControlQueueFactory implements TbVersionControlQueueF } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { - TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); requestBuilder.settings(kafkaSettings); requestBuilder.clientId("tb-vc-us-producer-" + serviceInfoProvider.getServiceId()); requestBuilder.defaultTopic(topicService.buildTopicName(coreSettings.getUsageStatsTopic())); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java index b9ccfd1866..c4002f4d3e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java @@ -28,7 +28,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; @@ -91,7 +91,7 @@ public interface TbCoreQueueFactory extends TbUsageStatsClientQueueFactory, Hous * * @return */ - TbQueueConsumer> createToUsageStatsServiceMsgConsumer(); + TbQueueConsumer> createToUsageStatsServiceMsgConsumer(); /** * Used to consume messages about firmware update notifications by TB Core Service diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueProducerProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueProducerProvider.java index 386a525b28..9cf18e6cb4 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueProducerProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueProducerProvider.java @@ -26,7 +26,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperService import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -45,7 +45,7 @@ public class TbCoreQueueProducerProvider implements TbQueueProducerProvider { private TbQueueProducer> toEdge; private TbQueueProducer> toEdgeNotifications; private TbQueueProducer> toEdgeEvents; - private TbQueueProducer> toUsageStats; + private TbQueueProducer> toUsageStats; private TbQueueProducer> toVersionControl; private TbQueueProducer> toHousekeeper; @@ -94,7 +94,7 @@ public class TbCoreQueueProducerProvider implements TbQueueProducerProvider { } @Override - public TbQueueProducer> getTbUsageStatsMsgProducer() { + public TbQueueProducer> getTbUsageStatsMsgProducer() { return toUsageStats; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbQueueProducerProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbQueueProducerProvider.java index d5c011c7b9..ec31763baa 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbQueueProducerProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbQueueProducerProvider.java @@ -24,7 +24,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperService import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -74,7 +74,7 @@ public interface TbQueueProducerProvider { * * @return */ - TbQueueProducer> getTbUsageStatsMsgProducer(); + TbQueueProducer> getTbUsageStatsMsgProducer(); /** * Used to push messages to other instances of TB Core Service diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineProducerProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineProducerProvider.java index 835b02ec8e..e9f7773a26 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineProducerProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineProducerProvider.java @@ -27,7 +27,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperService import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -42,7 +42,7 @@ public class TbRuleEngineProducerProvider implements TbQueueProducerProvider { private TbQueueProducer> toTbCore; private TbQueueProducer> toRuleEngineNotifications; private TbQueueProducer> toTbCoreNotifications; - private TbQueueProducer> toUsageStats; + private TbQueueProducer> toUsageStats; private TbQueueProducer> toHousekeeper; private TbQueueProducer> toEdge; private TbQueueProducer> toEdgeNotifications; @@ -107,7 +107,7 @@ public class TbRuleEngineProducerProvider implements TbQueueProducerProvider { } @Override - public TbQueueProducer> getTbUsageStatsMsgProducer() { + public TbQueueProducer> getTbUsageStatsMsgProducer() { return toUsageStats; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbTransportQueueProducerProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbTransportQueueProducerProvider.java index 689d236de7..7960cbcf32 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbTransportQueueProducerProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbTransportQueueProducerProvider.java @@ -27,7 +27,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperService import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -40,7 +40,7 @@ public class TbTransportQueueProducerProvider implements TbQueueProducerProvider private TbQueueProducer> toRuleEngine; private TbQueueProducer> toTbCore; private TbQueueProducer> toTbCoreNotifications; - private TbQueueProducer> toUsageStats; + private TbQueueProducer> toUsageStats; private TbQueueProducer> toHousekeeper; public TbTransportQueueProducerProvider(TbTransportQueueFactory tbQueueProvider) { @@ -102,7 +102,7 @@ public class TbTransportQueueProducerProvider implements TbQueueProducerProvider } @Override - public TbQueueProducer> getTbUsageStatsMsgProducer() { + public TbQueueProducer> getTbUsageStatsMsgProducer() { return toUsageStats; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbUsageStatsClientQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbUsageStatsClientQueueFactory.java index c4c5f5215a..0c551fc998 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbUsageStatsClientQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbUsageStatsClientQueueFactory.java @@ -15,12 +15,12 @@ */ package org.thingsboard.server.queue.provider; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; public interface TbUsageStatsClientQueueFactory { - TbQueueProducer> createToUsageStatsServiceMsgProducer(); + TbQueueProducer> createToUsageStatsServiceMsgProducer(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbVersionControlProducerProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbVersionControlProducerProvider.java index 400ccfc085..cd4fa12df0 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbVersionControlProducerProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbVersionControlProducerProvider.java @@ -27,7 +27,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperService import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -38,7 +38,7 @@ public class TbVersionControlProducerProvider implements TbQueueProducerProvider private final TbVersionControlQueueFactory tbQueueProvider; private TbQueueProducer> toTbCoreNotifications; - private TbQueueProducer> toUsageStats; + private TbQueueProducer> toUsageStats; private TbQueueProducer> toHousekeeper; public TbVersionControlProducerProvider(TbVersionControlQueueFactory tbQueueProvider) { @@ -98,7 +98,7 @@ public class TbVersionControlProducerProvider implements TbQueueProducerProvider } @Override - public TbQueueProducer> getTbUsageStatsMsgProducer() { + public TbQueueProducer> getTbUsageStatsMsgProducer() { return toUsageStats; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/DefaultTbApiUsageReportClient.java b/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/DefaultTbApiUsageReportClient.java index 8c0439362b..d26042d114 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/DefaultTbApiUsageReportClient.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/DefaultTbApiUsageReportClient.java @@ -32,8 +32,8 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.stats.TbApiUsageReportClient; import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -74,7 +74,7 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient { private final TbServiceInfoProvider serviceInfoProvider; private final SchedulerComponent scheduler; private final TbQueueProducerProvider producerProvider; - private TbQueueProducer> msgProducer; + private TbQueueProducer> msgProducer; @PostConstruct private void init() { @@ -94,7 +94,7 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient { } private void reportStats() { - ConcurrentMap report = new ConcurrentHashMap<>(); + ConcurrentMap report = new ConcurrentHashMap<>(); for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { ConcurrentMap statsForKey = stats.get(key); @@ -102,8 +102,8 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient { long value = statsValue.get(); if (value == 0 && key.isCounter()) return; - ToUsageStatsServiceMsg.Builder statsMsg = report.computeIfAbsent(reportLevel.getParentEntity(), parent -> { - ToUsageStatsServiceMsg.Builder newStatsMsg = ToUsageStatsServiceMsg.newBuilder(); + UsageStatsServiceMsg.Builder statsMsg = report.computeIfAbsent(reportLevel.getParentEntity(), parent -> { + UsageStatsServiceMsg.Builder newStatsMsg = UsageStatsServiceMsg.newBuilder(); TenantId tenantId = parent.getTenantId(); newStatsMsg.setTenantIdMSB(tenantId.getId().getMostSignificantBits()); @@ -119,13 +119,13 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient { }); UsageStatsKVProto.Builder statsItem = UsageStatsKVProto.newBuilder() - .setKey(ProtoUtils.toProto(key)) + .setKeyProto(ProtoUtils.toProto(key)) .setValue(value); statsMsg.addValues(statsItem.build()); }); } - Map> reportStatsPerTpi = new HashMap<>(); + Map> reportStatsPerTpi = new HashMap<>(); report.forEach((parent, statsMsg) -> { try { @@ -152,11 +152,11 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient { } } - private List toMsgPack(List list) { + private List toMsgPack(List list) { return Lists.partition(list, packSize) .stream() .map(partition -> - ToUsageStatsServiceMsgPack.newBuilder() + ToUsageStatsServiceMsg.newBuilder() .addAllMsgs(partition) .setServiceId(serviceInfoProvider.getServiceId()) .build())