handling of PartitionChangeEvent is not more synchronous
This commit is contained in:
		
							parent
							
								
									1097ce2cb1
								
							
						
					
					
						commit
						073ce69872
					
				@ -16,6 +16,10 @@
 | 
			
		||||
package org.thingsboard.server.service.apiusage;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.FutureCallback;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 | 
			
		||||
import com.google.common.util.concurrent.MoreExecutors;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.apache.commons.lang3.StringUtils;
 | 
			
		||||
import org.checkerframework.checker.nullness.qual.Nullable;
 | 
			
		||||
@ -23,9 +27,9 @@ import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.context.annotation.Lazy;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardExecutors;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
 | 
			
		||||
import org.thingsboard.rule.engine.api.MailService;
 | 
			
		||||
import org.thingsboard.server.cluster.TbClusterService;
 | 
			
		||||
import org.thingsboard.server.common.data.ApiFeature;
 | 
			
		||||
import org.thingsboard.server.common.data.ApiUsageRecordKey;
 | 
			
		||||
import org.thingsboard.server.common.data.ApiUsageState;
 | 
			
		||||
@ -49,8 +53,8 @@ import org.thingsboard.server.common.data.tenant.profile.TenantProfileConfigurat
 | 
			
		||||
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.ServiceType;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.TbCallback;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
 | 
			
		||||
import org.thingsboard.server.common.msg.tools.SchedulerUtils;
 | 
			
		||||
import org.thingsboard.server.dao.customer.CustomerService;
 | 
			
		||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
 | 
			
		||||
import org.thingsboard.server.dao.tenant.TenantService;
 | 
			
		||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
 | 
			
		||||
@ -58,11 +62,11 @@ import org.thingsboard.server.dao.usagerecord.ApiUsageStateService;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.PartitionService;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
 | 
			
		||||
import org.thingsboard.server.queue.scheduler.SchedulerComponent;
 | 
			
		||||
import org.thingsboard.server.cluster.TbClusterService;
 | 
			
		||||
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
 | 
			
		||||
import org.thingsboard.server.service.telemetry.InternalTelemetryService;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.PostConstruct;
 | 
			
		||||
@ -73,13 +77,15 @@ import java.util.Collections;
 | 
			
		||||
import java.util.HashSet;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.Queue;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
import java.util.concurrent.ConcurrentLinkedQueue;
 | 
			
		||||
import java.util.concurrent.ConcurrentMap;
 | 
			
		||||
import java.util.concurrent.ExecutionException;
 | 
			
		||||
import java.util.concurrent.ExecutorService;
 | 
			
		||||
import java.util.concurrent.Executors;
 | 
			
		||||
import java.util.concurrent.Future;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
import java.util.concurrent.locks.Lock;
 | 
			
		||||
import java.util.concurrent.locks.ReentrantLock;
 | 
			
		||||
@ -102,12 +108,12 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
 | 
			
		||||
    private final TbClusterService clusterService;
 | 
			
		||||
    private final PartitionService partitionService;
 | 
			
		||||
    private final TenantService tenantService;
 | 
			
		||||
    private final CustomerService customerService;
 | 
			
		||||
    private final TimeseriesService tsService;
 | 
			
		||||
    private final ApiUsageStateService apiUsageStateService;
 | 
			
		||||
    private final SchedulerComponent scheduler;
 | 
			
		||||
    private final TbTenantProfileCache tenantProfileCache;
 | 
			
		||||
    private final MailService mailService;
 | 
			
		||||
    private final DbCallbackExecutorService dbExecutor;
 | 
			
		||||
 | 
			
		||||
    @Lazy
 | 
			
		||||
    @Autowired
 | 
			
		||||
@ -118,6 +124,8 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
 | 
			
		||||
    // Entities that should be processed on other servers
 | 
			
		||||
    private final Map<EntityId, ApiUsageState> otherUsageStates = new ConcurrentHashMap<>();
 | 
			
		||||
 | 
			
		||||
    private final ConcurrentMap<TopicPartitionInfo, Set<EntityId>> partitionedEntities = new ConcurrentHashMap<>();
 | 
			
		||||
 | 
			
		||||
    private final Set<EntityId> deletedEntities = Collections.newSetFromMap(new ConcurrentHashMap<>());
 | 
			
		||||
 | 
			
		||||
    @Value("${usage.stats.report.enabled:true}")
 | 
			
		||||
