added backward compatibility for api usage

This commit is contained in:
YevhenBondarenko 2025-03-10 12:06:34 +01:00
parent 89900a6dee
commit 28fba7645e
19 changed files with 108 additions and 71 deletions

View File

@ -61,7 +61,8 @@ import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.usagerecord.ApiUsageStateService; import org.thingsboard.server.dao.usagerecord.ApiUsageStateService;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto; import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.PartitionService;
@ -155,9 +156,30 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
} }
@Override @Override
public void process(TbProtoQueueMsg<ToUsageStatsServiceMsgPack> msgPack, TbCallback callback) { public void process(TbProtoQueueMsg<ToUsageStatsServiceMsg> msgPack, TbCallback callback) {
String serviceId = msgPack.getValue().getServiceId(); ToUsageStatsServiceMsg serviceMsg = msgPack.getValue();
msgPack.getValue().getMsgsList().forEach(msg -> { String serviceId = serviceMsg.getServiceId();
List<TransportProtos.UsageStatsServiceMsg> msgs;
//For backward compatibility, remove after release
if (serviceMsg.getTenantIdMSB() != 0) {
TransportProtos.UsageStatsServiceMsg oldMsg = TransportProtos.UsageStatsServiceMsg.newBuilder()
.setTenantIdMSB(serviceMsg.getTenantIdMSB())
.setTenantIdLSB(serviceMsg.getTenantIdLSB())
.setCustomerIdMSB(serviceMsg.getCustomerIdMSB())
.setCustomerIdLSB(serviceMsg.getCustomerIdLSB())
.setEntityIdMSB(serviceMsg.getEntityIdMSB())
.setEntityIdLSB(serviceMsg.getEntityIdLSB())
.addAllValues(serviceMsg.getValuesList())
.build();
msgs = List.of(oldMsg);
} else {
msgs = serviceMsg.getMsgsList();
}
msgs.forEach(msg -> {
TenantId tenantId = TenantId.fromUUID(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB())); TenantId tenantId = TenantId.fromUUID(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB()));
EntityId ownerId; EntityId ownerId;
if (msg.getCustomerIdMSB() != 0 && msg.getCustomerIdLSB() != 0) { if (msg.getCustomerIdMSB() != 0 && msg.getCustomerIdLSB() != 0) {
@ -193,7 +215,14 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
updatedEntries = new ArrayList<>(ApiUsageRecordKey.values().length); updatedEntries = new ArrayList<>(ApiUsageRecordKey.values().length);
Set<ApiFeature> apiFeatures = new HashSet<>(); Set<ApiFeature> apiFeatures = new HashSet<>();
for (UsageStatsKVProto statsItem : values) { for (UsageStatsKVProto statsItem : values) {
ApiUsageRecordKey recordKey = ProtoUtils.fromProto(statsItem.getKey()); ApiUsageRecordKey recordKey;
//For backward compatibility, remove after release
if (StringUtils.isNotEmpty(statsItem.getKey())) {
recordKey = ApiUsageRecordKey.valueOf(statsItem.getKey());
} else {
recordKey = ProtoUtils.fromProto(statsItem.getKeyProto());
}
StatsCalculationResult calculationResult = usageState.calculate(recordKey, statsItem.getValue(), serviceId); StatsCalculationResult calculationResult = usageState.calculate(recordKey, statsItem.getValue(), serviceId);
if (calculationResult.isValueChanged()) { if (calculationResult.isValueChanged()) {

View File

@ -22,13 +22,13 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.stats.TbApiUsageStateClient; import org.thingsboard.server.common.stats.TbApiUsageStateClient;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
public interface TbApiUsageStateService extends TbApiUsageStateClient, RuleEngineApiUsageStateService, ApplicationListener<PartitionChangeEvent> { public interface TbApiUsageStateService extends TbApiUsageStateClient, RuleEngineApiUsageStateService, ApplicationListener<PartitionChangeEvent> {
void process(TbProtoQueueMsg<ToUsageStatsServiceMsgPack> msg, TbCallback callback); void process(TbProtoQueueMsg<ToUsageStatsServiceMsg> msg, TbCallback callback);
void onTenantProfileUpdate(TenantProfileId tenantProfileId); void onTenantProfileUpdate(TenantProfileId tenantProfileId);

View File

@ -74,7 +74,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdatePr
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -150,7 +150,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
private final TbCoreConsumerStats stats; private final TbCoreConsumerStats stats;
private MainQueueConsumerManager<TbProtoQueueMsg<ToCoreMsg>, CoreQueueConfig> mainConsumer; private MainQueueConsumerManager<TbProtoQueueMsg<ToCoreMsg>, CoreQueueConfig> mainConsumer;
private QueueConsumerManager<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> usageStatsConsumer; private QueueConsumerManager<TbProtoQueueMsg<ToUsageStatsServiceMsg>> usageStatsConsumer;
private QueueConsumerManager<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> firmwareStatesConsumer; private QueueConsumerManager<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> firmwareStatesConsumer;
private volatile ListeningExecutorService deviceActivityEventsExecutor; private volatile ListeningExecutorService deviceActivityEventsExecutor;
@ -207,7 +207,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
.scheduler(scheduler) .scheduler(scheduler)
.taskExecutor(mgmtExecutor) .taskExecutor(mgmtExecutor)
.build(); .build();
this.usageStatsConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>>builder() this.usageStatsConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToUsageStatsServiceMsg>>builder()
.name("TB Usage Stats") .name("TB Usage Stats")
.msgPackProcessor(this::processUsageStatsMsg) .msgPackProcessor(this::processUsageStatsMsg)
.pollInterval(pollInterval) .pollInterval(pollInterval)
@ -402,11 +402,11 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
} }
} }
private void processUsageStatsMsg(List<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> consumer) throws Exception { private void processUsageStatsMsg(List<TbProtoQueueMsg<ToUsageStatsServiceMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> consumer) throws Exception {
ConcurrentMap<UUID, TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> pendingMap = msgs.stream().collect( ConcurrentMap<UUID, TbProtoQueueMsg<ToUsageStatsServiceMsg>> pendingMap = msgs.stream().collect(
Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity())); Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity()));
CountDownLatch processingTimeoutLatch = new CountDownLatch(1); CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
TbPackProcessingContext<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> ctx = new TbPackProcessingContext<>( TbPackProcessingContext<TbProtoQueueMsg<ToUsageStatsServiceMsg>> ctx = new TbPackProcessingContext<>(
processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>()); processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>());
pendingMap.forEach((id, msg) -> { pendingMap.forEach((id, msg) -> {
log.trace("[{}] Creating usage stats callback for message: {}", id, msg.getValue()); log.trace("[{}] Creating usage stats callback for message: {}", id, msg.getValue());
@ -453,7 +453,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
consumer.commit(); consumer.commit();
} }
private void handleUsageStats(TbProtoQueueMsg<ToUsageStatsServiceMsgPack> msg, TbCallback callback) { private void handleUsageStats(TbProtoQueueMsg<ToUsageStatsServiceMsg> msg, TbCallback callback) {
statsService.process(msg, callback); statsService.process(msg, callback);
} }

View File

@ -1603,11 +1603,24 @@ message ToTransportMsg {
} }
message UsageStatsKVProto { message UsageStatsKVProto {
ApiUsageRecordKeyProto key = 1; string key = 1 [deprecated=true];
int64 value = 2; int64 value = 2;
ApiUsageRecordKeyProto keyProto = 3;
} }
message ToUsageStatsServiceMsg { message ToUsageStatsServiceMsg {
int64 tenantIdMSB = 1 [deprecated=true];
int64 tenantIdLSB = 2 [deprecated=true];
int64 entityIdMSB = 3 [deprecated=true];
int64 entityIdLSB = 4 [deprecated=true];
repeated UsageStatsKVProto values = 5 [deprecated=true];
int64 customerIdMSB = 6 [deprecated=true];
int64 customerIdLSB = 7 [deprecated=true];
string serviceId = 8;
repeated UsageStatsServiceMsg msgs = 9;
}
message UsageStatsServiceMsg {
int64 tenantIdMSB = 1; int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2; int64 tenantIdLSB = 2;
int64 entityIdMSB = 3; int64 entityIdMSB = 3;
@ -1617,11 +1630,6 @@ message ToUsageStatsServiceMsg {
int64 customerIdLSB = 7; int64 customerIdLSB = 7;
} }
message ToUsageStatsServiceMsgPack {
repeated ToUsageStatsServiceMsg msgs = 1;
string serviceId = 2;
}
message ToOtaPackageStateServiceMsg { message ToOtaPackageStateServiceMsg {
int64 ts = 1; int64 ts = 1;
int64 tenantIdMSB = 2; int64 tenantIdMSB = 2;

View File

@ -140,7 +140,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
} }
@Override @Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgConsumer() { public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
return new InMemoryTbQueueConsumer<>(storage, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); return new InMemoryTbQueueConsumer<>(storage, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
} }
@ -155,7 +155,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
return new InMemoryTbQueueProducer<>(storage, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); return new InMemoryTbQueueProducer<>(storage, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
} }

View File

@ -116,7 +116,7 @@ public class InMemoryTbTransportQueueFactory implements TbTransportQueueFactory
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
return new InMemoryTbQueueProducer<>(storage, topicService.buildTopicName(coreSettings.getUsageStatsTopic())); return new InMemoryTbQueueProducer<>(storage, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
} }

View File

@ -35,7 +35,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
@ -320,13 +320,13 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
} }
@Override @Override
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgConsumer() { public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> consumerBuilder = TbKafkaConsumerTemplate.builder(); TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings); consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(topicService.buildTopicName(coreSettings.getUsageStatsTopic())); consumerBuilder.topic(topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
consumerBuilder.clientId("monolith-us-consumer-" + serviceInfoProvider.getServiceId()); consumerBuilder.clientId("monolith-us-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId(topicService.buildTopicName("monolith-us-consumer")); consumerBuilder.groupId(topicService.buildTopicName("monolith-us-consumer"));
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsgPack.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(coreAdmin); consumerBuilder.admin(coreAdmin);
consumerBuilder.statsService(consumerStatsService); consumerBuilder.statsService(consumerStatsService);
return consumerBuilder.build(); return consumerBuilder.build();
@ -356,8 +356,8 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> requestBuilder = TbKafkaProducerTemplate.builder(); TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings); requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("monolith-us-producer-" + serviceInfoProvider.getServiceId()); requestBuilder.clientId("monolith-us-producer-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(topicService.buildTopicName(coreSettings.getUsageStatsTopic())); requestBuilder.defaultTopic(topicService.buildTopicName(coreSettings.getUsageStatsTopic()));

View File

@ -34,7 +34,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
@ -269,13 +269,13 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
} }
@Override @Override
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgConsumer() { public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> consumerBuilder = TbKafkaConsumerTemplate.builder(); TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings); consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(topicService.buildTopicName(coreSettings.getUsageStatsTopic())); consumerBuilder.topic(topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
consumerBuilder.clientId("tb-core-us-consumer-" + serviceInfoProvider.getServiceId()); consumerBuilder.clientId("tb-core-us-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId(topicService.buildTopicName("tb-core-us-consumer")); consumerBuilder.groupId(topicService.buildTopicName("tb-core-us-consumer"));
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsgPack.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(coreAdmin); consumerBuilder.admin(coreAdmin);
consumerBuilder.statsService(consumerStatsService); consumerBuilder.statsService(consumerStatsService);
return consumerBuilder.build(); return consumerBuilder.build();
@ -305,8 +305,8 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> requestBuilder = TbKafkaProducerTemplate.builder(); TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings); requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-core-us-producer-" + serviceInfoProvider.getServiceId()); requestBuilder.clientId("tb-core-us-producer-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(topicService.buildTopicName(coreSettings.getUsageStatsTopic())); requestBuilder.defaultTopic(topicService.buildTopicName(coreSettings.getUsageStatsTopic()));

View File

@ -33,7 +33,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueProducer;
@ -274,8 +274,8 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> requestBuilder = TbKafkaProducerTemplate.builder(); TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings); requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-rule-engine-us-producer-" + serviceInfoProvider.getServiceId()); requestBuilder.clientId("tb-rule-engine-us-producer-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(topicService.buildTopicName(coreSettings.getUsageStatsTopic())); requestBuilder.defaultTopic(topicService.buildTopicName(coreSettings.getUsageStatsTopic()));

View File

@ -24,7 +24,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMs
import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueAdmin;
@ -165,8 +165,8 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> requestBuilder = TbKafkaProducerTemplate.builder(); TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings); requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("transport-node-us-producer-" + serviceInfoProvider.getServiceId()); requestBuilder.clientId("transport-node-us-producer-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(topicService.buildTopicName(coreSettings.getUsageStatsTopic())); requestBuilder.defaultTopic(topicService.buildTopicName(coreSettings.getUsageStatsTopic()));

View File

@ -20,7 +20,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueConsumer;
@ -98,8 +98,8 @@ public class KafkaTbVersionControlQueueFactory implements TbVersionControlQueueF
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> requestBuilder = TbKafkaProducerTemplate.builder(); TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings); requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-vc-us-producer-" + serviceInfoProvider.getServiceId()); requestBuilder.clientId("tb-vc-us-producer-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(topicService.buildTopicName(coreSettings.getUsageStatsTopic())); requestBuilder.defaultTopic(topicService.buildTopicName(coreSettings.getUsageStatsTopic()));

View File

@ -28,7 +28,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
@ -91,7 +91,7 @@ public interface TbCoreQueueFactory extends TbUsageStatsClientQueueFactory, Hous
* *
* @return * @return
*/ */
TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgConsumer(); TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer();
/** /**
* Used to consume messages about firmware update notifications by TB Core Service * Used to consume messages about firmware update notifications by TB Core Service

View File

@ -26,7 +26,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperService
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -45,7 +45,7 @@ public class TbCoreQueueProducerProvider implements TbQueueProducerProvider {
private TbQueueProducer<TbProtoQueueMsg<ToEdgeMsg>> toEdge; private TbQueueProducer<TbProtoQueueMsg<ToEdgeMsg>> toEdge;
private TbQueueProducer<TbProtoQueueMsg<ToEdgeNotificationMsg>> toEdgeNotifications; private TbQueueProducer<TbProtoQueueMsg<ToEdgeNotificationMsg>> toEdgeNotifications;
private TbQueueProducer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> toEdgeEvents; private TbQueueProducer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> toEdgeEvents;
private TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> toUsageStats; private TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> toUsageStats;
private TbQueueProducer<TbProtoQueueMsg<ToVersionControlServiceMsg>> toVersionControl; private TbQueueProducer<TbProtoQueueMsg<ToVersionControlServiceMsg>> toVersionControl;
private TbQueueProducer<TbProtoQueueMsg<ToHousekeeperServiceMsg>> toHousekeeper; private TbQueueProducer<TbProtoQueueMsg<ToHousekeeperServiceMsg>> toHousekeeper;
@ -94,7 +94,7 @@ public class TbCoreQueueProducerProvider implements TbQueueProducerProvider {
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> getTbUsageStatsMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> getTbUsageStatsMsgProducer() {
return toUsageStats; return toUsageStats;
} }

View File

@ -24,7 +24,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperService
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -74,7 +74,7 @@ public interface TbQueueProducerProvider {
* *
* @return * @return
*/ */
TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> getTbUsageStatsMsgProducer(); TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> getTbUsageStatsMsgProducer();
/** /**
* Used to push messages to other instances of TB Core Service * Used to push messages to other instances of TB Core Service

View File

@ -27,7 +27,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperService
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -42,7 +42,7 @@ public class TbRuleEngineProducerProvider implements TbQueueProducerProvider {
private TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> toTbCore; private TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> toTbCore;
private TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> toRuleEngineNotifications; private TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> toRuleEngineNotifications;
private TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> toTbCoreNotifications; private TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> toTbCoreNotifications;
private TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> toUsageStats; private TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> toUsageStats;
private TbQueueProducer<TbProtoQueueMsg<ToHousekeeperServiceMsg>> toHousekeeper; private TbQueueProducer<TbProtoQueueMsg<ToHousekeeperServiceMsg>> toHousekeeper;
private TbQueueProducer<TbProtoQueueMsg<ToEdgeMsg>> toEdge; private TbQueueProducer<TbProtoQueueMsg<ToEdgeMsg>> toEdge;
private TbQueueProducer<TbProtoQueueMsg<ToEdgeNotificationMsg>> toEdgeNotifications; private TbQueueProducer<TbProtoQueueMsg<ToEdgeNotificationMsg>> toEdgeNotifications;
@ -107,7 +107,7 @@ public class TbRuleEngineProducerProvider implements TbQueueProducerProvider {
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> getTbUsageStatsMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> getTbUsageStatsMsgProducer() {
return toUsageStats; return toUsageStats;
} }

View File

@ -27,7 +27,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperService
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -40,7 +40,7 @@ public class TbTransportQueueProducerProvider implements TbQueueProducerProvider
private TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> toRuleEngine; private TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> toRuleEngine;
private TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> toTbCore; private TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> toTbCore;
private TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> toTbCoreNotifications; private TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> toTbCoreNotifications;
private TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> toUsageStats; private TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> toUsageStats;
private TbQueueProducer<TbProtoQueueMsg<ToHousekeeperServiceMsg>> toHousekeeper; private TbQueueProducer<TbProtoQueueMsg<ToHousekeeperServiceMsg>> toHousekeeper;
public TbTransportQueueProducerProvider(TbTransportQueueFactory tbQueueProvider) { public TbTransportQueueProducerProvider(TbTransportQueueFactory tbQueueProvider) {
@ -102,7 +102,7 @@ public class TbTransportQueueProducerProvider implements TbQueueProducerProvider
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> getTbUsageStatsMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> getTbUsageStatsMsgProducer() {
return toUsageStats; return toUsageStats;
} }

View File

@ -15,12 +15,12 @@
*/ */
package org.thingsboard.server.queue.provider; package org.thingsboard.server.queue.provider;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
public interface TbUsageStatsClientQueueFactory { public interface TbUsageStatsClientQueueFactory {
TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> createToUsageStatsServiceMsgProducer(); TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer();
} }

View File

@ -27,7 +27,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperService
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -38,7 +38,7 @@ public class TbVersionControlProducerProvider implements TbQueueProducerProvider
private final TbVersionControlQueueFactory tbQueueProvider; private final TbVersionControlQueueFactory tbQueueProvider;
private TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> toTbCoreNotifications; private TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> toTbCoreNotifications;
private TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> toUsageStats; private TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> toUsageStats;
private TbQueueProducer<TbProtoQueueMsg<ToHousekeeperServiceMsg>> toHousekeeper; private TbQueueProducer<TbProtoQueueMsg<ToHousekeeperServiceMsg>> toHousekeeper;
public TbVersionControlProducerProvider(TbVersionControlQueueFactory tbQueueProvider) { public TbVersionControlProducerProvider(TbVersionControlQueueFactory tbQueueProvider) {
@ -98,7 +98,7 @@ public class TbVersionControlProducerProvider implements TbQueueProducerProvider
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsgPack>> getTbUsageStatsMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> getTbUsageStatsMsgProducer() {
return toUsageStats; return toUsageStats;
} }

View File

@ -32,8 +32,8 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.stats.TbApiUsageReportClient; import org.thingsboard.server.common.stats.TbApiUsageReportClient;
import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsgPack;
import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto; import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto;
import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -74,7 +74,7 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient {
private final TbServiceInfoProvider serviceInfoProvider; private final TbServiceInfoProvider serviceInfoProvider;
private final SchedulerComponent scheduler; private final SchedulerComponent scheduler;
private final TbQueueProducerProvider producerProvider; private final TbQueueProducerProvider producerProvider;
private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsgPack>> msgProducer; private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> msgProducer;
@PostConstruct @PostConstruct
private void init() { private void init() {
@ -94,7 +94,7 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient {
} }
private void reportStats() { private void reportStats() {
ConcurrentMap<ParentEntity, ToUsageStatsServiceMsg.Builder> report = new ConcurrentHashMap<>(); ConcurrentMap<ParentEntity, UsageStatsServiceMsg.Builder> report = new ConcurrentHashMap<>();
for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {
ConcurrentMap<ReportLevel, AtomicLong> statsForKey = stats.get(key); ConcurrentMap<ReportLevel, AtomicLong> statsForKey = stats.get(key);
@ -102,8 +102,8 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient {
long value = statsValue.get(); long value = statsValue.get();
if (value == 0 && key.isCounter()) return; if (value == 0 && key.isCounter()) return;
ToUsageStatsServiceMsg.Builder statsMsg = report.computeIfAbsent(reportLevel.getParentEntity(), parent -> { UsageStatsServiceMsg.Builder statsMsg = report.computeIfAbsent(reportLevel.getParentEntity(), parent -> {
ToUsageStatsServiceMsg.Builder newStatsMsg = ToUsageStatsServiceMsg.newBuilder(); UsageStatsServiceMsg.Builder newStatsMsg = UsageStatsServiceMsg.newBuilder();
TenantId tenantId = parent.getTenantId(); TenantId tenantId = parent.getTenantId();
newStatsMsg.setTenantIdMSB(tenantId.getId().getMostSignificantBits()); newStatsMsg.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
@ -119,13 +119,13 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient {
}); });
UsageStatsKVProto.Builder statsItem = UsageStatsKVProto.newBuilder() UsageStatsKVProto.Builder statsItem = UsageStatsKVProto.newBuilder()
.setKey(ProtoUtils.toProto(key)) .setKeyProto(ProtoUtils.toProto(key))
.setValue(value); .setValue(value);
statsMsg.addValues(statsItem.build()); statsMsg.addValues(statsItem.build());
}); });
} }
Map<TopicPartitionInfo, List<ToUsageStatsServiceMsg>> reportStatsPerTpi = new HashMap<>(); Map<TopicPartitionInfo, List<UsageStatsServiceMsg>> reportStatsPerTpi = new HashMap<>();
report.forEach((parent, statsMsg) -> { report.forEach((parent, statsMsg) -> {
try { try {
@ -152,11 +152,11 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient {
} }
} }
private List<ToUsageStatsServiceMsgPack> toMsgPack(List<ToUsageStatsServiceMsg> list) { private List<ToUsageStatsServiceMsg> toMsgPack(List<UsageStatsServiceMsg> list) {
return Lists.partition(list, packSize) return Lists.partition(list, packSize)
.stream() .stream()
.map(partition -> .map(partition ->
ToUsageStatsServiceMsgPack.newBuilder() ToUsageStatsServiceMsg.newBuilder()
.addAllMsgs(partition) .addAllMsgs(partition)
.setServiceId(serviceInfoProvider.getServiceId()) .setServiceId(serviceInfoProvider.getServiceId())
.build()) .build())