From eaafdf1f874d6ad6b81257841b24f4e87d34f696 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 4 Dec 2024 17:05:37 +0100 Subject: [PATCH] implemented sending api usages in pack --- .../DefaultTbApiUsageStateService.java | 12 +++-- .../apiusage/TbApiUsageStateService.java | 4 +- .../queue/DefaultTbCoreConsumerService.java | 14 +++--- .../src/main/resources/thingsboard.yml | 2 + .../server/common/util/ProtoUtils.java | 35 +++++++++++++++ common/proto/src/main/proto/queue.proto | 22 ++++++++- .../provider/AwsSqsMonolithQueueFactory.java | 8 ++-- .../provider/AwsSqsTbCoreQueueFactory.java | 8 ++-- .../AwsSqsTbRuleEngineQueueFactory.java | 4 +- .../AwsSqsTbVersionControlQueueFactory.java | 2 +- .../provider/AwsSqsTransportQueueFactory.java | 2 +- .../InMemoryMonolithQueueFactory.java | 4 +- .../InMemoryTbTransportQueueFactory.java | 2 +- .../provider/KafkaMonolithQueueFactory.java | 12 ++--- .../provider/KafkaTbCoreQueueFactory.java | 12 ++--- .../KafkaTbRuleEngineQueueFactory.java | 6 +-- .../KafkaTbTransportQueueFactory.java | 6 +-- .../KafkaTbVersionControlQueueFactory.java | 6 +-- .../provider/PubSubMonolithQueueFactory.java | 8 ++-- .../provider/PubSubTbCoreQueueFactory.java | 8 ++-- .../PubSubTbRuleEngineQueueFactory.java | 4 +- .../PubSubTbVersionControlQueueFactory.java | 2 +- .../provider/PubSubTransportQueueFactory.java | 4 +- .../RabbitMqMonolithQueueFactory.java | 8 ++-- .../provider/RabbitMqTbCoreQueueFactory.java | 8 ++-- .../RabbitMqTbRuleEngineQueueFactory.java | 4 +- .../RabbitMqTbVersionControlQueueFactory.java | 2 +- .../RabbitMqTransportQueueFactory.java | 4 +- .../ServiceBusMonolithQueueFactory.java | 8 ++-- .../ServiceBusTbCoreQueueFactory.java | 8 ++-- .../ServiceBusTbRuleEngineQueueFactory.java | 4 +- ...erviceBusTbVersionControlQueueFactory.java | 2 +- .../ServiceBusTransportQueueFactory.java | 4 +- .../queue/provider/TbCoreQueueFactory.java | 4 +- .../provider/TbCoreQueueProducerProvider.java | 6 +-- .../provider/TbQueueProducerProvider.java | 4 +- .../TbRuleEngineProducerProvider.java | 6 +-- .../TbTransportQueueProducerProvider.java | 6 +-- .../TbUsageStatsClientQueueFactory.java | 4 +- .../TbVersionControlProducerProvider.java | 6 +-- .../DefaultTbApiUsageReportClient.java | 45 +++++++++++++++---- .../src/main/resources/tb-vc-executor.yml | 2 + .../src/main/resources/tb-coap-transport.yml | 2 + .../src/main/resources/tb-http-transport.yml | 2 + .../src/main/resources/tb-lwm2m-transport.yml | 2 + .../src/main/resources/tb-mqtt-transport.yml | 2 + .../src/main/resources/tb-snmp-transport.yml | 2 + 47 files changed, 216 insertions(+), 116 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java index 60bf529ace..82413013a3 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java @@ -55,11 +55,13 @@ import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.tools.SchedulerUtils; +import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.usagerecord.ApiUsageStateService; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; @@ -153,9 +155,12 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService } @Override - public void process(TbProtoQueueMsg msg, TbCallback callback) { - ToUsageStatsServiceMsg statsMsg = msg.getValue(); + public void process(TbProtoQueueMsg msgPack, TbCallback callback) { + msgPack.getValue().getMsgsList().forEach(this::process); + callback.onSuccess(); + } + private void process(ToUsageStatsServiceMsg statsMsg) { TenantId tenantId = TenantId.fromUUID(new UUID(statsMsg.getTenantIdMSB(), statsMsg.getTenantIdLSB())); EntityId ownerId; if (statsMsg.getCustomerIdMSB() != 0 && statsMsg.getCustomerIdLSB() != 0) { @@ -165,7 +170,6 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService } processEntityUsageStats(tenantId, ownerId, statsMsg.getValuesList(), statsMsg.getServiceId()); - callback.onSuccess(); } private void processEntityUsageStats(TenantId tenantId, EntityId ownerId, List values, String serviceId) { @@ -190,7 +194,7 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService updatedEntries = new ArrayList<>(ApiUsageRecordKey.values().length); Set apiFeatures = new HashSet<>(); for (UsageStatsKVProto statsItem : values) { - ApiUsageRecordKey recordKey = ApiUsageRecordKey.valueOf(statsItem.getKey()); + ApiUsageRecordKey recordKey = ProtoUtils.fromProto(statsItem.getKey()); StatsCalculationResult calculationResult = usageState.calculate(recordKey, statsItem.getValue(), serviceId); if (calculationResult.isValueChanged()) { diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java index 5b5b1b5225..27673ccc92 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java @@ -22,13 +22,13 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.stats.TbApiUsageStateClient; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; public interface TbApiUsageStateService extends TbApiUsageStateClient, RuleEngineApiUsageStateService, ApplicationListener { - void process(TbProtoQueueMsg msg, TbCallback callback); + void process(TbProtoQueueMsg msg, TbCallback callback); void onTenantProfileUpdate(TenantProfileId tenantProfileId); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index 3aaa36b068..6d4a108107 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -74,7 +74,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdatePr import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateServiceMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -150,7 +150,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService, CoreQueueConfig> mainConsumer; - private QueueConsumerManager> usageStatsConsumer; + private QueueConsumerManager> usageStatsConsumer; private QueueConsumerManager> firmwareStatesConsumer; private volatile ListeningExecutorService deviceActivityEventsExecutor; @@ -207,7 +207,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService>builder() + this.usageStatsConsumer = QueueConsumerManager.>builder() .name("TB Usage Stats") .msgPackProcessor(this::processUsageStatsMsg) .pollInterval(pollInterval) @@ -402,11 +402,11 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService> msgs, TbQueueConsumer> consumer) throws Exception { - ConcurrentMap> pendingMap = msgs.stream().collect( + private void processUsageStatsMsg(List> msgs, TbQueueConsumer> consumer) throws Exception { + ConcurrentMap> pendingMap = msgs.stream().collect( Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity())); CountDownLatch processingTimeoutLatch = new CountDownLatch(1); - TbPackProcessingContext> ctx = new TbPackProcessingContext<>( + TbPackProcessingContext> ctx = new TbPackProcessingContext<>( processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>()); pendingMap.forEach((id, msg) -> { log.trace("[{}] Creating usage stats callback for message: {}", id, msg.getValue()); @@ -453,7 +453,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService msg, TbCallback callback) { + private void handleUsageStats(TbProtoQueueMsg msg, TbCallback callback) { statsService.process(msg, callback); } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index f642d7fea5..163a87046d 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -188,6 +188,8 @@ usage: enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}" # Statistics reporting interval, set to send summarized data every 10 seconds by default interval: "${USAGE_STATS_REPORT_INTERVAL:10}" + # Amount of statistic messages in pack + pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}" check: # Interval of checking the start of the next cycle and re-enabling the blocked tenants/customers cycle: "${USAGE_STATS_CHECK_CYCLE:60000}" diff --git a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java index 264b23f118..62a7d266d3 100644 --- a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.protobuf.ByteString; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.ApiUsageStateValue; import org.thingsboard.server.common.data.Device; @@ -88,6 +89,7 @@ import org.thingsboard.server.common.msg.rule.engine.DeviceDeleteMsg; import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.ApiUsageRecordKeyProto; import java.util.ArrayList; import java.util.Arrays; @@ -399,6 +401,39 @@ public class ProtoUtils { return builder.build(); } + public static ApiUsageRecordKeyProto toProto(ApiUsageRecordKey apiUsageRecordKey) { + return switch (apiUsageRecordKey) { + case TRANSPORT_MSG_COUNT -> ApiUsageRecordKeyProto.TRANSPORT_MSG_COUNT; + case TRANSPORT_DP_COUNT -> ApiUsageRecordKeyProto.TRANSPORT_DP_COUNT; + case STORAGE_DP_COUNT -> ApiUsageRecordKeyProto.STORAGE_DP_COUNT; + case RE_EXEC_COUNT -> ApiUsageRecordKeyProto.RE_EXEC_COUNT; + case JS_EXEC_COUNT -> ApiUsageRecordKeyProto.JS_EXEC_COUNT; + case TBEL_EXEC_COUNT -> ApiUsageRecordKeyProto.TBEL_EXEC_COUNT; + case EMAIL_EXEC_COUNT -> ApiUsageRecordKeyProto.EMAIL_EXEC_COUNT; + case SMS_EXEC_COUNT -> ApiUsageRecordKeyProto.SMS_EXEC_COUNT; + case CREATED_ALARMS_COUNT -> ApiUsageRecordKeyProto.CREATED_ALARMS_COUNT; + case ACTIVE_DEVICES -> ApiUsageRecordKeyProto.ACTIVE_DEVICES; + case INACTIVE_DEVICES -> ApiUsageRecordKeyProto.INACTIVE_DEVICES; + }; + } + + public static ApiUsageRecordKey fromProto(ApiUsageRecordKeyProto proto) { + return switch (proto) { + case UNRECOGNIZED -> null; + case TRANSPORT_MSG_COUNT -> ApiUsageRecordKey.TRANSPORT_MSG_COUNT; + case TRANSPORT_DP_COUNT -> ApiUsageRecordKey.TRANSPORT_DP_COUNT; + case STORAGE_DP_COUNT -> ApiUsageRecordKey.STORAGE_DP_COUNT; + case RE_EXEC_COUNT -> ApiUsageRecordKey.RE_EXEC_COUNT; + case JS_EXEC_COUNT -> ApiUsageRecordKey.JS_EXEC_COUNT; + case TBEL_EXEC_COUNT -> ApiUsageRecordKey.TBEL_EXEC_COUNT; + case EMAIL_EXEC_COUNT -> ApiUsageRecordKey.EMAIL_EXEC_COUNT; + case SMS_EXEC_COUNT -> ApiUsageRecordKey.SMS_EXEC_COUNT; + case CREATED_ALARMS_COUNT -> ApiUsageRecordKey.CREATED_ALARMS_COUNT; + case ACTIVE_DEVICES -> ApiUsageRecordKey.ACTIVE_DEVICES; + case INACTIVE_DEVICES -> ApiUsageRecordKey.INACTIVE_DEVICES; + }; + } + private static ToDeviceActorNotificationMsg fromProto(TransportProtos.DeviceAttributesEventMsgProto proto) { return new DeviceAttributesEventNotificationMsg( TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())), diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 228a4039d2..3541654b73 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -61,6 +61,20 @@ enum EntityTypeProto { MOBILE_APP_BUNDLE = 38; } +enum ApiUsageRecordKeyProto { + TRANSPORT_MSG_COUNT = 0; + TRANSPORT_DP_COUNT = 1; + STORAGE_DP_COUNT = 2; + RE_EXEC_COUNT = 3; + JS_EXEC_COUNT = 4; + TBEL_EXEC_COUNT = 5; + EMAIL_EXEC_COUNT = 6; + SMS_EXEC_COUNT = 7; + CREATED_ALARMS_COUNT = 8; + ACTIVE_DEVICES = 9; + INACTIVE_DEVICES = 10; +} + /** * Service Discovery Data Structures; */ @@ -1588,8 +1602,8 @@ message ToTransportMsg { repeated QueueDeleteMsg queueDeleteMsgs = 16; } -message UsageStatsKVProto{ - string key = 1; +message UsageStatsKVProto { + ApiUsageRecordKeyProto key = 1; int64 value = 2; } @@ -1604,6 +1618,10 @@ message ToUsageStatsServiceMsg { string serviceId = 8; } +message ToUsageStatsServiceMsgPack { + repeated ToUsageStatsServiceMsg msgs = 1; +} + message ToOtaPackageStateServiceMsg { int64 ts = 1; int64 tenantIdMSB = 2; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java index 310cc1d1c5..8efc66f2c1 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java @@ -34,7 +34,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; @@ -211,14 +211,14 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } @Override - public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { + public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { return new TbAwsSqsConsumerTemplate<>(coreAdmin, sqsSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()), - msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsgPack.parseFrom(msg.getData()), msg.getHeaders())); } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java index 515c1bba44..29676deb5e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java @@ -31,7 +31,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; @@ -189,14 +189,14 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } @Override - public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { + public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { return new TbAwsSqsConsumerTemplate<>(coreAdmin, sqsSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()), - msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsgPack.parseFrom(msg.getData()), msg.getHeaders())); } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueFactory.java index a93aba2764..31dde9228d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueFactory.java @@ -32,7 +32,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; @@ -172,7 +172,7 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbVersionControlQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbVersionControlQueueFactory.java index 67ff0393bd..58dc8b7c88 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbVersionControlQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbVersionControlQueueFactory.java @@ -62,7 +62,7 @@ public class AwsSqsTbVersionControlQueueFactory implements TbVersionControlQueue } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTransportQueueFactory.java index 407ef86a99..7c649d879a 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTransportQueueFactory.java @@ -126,7 +126,7 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory { } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java index d70cad159b..d4ca701b51 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java @@ -140,7 +140,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE } @Override - public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { + public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { return new InMemoryTbQueueConsumer<>(storage, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } @@ -155,7 +155,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new InMemoryTbQueueProducer<>(storage, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java index 4ee79aef60..2d8e124172 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java @@ -116,7 +116,7 @@ public class InMemoryTbTransportQueueFactory implements TbTransportQueueFactory } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new InMemoryTbQueueProducer<>(storage, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index dd5d61e834..18d1a3af00 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -35,7 +35,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; @@ -320,13 +320,13 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi } @Override - public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { - TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); + public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(topicService.buildTopicName(coreSettings.getUsageStatsTopic())); consumerBuilder.clientId("monolith-us-consumer-" + serviceInfoProvider.getServiceId()); consumerBuilder.groupId(topicService.buildTopicName("monolith-us-consumer")); - consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); + consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsgPack.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(coreAdmin); consumerBuilder.statsService(consumerStatsService); return consumerBuilder.build(); @@ -356,8 +356,8 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { - TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); requestBuilder.settings(kafkaSettings); requestBuilder.clientId("monolith-us-producer-" + serviceInfoProvider.getServiceId()); requestBuilder.defaultTopic(topicService.buildTopicName(coreSettings.getUsageStatsTopic())); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java index cc0e044917..b6a2e252ff 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java @@ -34,7 +34,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; @@ -269,13 +269,13 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { } @Override - public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { - TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); + public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(topicService.buildTopicName(coreSettings.getUsageStatsTopic())); consumerBuilder.clientId("tb-core-us-consumer-" + serviceInfoProvider.getServiceId()); consumerBuilder.groupId(topicService.buildTopicName("tb-core-us-consumer")); - consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); + consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsgPack.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(coreAdmin); consumerBuilder.statsService(consumerStatsService); return consumerBuilder.build(); @@ -305,8 +305,8 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { - TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); requestBuilder.settings(kafkaSettings); requestBuilder.clientId("tb-core-us-producer-" + serviceInfoProvider.getServiceId()); requestBuilder.defaultTopic(topicService.buildTopicName(coreSettings.getUsageStatsTopic())); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index 87a1a69c2e..68f5fee9c2 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -33,7 +33,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; @@ -274,8 +274,8 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { - TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); requestBuilder.settings(kafkaSettings); requestBuilder.clientId("tb-rule-engine-us-producer-" + serviceInfoProvider.getServiceId()); requestBuilder.defaultTopic(topicService.buildTopicName(coreSettings.getUsageStatsTopic())); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java index dd260840f5..daa78a08f6 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java @@ -24,7 +24,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMs import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.queue.TbQueueAdmin; @@ -165,8 +165,8 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory { } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { - TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); requestBuilder.settings(kafkaSettings); requestBuilder.clientId("transport-node-us-producer-" + serviceInfoProvider.getServiceId()); requestBuilder.defaultTopic(topicService.buildTopicName(coreSettings.getUsageStatsTopic())); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbVersionControlQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbVersionControlQueueFactory.java index aacebb8579..7e6c05dcae 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbVersionControlQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbVersionControlQueueFactory.java @@ -20,7 +20,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; @@ -98,8 +98,8 @@ public class KafkaTbVersionControlQueueFactory implements TbVersionControlQueueF } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { - TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); requestBuilder.settings(kafkaSettings); requestBuilder.clientId("tb-vc-us-producer-" + serviceInfoProvider.getServiceId()); requestBuilder.defaultTopic(topicService.buildTopicName(coreSettings.getUsageStatsTopic())); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java index 05bef819b7..d72fde8441 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java @@ -34,7 +34,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; @@ -211,9 +211,9 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng } @Override - public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { + public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()), - msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsgPack.parseFrom(msg.getData()), msg.getHeaders())); } @Override @@ -228,7 +228,7 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java index 919d895a97..ae7c1ab7f0 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java @@ -31,7 +31,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; @@ -181,9 +181,9 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { } @Override - public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { + public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()), - msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsgPack.parseFrom(msg.getData()), msg.getHeaders())); } @Override @@ -198,7 +198,7 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java index 671da15f20..edf304057d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java @@ -33,7 +33,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; @@ -177,7 +177,7 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbVersionControlQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbVersionControlQueueFactory.java index b6da4e2973..3261ad8dfd 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbVersionControlQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbVersionControlQueueFactory.java @@ -62,7 +62,7 @@ public class PubSubTbVersionControlQueueFactory implements TbVersionControlQueue } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTransportQueueFactory.java index a7500d2083..fa2f7793e7 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTransportQueueFactory.java @@ -24,7 +24,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMs import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.queue.TbQueueAdmin; @@ -126,7 +126,7 @@ public class PubSubTransportQueueFactory implements TbTransportQueueFactory { } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqMonolithQueueFactory.java index 3c59923def..e5cdeb1741 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqMonolithQueueFactory.java @@ -34,7 +34,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; @@ -209,9 +209,9 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE } @Override - public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { + public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { return new TbRabbitMqConsumerTemplate<>(coreAdmin, rabbitMqSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()), - msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsgPack.parseFrom(msg.getData()), msg.getHeaders())); } @Override @@ -226,7 +226,7 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java index 16f9838384..2fcd7936c1 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java @@ -31,7 +31,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; @@ -211,9 +211,9 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { } @Override - public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { + public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { return new TbRabbitMqConsumerTemplate<>(coreAdmin, rabbitMqSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()), - msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsgPack.parseFrom(msg.getData()), msg.getHeaders())); } @Override @@ -228,7 +228,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbRuleEngineQueueFactory.java index a7af729e1f..773b1644de 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbRuleEngineQueueFactory.java @@ -32,7 +32,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; @@ -170,7 +170,7 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbVersionControlQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbVersionControlQueueFactory.java index 604955be2c..56b1ea40f5 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbVersionControlQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbVersionControlQueueFactory.java @@ -62,7 +62,7 @@ public class RabbitMqTbVersionControlQueueFactory implements TbVersionControlQue } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTransportQueueFactory.java index ea922557bd..f3bc3094a4 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTransportQueueFactory.java @@ -24,7 +24,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMs import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.queue.TbQueueAdmin; @@ -127,7 +127,7 @@ public class RabbitMqTransportQueueFactory implements TbTransportQueueFactory { } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java index 932bbd21e5..c0e6977b35 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java @@ -33,7 +33,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; @@ -208,9 +208,9 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul } @Override - public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { + public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { return new TbServiceBusConsumerTemplate<>(coreAdmin, serviceBusSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()), - msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsgPack.parseFrom(msg.getData()), msg.getHeaders())); } @Override @@ -225,7 +225,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java index 71a7efe50b..7cc1183c80 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java @@ -31,7 +31,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; @@ -181,9 +181,9 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory { } @Override - public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { + public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { return new TbServiceBusConsumerTemplate<>(coreAdmin, serviceBusSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()), - msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsgPack.parseFrom(msg.getData()), msg.getHeaders())); } @Override @@ -198,7 +198,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory { } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java index 54661f695c..33fd286704 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java @@ -32,7 +32,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; @@ -170,7 +170,7 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbVersionControlQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbVersionControlQueueFactory.java index 0c363488ce..2f02f530c1 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbVersionControlQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbVersionControlQueueFactory.java @@ -62,7 +62,7 @@ public class ServiceBusTbVersionControlQueueFactory implements TbVersionControlQ } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTransportQueueFactory.java index 7f3e246eec..1481ec58ac 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTransportQueueFactory.java @@ -24,7 +24,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMs import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.queue.TbQueueAdmin; @@ -128,7 +128,7 @@ public class ServiceBusTransportQueueFactory implements TbTransportQueueFactory } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java index c4002f4d3e..b9ccfd1866 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java @@ -28,7 +28,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; @@ -91,7 +91,7 @@ public interface TbCoreQueueFactory extends TbUsageStatsClientQueueFactory, Hous * * @return */ - TbQueueConsumer> createToUsageStatsServiceMsgConsumer(); + TbQueueConsumer> createToUsageStatsServiceMsgConsumer(); /** * Used to consume messages about firmware update notifications by TB Core Service diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueProducerProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueProducerProvider.java index 9cf18e6cb4..386a525b28 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueProducerProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueProducerProvider.java @@ -26,7 +26,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperService import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -45,7 +45,7 @@ public class TbCoreQueueProducerProvider implements TbQueueProducerProvider { private TbQueueProducer> toEdge; private TbQueueProducer> toEdgeNotifications; private TbQueueProducer> toEdgeEvents; - private TbQueueProducer> toUsageStats; + private TbQueueProducer> toUsageStats; private TbQueueProducer> toVersionControl; private TbQueueProducer> toHousekeeper; @@ -94,7 +94,7 @@ public class TbCoreQueueProducerProvider implements TbQueueProducerProvider { } @Override - public TbQueueProducer> getTbUsageStatsMsgProducer() { + public TbQueueProducer> getTbUsageStatsMsgProducer() { return toUsageStats; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbQueueProducerProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbQueueProducerProvider.java index ec31763baa..d5c011c7b9 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbQueueProducerProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbQueueProducerProvider.java @@ -24,7 +24,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperService import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -74,7 +74,7 @@ public interface TbQueueProducerProvider { * * @return */ - TbQueueProducer> getTbUsageStatsMsgProducer(); + TbQueueProducer> getTbUsageStatsMsgProducer(); /** * Used to push messages to other instances of TB Core Service diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineProducerProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineProducerProvider.java index e9f7773a26..835b02ec8e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineProducerProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineProducerProvider.java @@ -27,7 +27,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperService import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -42,7 +42,7 @@ public class TbRuleEngineProducerProvider implements TbQueueProducerProvider { private TbQueueProducer> toTbCore; private TbQueueProducer> toRuleEngineNotifications; private TbQueueProducer> toTbCoreNotifications; - private TbQueueProducer> toUsageStats; + private TbQueueProducer> toUsageStats; private TbQueueProducer> toHousekeeper; private TbQueueProducer> toEdge; private TbQueueProducer> toEdgeNotifications; @@ -107,7 +107,7 @@ public class TbRuleEngineProducerProvider implements TbQueueProducerProvider { } @Override - public TbQueueProducer> getTbUsageStatsMsgProducer() { + public TbQueueProducer> getTbUsageStatsMsgProducer() { return toUsageStats; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbTransportQueueProducerProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbTransportQueueProducerProvider.java index 7960cbcf32..689d236de7 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbTransportQueueProducerProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbTransportQueueProducerProvider.java @@ -27,7 +27,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperService import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -40,7 +40,7 @@ public class TbTransportQueueProducerProvider implements TbQueueProducerProvider private TbQueueProducer> toRuleEngine; private TbQueueProducer> toTbCore; private TbQueueProducer> toTbCoreNotifications; - private TbQueueProducer> toUsageStats; + private TbQueueProducer> toUsageStats; private TbQueueProducer> toHousekeeper; public TbTransportQueueProducerProvider(TbTransportQueueFactory tbQueueProvider) { @@ -102,7 +102,7 @@ public class TbTransportQueueProducerProvider implements TbQueueProducerProvider } @Override - public TbQueueProducer> getTbUsageStatsMsgProducer() { + public TbQueueProducer> getTbUsageStatsMsgProducer() { return toUsageStats; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbUsageStatsClientQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbUsageStatsClientQueueFactory.java index 0c551fc998..c4c5f5215a 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbUsageStatsClientQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbUsageStatsClientQueueFactory.java @@ -15,12 +15,12 @@ */ package org.thingsboard.server.queue.provider; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; public interface TbUsageStatsClientQueueFactory { - TbQueueProducer> createToUsageStatsServiceMsgProducer(); + TbQueueProducer> createToUsageStatsServiceMsgProducer(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbVersionControlProducerProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbVersionControlProducerProvider.java index cd4fa12df0..400ccfc085 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbVersionControlProducerProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbVersionControlProducerProvider.java @@ -27,7 +27,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperService import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -38,7 +38,7 @@ public class TbVersionControlProducerProvider implements TbQueueProducerProvider private final TbVersionControlQueueFactory tbQueueProvider; private TbQueueProducer> toTbCoreNotifications; - private TbQueueProducer> toUsageStats; + private TbQueueProducer> toUsageStats; private TbQueueProducer> toHousekeeper; public TbVersionControlProducerProvider(TbVersionControlQueueFactory tbQueueProvider) { @@ -98,7 +98,7 @@ public class TbVersionControlProducerProvider implements TbQueueProducerProvider } @Override - public TbQueueProducer> getTbUsageStatsMsgProducer() { + public TbQueueProducer> getTbUsageStatsMsgProducer() { return toUsageStats; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/DefaultTbApiUsageReportClient.java b/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/DefaultTbApiUsageReportClient.java index e8502145ec..10ea5b13a0 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/DefaultTbApiUsageReportClient.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/DefaultTbApiUsageReportClient.java @@ -29,7 +29,10 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.stats.TbApiUsageReportClient; +import org.thingsboard.server.common.util.ProtoUtils; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -38,13 +41,18 @@ import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.scheduler.SchedulerComponent; +import java.util.ArrayList; import java.util.EnumMap; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Random; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.IntStream; @Component @Slf4j @@ -57,6 +65,8 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient { private boolean enabledPerCustomer; @Value("${usage.stats.report.interval:10}") private int interval; + @Value("${usage.stats.report.pack_size:1024}") + private int packSize; private final EnumMap> stats = new EnumMap<>(ApiUsageRecordKey.class); @@ -64,7 +74,7 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient { private final TbServiceInfoProvider serviceInfoProvider; private final SchedulerComponent scheduler; private final TbQueueProducerProvider producerProvider; - private TbQueueProducer> msgProducer; + private TbQueueProducer> msgProducer; @PostConstruct private void init() { @@ -110,31 +120,50 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient { }); UsageStatsKVProto.Builder statsItem = UsageStatsKVProto.newBuilder() - .setKey(key.name()) + .setKey(ProtoUtils.toProto(key)) .setValue(value); statsMsg.addValues(statsItem.build()); }); statsForKey.clear(); } - report.forEach(((parent, statsMsg) -> { - //TODO: figure out how to minimize messages into the queue. Maybe group by 100s of messages? + Map> reportStatsPerTpi = new HashMap<>(); + + report.forEach((parent, statsMsg) -> { try { TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, parent.getTenantId(), parent.getId()) .newByTopic(msgProducer.getDefaultTopic()); - msgProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), statsMsg.build()), null); + reportStatsPerTpi.computeIfAbsent(tpi, k -> new ArrayList<>()).add(statsMsg.build()); } catch (TenantNotFoundException e) { log.debug("Couldn't report usage stats for non-existing tenant: {}", e.getTenantId()); - } catch (Exception e) { - log.warn("Failed to report usage stats for tenant {}", parent.getTenantId(), e); } - })); + }); + + reportStatsPerTpi.forEach((tpi, statsList) -> { + toMsgPack(statsList).forEach(pack -> { + try { + msgProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), pack), null); + } catch (Exception e) { + log.warn("Failed to report usage stats pack to TPI {}", tpi, e); + } + }); + }); if (!report.isEmpty()) { log.debug("Reporting API usage statistics for {} tenants and customers", report.size()); } } + private List toMsgPack(List list) { + return IntStream.range(0, (list.size() + packSize - 1) / packSize) + .mapToObj(i -> { + var packList = list.subList(i * packSize, Math.min((i + 1) * packSize, list.size())); + var pack = ToUsageStatsServiceMsgPack.newBuilder(); + pack.addAllMsgs(packList); + return pack.build(); + }).toList(); + } + @Override public void report(TenantId tenantId, CustomerId customerId, ApiUsageRecordKey key, long value) { if (!enabled) return; diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index 6b79f2d101..32cdb6085f 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -289,6 +289,8 @@ usage: enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}" # Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds interval: "${USAGE_STATS_REPORT_INTERVAL:10}" + # Amount of statistic messages in pack + pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}" # Metrics parameters metrics: diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index 2c1c0550e1..23ddc2efc5 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -478,6 +478,8 @@ usage: enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}" # Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds interval: "${USAGE_STATS_REPORT_INTERVAL:10}" + # Amount of statistic messages in pack + pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}" # Metrics parameters metrics: diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index 3c28c6ddf6..bc0cea209e 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -448,6 +448,8 @@ usage: enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}" # Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds interval: "${USAGE_STATS_REPORT_INTERVAL:10}" + # Amount of statistic messages in pack + pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}" # Metrics parameters metrics: diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index bafe77ee92..c2c0bf8e93 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -548,6 +548,8 @@ usage: enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}" # Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds interval: "${USAGE_STATS_REPORT_INTERVAL:10}" + # Amount of statistic messages in pack + pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}" # Metrics parameters metrics: diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index 51a2c173a7..88bd34280d 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -481,6 +481,8 @@ usage: enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}" # Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds interval: "${USAGE_STATS_REPORT_INTERVAL:10}" + # Amount of statistic messages in pack + pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}" # Metrics parameters metrics: diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index df222fe09f..0e587b6028 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -434,6 +434,8 @@ usage: enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}" # Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds interval: "${USAGE_STATS_REPORT_INTERVAL:10}" + # Amount of statistic messages in pack + pack_size: "${USAGE_STATS_REPORT_PACK_SIZE:1024}" # Metrics parameters metrics: