added usage stats consumer and producer to queue providers

This commit is contained in:
YevhenBondarenko 2020-10-20 13:59:14 +03:00
parent cc2446442d
commit 70fe28a5ac
7 changed files with 81 additions and 47 deletions

View File

@ -30,28 +30,29 @@ import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
import org.thingsboard.server.dao.util.mapping.JacksonUtil;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto;
import org.thingsboard.server.gen.transport.TransportProtos.LocalSubscriptionServiceMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionMgrMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeDeleteProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmDeleteProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeDeleteProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionCloseProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
@ -62,7 +63,6 @@ import org.thingsboard.server.service.subscription.SubscriptionManagerService;
import org.thingsboard.server.service.subscription.TbLocalSubscriptionService;
import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@ -143,6 +143,12 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
if (partitionChangeEvent.getServiceType().equals(getServiceType())) {
log.info("Subscribing to partitions: {}", partitionChangeEvent.getPartitions());
this.mainConsumer.subscribe(partitionChangeEvent.getPartitions());
this.usageStatsConsumer.subscribe(
partitionChangeEvent
.getPartitions()
.stream()
.map(tpi -> tpi.newByTopic(usageStatsConsumer.getTopic()))
.collect(Collectors.toSet()));
}
}

View File

@ -50,6 +50,10 @@ public class TopicPartitionInfo {
this.fullTopicName = tmp;
}
public TopicPartitionInfo newByTopic(String topic) {
return new TopicPartitionInfo(topic, this.tenantId, this.partition, this.myPartition);
}
public String getTopic() {
return topic;
}

View File

@ -17,7 +17,12 @@ package org.thingsboard.server.queue.provider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -28,11 +33,12 @@ import javax.annotation.PostConstruct;
public class TbCoreQueueProducerProvider implements TbQueueProducerProvider {
private final TbCoreQueueFactory tbQueueProvider;
private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> toTransport;
private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> toRuleEngine;
private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> toTbCore;
private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> toRuleEngineNotifications;
private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> toTbCoreNotifications;
private TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> toTransport;
private TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> toRuleEngine;
private TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> toTbCore;
private TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> toRuleEngineNotifications;
private TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> toTbCoreNotifications;
private TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> toUsageStats;
public TbCoreQueueProducerProvider(TbCoreQueueFactory tbQueueProvider) {
this.tbQueueProvider = tbQueueProvider;
@ -45,30 +51,36 @@ public class TbCoreQueueProducerProvider implements TbQueueProducerProvider {
this.toRuleEngine = tbQueueProvider.createRuleEngineMsgProducer();
this.toRuleEngineNotifications = tbQueueProvider.createRuleEngineNotificationsMsgProducer();
this.toTbCoreNotifications = tbQueueProvider.createTbCoreNotificationsMsgProducer();
this.toUsageStats = tbQueueProvider.createToUsageStatsServiceMsgProducer();
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> getTransportNotificationsMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> getTransportNotificationsMsgProducer() {
return toTransport;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> getRuleEngineMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> getRuleEngineMsgProducer() {
return toRuleEngine;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> getRuleEngineNotificationsMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> getRuleEngineNotificationsMsgProducer() {
return toRuleEngineNotifications;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> getTbCoreMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> getTbCoreMsgProducer() {
return toTbCore;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> getTbCoreNotificationsMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> getTbCoreNotificationsMsgProducer() {
return toTbCoreNotifications;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> getTbUsageStatsMsgProducer() {
return toUsageStats;
}
}

View File

@ -15,12 +15,12 @@
*/
package org.thingsboard.server.queue.provider;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -69,9 +69,5 @@ public interface TbQueueProducerProvider {
*
* @return
*/
default TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> getTbUsageStatsMsgProducer() {
//TODO: implement
return null;
}
TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> getTbUsageStatsMsgProducer();
}

View File

@ -17,7 +17,12 @@ package org.thingsboard.server.queue.provider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -28,12 +33,12 @@ import javax.annotation.PostConstruct;
public class TbRuleEngineProducerProvider implements TbQueueProducerProvider {
private final TbRuleEngineQueueFactory tbQueueProvider;
private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> toTransport;
private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> toRuleEngine;
private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> toTbCore;
private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> toRuleEngineNotifications;
private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> toTbCoreNotifications;
private TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> toTransport;
private TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> toRuleEngine;
private TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> toTbCore;
private TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> toRuleEngineNotifications;
private TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> toTbCoreNotifications;
private TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> toUsageStats;
public TbRuleEngineProducerProvider(TbRuleEngineQueueFactory tbQueueProvider) {
this.tbQueueProvider = tbQueueProvider;
@ -46,30 +51,36 @@ public class TbRuleEngineProducerProvider implements TbQueueProducerProvider {
this.toRuleEngine = tbQueueProvider.createRuleEngineMsgProducer();
this.toRuleEngineNotifications = tbQueueProvider.createRuleEngineNotificationsMsgProducer();
this.toTbCoreNotifications = tbQueueProvider.createTbCoreNotificationsMsgProducer();
this.toUsageStats = tbQueueProvider.createToUsageStatsServiceMsgProducer();
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> getTransportNotificationsMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> getTransportNotificationsMsgProducer() {
return toTransport;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> getRuleEngineMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> getRuleEngineMsgProducer() {
return toRuleEngine;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> getRuleEngineNotificationsMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> getRuleEngineNotificationsMsgProducer() {
return toRuleEngineNotifications;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> getTbCoreMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> getTbCoreMsgProducer() {
return toTbCore;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> getTbCoreNotificationsMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> getTbCoreNotificationsMsgProducer() {
return toTbCoreNotifications;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> getTbUsageStatsMsgProducer() {
return toUsageStats;
}
}

View File

@ -17,7 +17,12 @@ package org.thingsboard.server.queue.provider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -28,9 +33,9 @@ import javax.annotation.PostConstruct;
public class TbTransportQueueProducerProvider implements TbQueueProducerProvider {
private final TbTransportQueueFactory tbQueueProvider;
private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> toRuleEngine;
private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> toTbCore;
private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> toUsageStats;
private TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> toRuleEngine;
private TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> toTbCore;
private TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> toUsageStats;
public TbTransportQueueProducerProvider(TbTransportQueueFactory tbQueueProvider) {
this.tbQueueProvider = tbQueueProvider;
@ -44,32 +49,32 @@ public class TbTransportQueueProducerProvider implements TbQueueProducerProvider
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> getTransportNotificationsMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> getTransportNotificationsMsgProducer() {
throw new RuntimeException("Not Implemented! Should not be used by Transport!");
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> getRuleEngineMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> getRuleEngineMsgProducer() {
return toRuleEngine;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> getTbCoreMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> getTbCoreMsgProducer() {
return toTbCore;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> getRuleEngineNotificationsMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> getRuleEngineNotificationsMsgProducer() {
throw new RuntimeException("Not Implemented! Should not be used by Transport!");
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> getTbCoreNotificationsMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> getTbCoreNotificationsMsgProducer() {
throw new RuntimeException("Not Implemented! Should not be used by Transport!");
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> getTbUsageStatsMsgProducer() {
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> getTbUsageStatsMsgProducer() {
return toUsageStats;
}
}

View File

@ -91,7 +91,7 @@ public class DefaultTbUsageStatsClient implements TbUsageStatsClient {
report.forEach(((tenantId, builder) -> {
//TODO: figure out how to minimize messages into the queue. Maybe group by 100s of messages?
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId);
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId).newByTopic(msgProducer.getDefaultTopic());
msgProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), builder.build()), null);
}));
log.info("Report statistics for: {} tenants", report.size());