@ -130,25 +138,29 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
 | 
			
		||||
 | 
			
		||||
    private final ExecutorService mailExecutor;
 | 
			
		||||
 | 
			
		||||
    private ListeningScheduledExecutorService tenantStateExecutor;
 | 
			
		||||
 | 
			
		||||
    final Queue<Set<TopicPartitionInfo>> subscribeQueue = new ConcurrentLinkedQueue<>();
 | 
			
		||||
 | 
			
		||||
    public DefaultTbApiUsageStateService(TbClusterService clusterService,
 | 
			
		||||
                                         PartitionService partitionService,
 | 
			
		||||
                                         TenantService tenantService,
 | 
			
		||||
                                         CustomerService customerService,
 | 
			
		||||
                                         TimeseriesService tsService,
 | 
			
		||||
                                         ApiUsageStateService apiUsageStateService,
 | 
			
		||||
                                         SchedulerComponent scheduler,
 | 
			
		||||
                                         TbTenantProfileCache tenantProfileCache,
 | 
			
		||||
                                         MailService mailService) {
 | 
			
		||||
                                         MailService mailService,
 | 
			
		||||
                                         DbCallbackExecutorService dbExecutor) {
 | 
			
		||||
        this.clusterService = clusterService;
 | 
			
		||||
        this.partitionService = partitionService;
 | 
			
		||||
        this.tenantService = tenantService;
 | 
			
		||||
        this.customerService = customerService;
 | 
			
		||||
        this.tsService = tsService;
 | 
			
		||||
        this.apiUsageStateService = apiUsageStateService;
 | 
			
		||||
        this.scheduler = scheduler;
 | 
			
		||||
        this.tenantProfileCache = tenantProfileCache;
 | 
			
		||||
        this.mailService = mailService;
 | 
			
		||||
        this.mailExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("api-usage-svc-mail"));
 | 
			
		||||
        this.dbExecutor = dbExecutor;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @PostConstruct
 | 
			
		||||
@ -158,6 +170,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
 | 
			
		||||
            scheduler.scheduleAtFixedRate(this::checkStartOfNextCycle, nextCycleCheckInterval, nextCycleCheckInterval, TimeUnit.MILLISECONDS);
 | 
			
		||||
            log.info("Started api usage service.");
 | 
			
		||||
        }
 | 
			
		||||
        tenantStateExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("tenant-state")));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
