diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index 5d240790f2..3f03b65416 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,20 +17,21 @@ package org.thingsboard.server.service.queue; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.rule.engine.api.RpcError; import org.thingsboard.server.actors.ActorSystemContext; -import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.alarm.Alarm; -import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.TbActorMsg; -import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.dao.util.mapping.JacksonUtil; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto; import org.thingsboard.server.gen.transport.TransportProtos.LocalSubscriptionServiceMsgProto; @@ -61,6 +62,7 @@ import org.thingsboard.server.service.subscription.SubscriptionManagerService; import org.thingsboard.server.service.subscription.TbLocalSubscriptionService; import org.thingsboard.server.service.subscription.TbSubscriptionUtils; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; +import org.thingsboard.server.service.usagestats.TbUsageStatsService; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -70,6 +72,8 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; @@ -88,32 +92,50 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService> mainConsumer; private final DeviceStateService stateService; + private final TbUsageStatsService statsService; private final TbLocalSubscriptionService localSubscriptionService; private final SubscriptionManagerService subscriptionManagerService; private final TbCoreDeviceRpcService tbCoreDeviceRpcService; private final TbCoreConsumerStats stats; + protected final TbQueueConsumer> usageStatsConsumer; + + protected volatile ExecutorService usageStatsExecutor; + public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory, ActorSystemContext actorContext, DeviceStateService stateService, TbLocalSubscriptionService localSubscriptionService, SubscriptionManagerService subscriptionManagerService, DataDecodingEncodingService encodingService, - TbCoreDeviceRpcService tbCoreDeviceRpcService, StatsFactory statsFactory, TbDeviceProfileCache deviceProfileCache) { + TbCoreDeviceRpcService tbCoreDeviceRpcService, StatsFactory statsFactory, TbDeviceProfileCache deviceProfileCache, + TbUsageStatsService statsService) { super(actorContext, encodingService, deviceProfileCache, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer()); this.mainConsumer = tbCoreQueueFactory.createToCoreMsgConsumer(); + this.usageStatsConsumer = tbCoreQueueFactory.createToUsageStatsServiceMsgConsumer(); this.stateService = stateService; this.localSubscriptionService = localSubscriptionService; this.subscriptionManagerService = subscriptionManagerService; this.tbCoreDeviceRpcService = tbCoreDeviceRpcService; this.stats = new TbCoreConsumerStats(statsFactory); + this.statsService = statsService; } @PostConstruct public void init() { super.init("tb-core-consumer", "tb-core-notifications-consumer"); + this.usageStatsExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("tb-core-usage-stats-consumer")); } @PreDestroy public void destroy() { super.destroy(); + if (usageStatsExecutor != null) { + usageStatsExecutor.shutdownNow(); + } + } + + @EventListener(ApplicationReadyEvent.class) + public void onApplicationEvent(ApplicationReadyEvent event) { + super.onApplicationEvent(event); + launchUsageStatsConsumer(); } @Override @@ -223,6 +245,53 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService { + while (!stopped) { + try { + List> msgs = usageStatsConsumer.poll(getNotificationPollDuration()); + if (msgs.isEmpty()) { + continue; + } + ConcurrentMap> pendingMap = msgs.stream().collect( + Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity())); + CountDownLatch processingTimeoutLatch = new CountDownLatch(1); + TbPackProcessingContext> ctx = new TbPackProcessingContext<>( + processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>()); + pendingMap.forEach((id, msg) -> { + log.trace("[{}] Creating usage stats callback for message: {}", id, msg.getValue()); + TbCallback callback = new TbPackCallback<>(id, ctx); + try { + handleUsageStats(msg, callback); + } catch (Throwable e) { + log.warn("[{}] Failed to process usge stats: {}", id, msg, e); + callback.onFailure(e); + } + }); + if (!processingTimeoutLatch.await(getNotificationPackProcessingTimeout(), TimeUnit.MILLISECONDS)) { + ctx.getAckMap().forEach((id, msg) -> log.warn("[{}] Timeout to process usage stats: {}", id, msg.getValue())); + ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process usage stats: {}", id, msg.getValue())); + } + usageStatsConsumer.commit(); + } catch (Exception e) { + if (!stopped) { + log.warn("Failed to obtain usage stats from queue.", e); + try { + Thread.sleep(getNotificationPollDuration()); + } catch (InterruptedException e2) { + log.trace("Failed to wait until the server has capacity to handle new usage stats", e2); + } + } + } + } + log.info("TB Usage Stats Consumer stopped."); + }); + } + + private void handleUsageStats(TbProtoQueueMsg msg, TbCallback callback) { + statsService.process(msg, callback); + } + private void forwardToCoreRpcService(FromDeviceRPCResponseProto proto, TbCallback callback) { RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null; FromDeviceRpcResponse response = new FromDeviceRpcResponse(new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB()) diff --git a/application/src/main/java/org/thingsboard/server/service/usagestats/DefaultTbUsageStatsService.java b/application/src/main/java/org/thingsboard/server/service/usagestats/DefaultTbUsageStatsService.java new file mode 100644 index 0000000000..6611507531 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/usagestats/DefaultTbUsageStatsService.java @@ -0,0 +1,15 @@ +package org.thingsboard.server.service.usagestats; + +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; + +@Service +public class DefaultTbUsageStatsService implements TbUsageStatsService { + @Override + public void process(TbProtoQueueMsg msg, TbCallback callback) { + + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/usagestats/TbUsageStatsService.java b/application/src/main/java/org/thingsboard/server/service/usagestats/TbUsageStatsService.java new file mode 100644 index 0000000000..35a7051896 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/usagestats/TbUsageStatsService.java @@ -0,0 +1,10 @@ +package org.thingsboard.server.service.usagestats; + +import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; + +public interface TbUsageStatsService { + + void process(TbProtoQueueMsg msg, TbCallback callback); +}