Correct executor for the service

This commit is contained in:
Andrii Shvaika 2022-02-18 12:44:22 +02:00
parent a7da56c270
commit b33d28eedf

View File

@ -110,7 +110,6 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
private final TenantService tenantService; private final TenantService tenantService;
private final TimeseriesService tsService; private final TimeseriesService tsService;
private final ApiUsageStateService apiUsageStateService; private final ApiUsageStateService apiUsageStateService;
private final SchedulerComponent scheduler;
private final TbTenantProfileCache tenantProfileCache; private final TbTenantProfileCache tenantProfileCache;
private final MailService mailService; private final MailService mailService;
private final DbCallbackExecutorService dbExecutor; private final DbCallbackExecutorService dbExecutor;
@ -138,7 +137,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
private final ExecutorService mailExecutor; private final ExecutorService mailExecutor;
private ListeningScheduledExecutorService tenantStateExecutor; private ListeningScheduledExecutorService scheduledExecutor;
final Queue<Set<TopicPartitionInfo>> subscribeQueue = new ConcurrentLinkedQueue<>(); final Queue<Set<TopicPartitionInfo>> subscribeQueue = new ConcurrentLinkedQueue<>();
@ -147,7 +146,6 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
TenantService tenantService, TenantService tenantService,
TimeseriesService tsService, TimeseriesService tsService,
ApiUsageStateService apiUsageStateService, ApiUsageStateService apiUsageStateService,
SchedulerComponent scheduler,
TbTenantProfileCache tenantProfileCache, TbTenantProfileCache tenantProfileCache,
MailService mailService, MailService mailService,
DbCallbackExecutorService dbExecutor) { DbCallbackExecutorService dbExecutor) {
@ -156,7 +154,6 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
this.tenantService = tenantService; this.tenantService = tenantService;
this.tsService = tsService; this.tsService = tsService;
this.apiUsageStateService = apiUsageStateService; this.apiUsageStateService = apiUsageStateService;
this.scheduler = scheduler;
this.tenantProfileCache = tenantProfileCache; this.tenantProfileCache = tenantProfileCache;
this.mailService = mailService; this.mailService = mailService;
this.mailExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("api-usage-svc-mail")); this.mailExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("api-usage-svc-mail"));
@ -165,12 +162,12 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
@PostConstruct @PostConstruct
public void init() { public void init() {
scheduledExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("api-usage-scheduled")));
if (enabled) { if (enabled) {
log.info("Starting api usage service."); log.info("Starting api usage service.");
scheduler.scheduleAtFixedRate(this::checkStartOfNextCycle, nextCycleCheckInterval, nextCycleCheckInterval, TimeUnit.MILLISECONDS); scheduledExecutor.scheduleAtFixedRate(this::checkStartOfNextCycle, nextCycleCheckInterval, nextCycleCheckInterval, TimeUnit.MILLISECONDS);
log.info("Started api usage service."); log.info("Started api usage service.");
} }
tenantStateExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("tenant-state")));
} }
@Override @Override
@ -233,14 +230,14 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) { protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
if (partitionChangeEvent.getServiceType().equals(ServiceType.TB_CORE)) { if (partitionChangeEvent.getServiceType().equals(ServiceType.TB_CORE)) {
subscribeQueue.add(partitionChangeEvent.getPartitions()); subscribeQueue.add(partitionChangeEvent.getPartitions());
tenantStateExecutor.submit(this::pollInitStateFromDB); scheduledExecutor.submit(this::pollInitStateFromDB);
} }
} }
void pollInitStateFromDB() { void pollInitStateFromDB() {
final Set<TopicPartitionInfo> partitions = getLatestPartitionsFromQueue(); final Set<TopicPartitionInfo> partitions = getLatestPartitionsFromQueue();
if (partitions == null) { if (partitions == null) {
log.info("Tenant state service. Nothing to do. partitions is null"); log.info("Api Usage state service. Nothing to do. Partitions are empty");
return; return;
} }
initStateFromDB(partitions); initStateFromDB(partitions);
@ -601,8 +598,8 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
if (mailExecutor != null) { if (mailExecutor != null) {
mailExecutor.shutdownNow(); mailExecutor.shutdownNow();
} }
if (tenantStateExecutor != null) { if (scheduledExecutor != null) {
tenantStateExecutor.shutdownNow(); scheduledExecutor.shutdownNow();
} }
} }
} }