diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java index 6e8e8c20e4..c22ced2387 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java @@ -16,6 +16,10 @@ package org.thingsboard.server.service.apiusage; import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.checkerframework.checker.nullness.qual.Nullable; @@ -23,9 +27,9 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; -import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.rule.engine.api.MailService; +import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.ApiFeature; import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.ApiUsageState; @@ -49,8 +53,8 @@ import org.thingsboard.server.common.data.tenant.profile.TenantProfileConfigurat import org.thingsboard.server.common.data.tenant.profile.TenantProfileData; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.tools.SchedulerUtils; -import org.thingsboard.server.dao.customer.CustomerService; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.timeseries.TimeseriesService; @@ -58,11 +62,11 @@ import org.thingsboard.server.dao.usagerecord.ApiUsageStateService; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbApplicationEventListener; +import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.scheduler.SchedulerComponent; -import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.telemetry.InternalTelemetryService; import javax.annotation.PostConstruct; @@ -73,13 +77,15 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -102,23 +108,25 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener myUsageStates = new ConcurrentHashMap<>(); + final Map myUsageStates = new ConcurrentHashMap<>(); // Entities that should be processed on other servers - private final Map otherUsageStates = new ConcurrentHashMap<>(); + final Map otherUsageStates = new ConcurrentHashMap<>(); - private final Set deletedEntities = Collections.newSetFromMap(new ConcurrentHashMap<>()); + final ConcurrentMap> partitionedEntities = new ConcurrentHashMap<>(); + + final Set deletedEntities = Collections.newSetFromMap(new ConcurrentHashMap<>()); @Value("${usage.stats.report.enabled:true}") private boolean enabled; @@ -130,25 +138,29 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener> subscribeQueue = new ConcurrentLinkedQueue<>(); + public DefaultTbApiUsageStateService(TbClusterService clusterService, PartitionService partitionService, TenantService tenantService, - CustomerService customerService, TimeseriesService tsService, ApiUsageStateService apiUsageStateService, SchedulerComponent scheduler, TbTenantProfileCache tenantProfileCache, - MailService mailService) { + MailService mailService, + DbCallbackExecutorService dbExecutor) { this.clusterService = clusterService; this.partitionService = partitionService; this.tenantService = tenantService; - this.customerService = customerService; this.tsService = tsService; this.apiUsageStateService = apiUsageStateService; this.scheduler = scheduler; this.tenantProfileCache = tenantProfileCache; this.mailService = mailService; this.mailExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("api-usage-svc-mail")); + this.dbExecutor = dbExecutor; } @PostConstruct @@ -158,6 +170,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener { - return !partitionService.resolve(ServiceType.TB_CORE, entry.getValue().getTenantId(), entry.getKey()).isMyPartition(); + subscribeQueue.add(partitionChangeEvent.getPartitions()); + tenantStateExecutor.submit(this::pollInitStateFromDB); + } + } + + void pollInitStateFromDB() { + final Set partitions = getLatestPartitionsFromQueue(); + if (partitions == null) { + log.info("Tenant state service. Nothing to do. partitions is null"); + return; + } + initStateFromDB(partitions); + } + + Set getLatestPartitionsFromQueue() { + log.debug("getLatestPartitionsFromQueue, queue size {}", subscribeQueue.size()); + Set partitions = null; + while (!subscribeQueue.isEmpty()) { + partitions = subscribeQueue.poll(); + log.debug("polled from the queue partitions {}", partitions); + } + log.debug("getLatestPartitionsFromQueue, partitions {}", partitions); + return partitions; + } + + private void initStateFromDB(Set partitions) { + try { + Set addedPartitions = new HashSet<>(partitions); + addedPartitions.removeAll(partitionedEntities.keySet()); + + Set removedPartitions = new HashSet<>(partitionedEntities.keySet()); + removedPartitions.removeAll(partitions); + + removedPartitions.forEach(partition -> { + Set entities = partitionedEntities.remove(partition); + entities.forEach(this::cleanUpEntitiesStateMap); }); - otherUsageStates.entrySet().removeIf(entry -> { - return partitionService.resolve(ServiceType.TB_CORE, entry.getValue().getTenantId(), entry.getKey()).isMyPartition(); - }); - initStatesFromDataBase(); + + addedPartitions.forEach(tpi -> + partitionedEntities.computeIfAbsent(tpi, key -> ConcurrentHashMap.newKeySet())); + + otherUsageStates.entrySet().removeIf(entry -> + partitionService.resolve(ServiceType.TB_CORE, entry.getValue().getTenantId(), entry.getKey()).isMyPartition()); + + initStatesFromDataBase(addedPartitions); + } catch (Throwable t) { + log.warn("Failed to init tenant states from DB", t); } } @@ -311,6 +364,18 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener entityIds = partitionedEntities.get(tpi); + if (entityIds != null) { + entityIds.add(entityId); + myUsageStates.put(entityId, state); + } else { + log.debug("[{}] belongs to external partition {}", entityId, tpi.getFullTopicName()); + throw new RuntimeException(entityId.getEntityType() + " belongs to external partition " + tpi.getFullTopicName() + "!"); + } + } + private void updateProfileThresholds(TenantId tenantId, ApiUsageStateId id, TenantProfileConfiguration oldData, TenantProfileConfiguration newData) { long ts = System.currentTimeMillis(); @@ -339,6 +404,10 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener result) { log.info("[{}] Detected update of the API state for {}: {}", state.getEntityId(), state.getEntityType(), result); apiUsageStateService.update(state.getApiUsageState()); @@ -420,7 +489,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener addedPartitions) { + if (addedPartitions.isEmpty()) { + return; + } + try { log.info("Initializing tenant states."); updateLock.lock(); try { - ExecutorService tmpInitExecutor = ThingsBoardExecutors.newWorkStealingPool(20, "init-tenant-states-from-db"); - try { - PageDataIterable tenantIterator = new PageDataIterable<>(tenantService::findTenants, 1024); - List> futures = new ArrayList<>(); - for (Tenant tenant : tenantIterator) { - if (!myUsageStates.containsKey(tenant.getId()) && partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), tenant.getId()).isMyPartition()) { + PageDataIterable tenantIterator = new PageDataIterable<>(tenantService::findTenants, 1024); + List> futures = new ArrayList<>(); + for (Tenant tenant : tenantIterator) { + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), tenant.getId()); + if (addedPartitions.contains(tpi)) { + if (!myUsageStates.containsKey(tenant.getId()) && tpi.isMyPartition()) { log.debug("[{}] Initializing tenant state.", tenant.getId()); - futures.add(tmpInitExecutor.submit(() -> { + futures.add(dbExecutor.submit(() -> { try { updateTenantState((TenantApiUsageState) getOrFetchState(tenant.getId(), tenant.getId()), tenantProfileCache.get(tenant.getTenantProfileId())); log.debug("[{}] Initialized tenant state.", tenant.getId()); } catch (Exception e) { log.warn("[{}] Failed to initialize tenant API state", tenant.getId(), e); } + return null; })); } + } else { + log.debug("[{}][{}] Tenant doesn't belong to current partition. tpi [{}]", tenant.getName(), tenant.getId(), tpi); } - for (Future future : futures) { - future.get(); - } - } finally { - tmpInitExecutor.shutdownNow(); } + Futures.whenAllComplete(futures); } finally { updateLock.unlock(); } - log.info("Initialized tenant states."); + log.info("Initialized {} tenant states.", myUsageStates.size()); } catch (Exception e) { log.warn("Unknown failure", e); } @@ -524,5 +601,8 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener { if (!myPartitions.containsKey(serviceQueueKey)) { log.info("[{}] NO MORE PARTITIONS FOR CURRENT KEY", serviceQueueKey); @@ -174,7 +176,6 @@ public class HashPartitionService implements PartitionService { applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceQueueKey, tpiList)); } }); - tpiCache.clear(); if (currentOtherServices == null) { currentOtherServices = new ArrayList<>(otherServices);