@ -219,13 +232,53 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
 | 
			
		||||
    @Override
 | 
			
		||||
    protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
 | 
			
		||||
        if (partitionChangeEvent.getServiceType().equals(ServiceType.TB_CORE)) {
 | 
			
		||||
            myUsageStates.entrySet().removeIf(entry -> {
 | 
			
		||||
                return !partitionService.resolve(ServiceType.TB_CORE, entry.getValue().getTenantId(), entry.getKey()).isMyPartition();
 | 
			
		||||
            subscribeQueue.add(partitionChangeEvent.getPartitions());
 | 
			
		||||
            tenantStateExecutor.submit(this::pollInitStateFromDB);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void pollInitStateFromDB() {
 | 
			
		||||
        final Set<TopicPartitionInfo> partitions = getLatestPartitionsFromQueue();
 | 
			
		||||
        if (partitions == null) {
 | 
			
		||||
            log.info("Tenant state service. Nothing to do. partitions is null");
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
        initStateFromDB(partitions);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    Set<TopicPartitionInfo> getLatestPartitionsFromQueue() {
 | 
			
		||||
        log.debug("getLatestPartitionsFromQueue, queue size {}", subscribeQueue.size());
 | 
			
		||||
        Set<TopicPartitionInfo> partitions = null;
 | 
			
		||||
        while (!subscribeQueue.isEmpty()) {
 | 
			
		||||
            partitions = subscribeQueue.poll();
 | 
			
		||||
            log.debug("polled from the queue partitions {}", partitions);
 | 
			
		||||
        }
 | 
			
		||||
        log.debug("getLatestPartitionsFromQueue, partitions {}", partitions);
 | 
			
		||||
        return partitions;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void initStateFromDB(Set<TopicPartitionInfo> partitions) {
 | 
			
		||||
        try {
 | 
			
		||||
            Set<TopicPartitionInfo> addedPartitions = new HashSet<>(partitions);
 | 
			
		||||
            addedPartitions.removeAll(partitionedEntities.keySet());
 | 
			
		||||
 | 
			
		||||
            Set<TopicPartitionInfo> removedPartitions = new HashSet<>(partitionedEntities.keySet());
 | 
			
		||||
            removedPartitions.removeAll(partitions);
 | 
			
		||||
 | 
			
		||||
            removedPartitions.forEach(partition -> {
 | 
			
		||||
                Set<EntityId> entities = partitionedEntities.remove(partition);
 | 
			
		||||
                entities.forEach(this::cleanUpEntitiesStateMap);
 | 
			
		||||
            });
 | 
			
		||||
            otherUsageStates.entrySet().removeIf(entry -> {
 | 
			
		||||
                return partitionService.resolve(ServiceType.TB_CORE, entry.getValue().getTenantId(), entry.getKey()).isMyPartition();
 | 
			
		||||
            });
 | 
			
		||||
            initStatesFromDataBase();
 | 
			
		||||
 | 
			
		||||
            addedPartitions.forEach(tpi ->
 | 
			
		||||
                    partitionedEntities.computeIfAbsent(tpi, key -> ConcurrentHashMap.newKeySet()));
 | 
			
		||||
 | 
			
		||||
            otherUsageStates.entrySet().removeIf(entry ->
 | 
			
		||||
                    partitionService.resolve(ServiceType.TB_CORE, entry.getValue().getTenantId(), entry.getKey()).isMyPartition());
 | 
			
		||||
 | 
			
		||||
            initStatesFromDataBase(addedPartitions);
 | 
			
		||||
        } catch (Throwable t) {
 | 
			
		||||
            log.warn("Failed to init tenant states from DB", t);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -311,6 +364,18 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
 | 
			
		||||
                oldProfileData.getConfiguration(), profile.getProfileData().getConfiguration());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void addEntityState(TopicPartitionInfo tpi, BaseApiUsageState state) {
 | 
			
		||||
        EntityId entityId = state.getEntityId();
 | 
			
		||||
        Set<EntityId> entityIds = partitionedEntities.get(tpi);
 | 
			
		||||
        if (entityIds != null) {
 | 
			
		||||
            entityIds.add(entityId);
 | 
			
		||||
            myUsageStates.put(entityId, state);
 | 
			
		||||
        } else {
 | 
			
		||||
            log.debug("[{}] belongs to external partition {}", entityId, tpi.getFullTopicName());
 | 
			
		||||
            throw new RuntimeException(entityId.getEntityType() + " belongs to external partition " + tpi.getFullTopicName() + "!");
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void updateProfileThresholds(TenantId tenantId, ApiUsageStateId id,
 | 
			
		||||
                                         TenantProfileConfiguration oldData, TenantProfileConfiguration newData) {
 | 
			
		||||
        long ts = System.currentTimeMillis();
 | 
			
		||||
@ -339,6 +404,10 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
 | 
			
		||||
        myUsageStates.remove(customerId);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void cleanUpEntitiesStateMap(EntityId entityId) {
 | 
			
		||||
        myUsageStates.remove(entityId);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void persistAndNotify(BaseApiUsageState state, Map<ApiFeature, ApiUsageStateValue> result) {
 | 
			
		||||
        log.info("[{}] Detected update of the API state for {}: {}", state.getEntityId(), state.getEntityType(), result);
 | 
			
		||||
        apiUsageStateService.update(state.getApiUsageState());
 | 
			
		||||
@ -473,7 +542,12 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            log.debug("[{}] Initialized state: {}", entityId, storedState);
 | 
			
		||||
            myUsageStates.put(entityId, state);
 | 
			
		||||
            TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
 | 
			
		||||
            if (tpi.isMyPartition()) {
 | 
			
		||||
                addEntityState(tpi, state);
 | 
			
		||||
            } else {
 | 
			
		||||
                otherUsageStates.put(entityId, state.getApiUsageState());
 | 
			
		||||
            }
 | 
			
		||||
            saveNewCounts(state, newCounts);
 | 
			
		||||
        } catch (InterruptedException | ExecutionException e) {
 | 
			
		||||
            log.warn("[{}] Failed to fetch api usage state from db.", tenantId, e);
 | 
			
		||||
@ -482,38 +556,41 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
 | 
			
		||||
        return state;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void initStatesFromDataBase() {
 | 
			
		||||
    private void initStatesFromDataBase(Set<TopicPartitionInfo> addedPartitions) {
 | 
			
		||||
        if (addedPartitions.isEmpty()) {
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        try {
 | 
			
		||||
            log.info("Initializing tenant states.");
 | 
			
		||||
            updateLock.lock();
 | 
			
		||||
            try {
 | 
			
		||||
                ExecutorService tmpInitExecutor = ThingsBoardExecutors.newWorkStealingPool(20, "init-tenant-states-from-db");
 | 
			
		||||
                try {
 | 
			
		||||
                    PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, 1024);
 | 
			
		||||
                    List<Future<?>> futures = new ArrayList<>();
 | 
			
		||||
                    for (Tenant tenant : tenantIterator) {
 | 
			
		||||
                        if (!myUsageStates.containsKey(tenant.getId()) && partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), tenant.getId()).isMyPartition()) {
 | 
			
		||||
                PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, 1024);
 | 
			
		||||
                List<ListenableFuture<?>> futures = new ArrayList<>();
 | 
			
		||||
                for (Tenant tenant : tenantIterator) {
 | 
			
		||||
                    TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), tenant.getId());
 | 
			
		||||
                    if (addedPartitions.contains(tpi)) {
 | 
			
		||||
                        if (!myUsageStates.containsKey(tenant.getId()) && tpi.isMyPartition()) {
 | 
			
		||||
                            log.debug("[{}] Initializing tenant state.", tenant.getId());
 | 
			
		||||
                            futures.add(tmpInitExecutor.submit(() -> {
 | 
			
		||||
                            futures.add(dbExecutor.submit(() -> {
 | 
			
		||||
                                try {
 | 
			
		||||
                                    updateTenantState((TenantApiUsageState) getOrFetchState(tenant.getId(), tenant.getId()), tenantProfileCache.get(tenant.getTenantProfileId()));
 | 
			
		||||
                                    log.debug("[{}] Initialized tenant state.", tenant.getId());
 | 
			
		||||
                                } catch (Exception e) {
 | 
			
		||||
                                    log.warn("[{}] Failed to initialize tenant API state", tenant.getId(), e);
 | 
			
		||||
                                }
 | 
			
		||||
                                return null;
 | 
			
		||||
                            }));
 | 
			
		||||
                        }
 | 
			
		||||
                    } else {
 | 
			
		||||
                        log.debug("[{}][{}] Tenant doesn't belong to current partition. tpi [{}]", tenant.getName(), tenant.getId(), tpi);
 | 
			
		||||
                    }
 | 
			
		||||
                    for (Future<?> future : futures) {
 | 
			
		||||
                        future.get();
 | 
			
		||||
                    }
 | 
			
		||||
                } finally {
 | 
			
		||||
                    tmpInitExecutor.shutdownNow();
 | 
			
		||||
                }
 | 
			
		||||
                Futures.whenAllComplete(futures);
 | 
			
		||||
            } finally {
 | 
			
		||||
                updateLock.unlock();
 | 
			
		||||
            }
 | 
			
		||||
            log.info("Initialized tenant states.");
 | 
			
		||||
            log.info("Initialized {} tenant states.", myUsageStates.size());
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.warn("Unknown failure", e);
 | 
			
		||||
        }
 | 
			
		||||
@ -524,5 +601,8 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
 | 
			
		||||
        if (mailExecutor != null) {
 | 
			
		||||
            mailExecutor.shutdownNow();
 | 
			
		||||
        }
 | 
			
		||||
        if (tenantStateExecutor != null) {
 | 
			
		||||
            tenantStateExecutor.shutdownNow();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -158,6 +158,8 @@ public class HashPartitionService implements PartitionService {
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        tpiCache.clear();
 | 
			
		||||
 | 
			
		||||
        oldPartitions.forEach((serviceQueueKey, partitions) -> {
 | 
			
		||||
            if (!myPartitions.containsKey(serviceQueueKey)) {
 | 
			
		||||
                log.info("[{}] NO MORE PARTITIONS FOR CURRENT KEY", serviceQueueKey);
 | 
			
		||||
@ -174,7 +176,6 @@ public class HashPartitionService implements PartitionService {
 | 
			
		||||
                applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceQueueKey, tpiList));
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
        tpiCache.clear();
 | 
			
		||||
 | 
			
		||||
        if (currentOtherServices == null) {
 | 
			
		||||
            currentOtherServices = new ArrayList<>(otherServices);
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user