Usage records

This commit is contained in:
Andrii Shvaika 2020-10-19 18:25:34 +03:00
parent 24f5872adf
commit 5eb91c17f6
3 changed files with 99 additions and 5 deletions

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -17,20 +17,21 @@ package org.thingsboard.server.service.queue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.rule.engine.api.RpcError;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.dao.util.mapping.JacksonUtil;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto;
import org.thingsboard.server.gen.transport.TransportProtos.LocalSubscriptionServiceMsgProto;
@ -61,6 +62,7 @@ import org.thingsboard.server.service.subscription.SubscriptionManagerService;
import org.thingsboard.server.service.subscription.TbLocalSubscriptionService;
import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
import org.thingsboard.server.service.usagestats.TbUsageStatsService;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@ -70,6 +72,8 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -88,32 +92,50 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
private final TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> mainConsumer;
private final DeviceStateService stateService;
private final TbUsageStatsService statsService;
private final TbLocalSubscriptionService localSubscriptionService;
private final SubscriptionManagerService subscriptionManagerService;
private final TbCoreDeviceRpcService tbCoreDeviceRpcService;
private final TbCoreConsumerStats stats;
protected final TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> usageStatsConsumer;
protected volatile ExecutorService usageStatsExecutor;
public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory, ActorSystemContext actorContext,
DeviceStateService stateService, TbLocalSubscriptionService localSubscriptionService,
SubscriptionManagerService subscriptionManagerService, DataDecodingEncodingService encodingService,
TbCoreDeviceRpcService tbCoreDeviceRpcService, StatsFactory statsFactory, TbDeviceProfileCache deviceProfileCache) {
TbCoreDeviceRpcService tbCoreDeviceRpcService, StatsFactory statsFactory, TbDeviceProfileCache deviceProfileCache,
TbUsageStatsService statsService) {
super(actorContext, encodingService, deviceProfileCache, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer());
this.mainConsumer = tbCoreQueueFactory.createToCoreMsgConsumer();
this.usageStatsConsumer = tbCoreQueueFactory.createToUsageStatsServiceMsgConsumer();
this.stateService = stateService;
this.localSubscriptionService = localSubscriptionService;
this.subscriptionManagerService = subscriptionManagerService;
this.tbCoreDeviceRpcService = tbCoreDeviceRpcService;
this.stats = new TbCoreConsumerStats(statsFactory);
this.statsService = statsService;
}
@PostConstruct
public void init() {
super.init("tb-core-consumer", "tb-core-notifications-consumer");
this.usageStatsExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("tb-core-usage-stats-consumer"));
}
@PreDestroy
public void destroy() {
super.destroy();
if (usageStatsExecutor != null) {
usageStatsExecutor.shutdownNow();
}
}
@EventListener(ApplicationReadyEvent.class)
public void onApplicationEvent(ApplicationReadyEvent event) {
super.onApplicationEvent(event);
launchUsageStatsConsumer();
}
@Override
@ -223,6 +245,53 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
}
}
private void launchUsageStatsConsumer() {
usageStatsExecutor.submit(() -> {
while (!stopped) {
try {
List<TbProtoQueueMsg<ToUsageStatsServiceMsg>> msgs = usageStatsConsumer.poll(getNotificationPollDuration());
if (msgs.isEmpty()) {
continue;
}
ConcurrentMap<UUID, TbProtoQueueMsg<ToUsageStatsServiceMsg>> pendingMap = msgs.stream().collect(
Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity()));
CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
TbPackProcessingContext<TbProtoQueueMsg<ToUsageStatsServiceMsg>> ctx = new TbPackProcessingContext<>(
processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>());
pendingMap.forEach((id, msg) -> {
log.trace("[{}] Creating usage stats callback for message: {}", id, msg.getValue());
TbCallback callback = new TbPackCallback<>(id, ctx);
try {
handleUsageStats(msg, callback);
} catch (Throwable e) {
log.warn("[{}] Failed to process usge stats: {}", id, msg, e);
callback.onFailure(e);
}
});
if (!processingTimeoutLatch.await(getNotificationPackProcessingTimeout(), TimeUnit.MILLISECONDS)) {
ctx.getAckMap().forEach((id, msg) -> log.warn("[{}] Timeout to process usage stats: {}", id, msg.getValue()));
ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process usage stats: {}", id, msg.getValue()));
}
usageStatsConsumer.commit();
} catch (Exception e) {
if (!stopped) {
log.warn("Failed to obtain usage stats from queue.", e);
try {
Thread.sleep(getNotificationPollDuration());
} catch (InterruptedException e2) {
log.trace("Failed to wait until the server has capacity to handle new usage stats", e2);
}
}
}
}
log.info("TB Usage Stats Consumer stopped.");
});
}
private void handleUsageStats(TbProtoQueueMsg<ToUsageStatsServiceMsg> msg, TbCallback callback) {
statsService.process(msg, callback);
}
private void forwardToCoreRpcService(FromDeviceRPCResponseProto proto, TbCallback callback) {
RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null;
FromDeviceRpcResponse response = new FromDeviceRpcResponse(new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB())

View File

@ -0,0 +1,15 @@
package org.thingsboard.server.service.usagestats;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@Service
public class DefaultTbUsageStatsService implements TbUsageStatsService {
@Override
public void process(TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg> msg, TbCallback callback) {
}
}

View File

@ -0,0 +1,10 @@
package org.thingsboard.server.service.usagestats;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
public interface TbUsageStatsService {
void process(TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg> msg, TbCallback callback);
}