used scheduler instead of thread sleep
This commit is contained in:
		
							parent
							
								
									7f31069285
								
							
						
					
					
						commit
						92117d1280
					
				@ -15,14 +15,12 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.entitiy;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.databind.ObjectMapper;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.thingsboard.server.cluster.TbClusterService;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.HasName;
 | 
			
		||||
import org.thingsboard.server.common.data.User;
 | 
			
		||||
@ -50,6 +48,7 @@ import org.thingsboard.server.dao.edge.EdgeService;
 | 
			
		||||
import org.thingsboard.server.dao.exception.DataValidationException;
 | 
			
		||||
import org.thingsboard.server.dao.exception.IncorrectParameterException;
 | 
			
		||||
import org.thingsboard.server.dao.model.ModelConstants;
 | 
			
		||||
import org.thingsboard.server.dao.queue.QueueService;
 | 
			
		||||
import org.thingsboard.server.dao.rule.RuleChainService;
 | 
			
		||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
 | 
			
		||||
import org.thingsboard.server.dao.tenant.TenantService;
 | 
			
		||||
@ -69,8 +68,6 @@ public abstract class AbstractTbEntityService {
 | 
			
		||||
 | 
			
		||||
    protected static final int DEFAULT_PAGE_SIZE = 1000;
 | 
			
		||||
 | 
			
		||||
    private static final ObjectMapper json = new ObjectMapper();
 | 
			
		||||
 | 
			
		||||
    @Value("${server.log_controller_error_stack_trace}")
 | 
			
		||||
    @Getter
 | 
			
		||||
    private boolean logControllerErrorStackTrace;
 | 
			
		||||
@ -106,6 +103,8 @@ public abstract class AbstractTbEntityService {
 | 
			
		||||
    protected RuleChainService ruleChainService;
 | 
			
		||||
    @Autowired
 | 
			
		||||
    protected EdgeNotificationService edgeNotificationService;
 | 
			
		||||
    @Autowired
 | 
			
		||||
    protected QueueService queueService;
 | 
			
		||||
 | 
			
		||||
    protected ListenableFuture<Void> removeAlarmsByEntityId(TenantId tenantId, EntityId entityId) {
 | 
			
		||||
        ListenableFuture<PageData<AlarmInfo>> alarmsFuture =
 | 
			
		||||
 | 
			
		||||
@ -16,7 +16,6 @@
 | 
			
		||||
package org.thingsboard.server.service.entitiy.queue;
 | 
			
		||||
 | 
			
		||||
import lombok.AllArgsConstructor;
 | 
			
		||||
import lombok.SneakyThrows;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.server.cluster.TbClusterService;
 | 
			
		||||
@ -30,27 +29,30 @@ import org.thingsboard.server.common.data.queue.Queue;
 | 
			
		||||
import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
 | 
			
		||||
import org.thingsboard.server.dao.device.DeviceProfileService;
 | 
			
		||||
import org.thingsboard.server.dao.queue.QueueService;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueAdmin;
 | 
			
		||||
import org.thingsboard.server.queue.scheduler.SchedulerComponent;
 | 
			
		||||
import org.thingsboard.server.queue.util.TbCoreComponent;
 | 
			
		||||
import org.thingsboard.server.service.entitiy.AbstractTbEntityService;
 | 
			
		||||
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
@Service
 | 
			
		||||
@TbCoreComponent
 | 
			
		||||
@AllArgsConstructor
 | 
			
		||||
public class DefaultTbQueueService implements TbQueueService {
 | 
			
		||||
public class DefaultTbQueueService extends AbstractTbEntityService implements TbQueueService {
 | 
			
		||||
    private static final String MAIN = "Main";
 | 
			
		||||
    private static final long DELETE_DELAY = 30;
 | 
			
		||||
 | 
			
		||||
    private final QueueService queueService;
 | 
			
		||||
    private final TbClusterService tbClusterService;
 | 
			
		||||
    private final TbQueueAdmin tbQueueAdmin;
 | 
			
		||||
    private final DeviceProfileService deviceProfileService;
 | 
			
		||||
    private final SchedulerComponent scheduler;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public Queue saveQueue(Queue queue) {
 | 
			
		||||
@ -90,57 +92,50 @@ public class DefaultTbQueueService implements TbQueueService {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void onQueueCreated(Queue queue) {
 | 
			
		||||
        if (tbQueueAdmin != null) {
 | 
			
		||||
            for (int i = 0; i < queue.getPartitions(); i++) {
 | 
			
		||||
                tbQueueAdmin.createTopicIfNotExists(
 | 
			
		||||
                        new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName());
 | 
			
		||||
            }
 | 
			
		||||
        for (int i = 0; i < queue.getPartitions(); i++) {
 | 
			
		||||
            tbQueueAdmin.createTopicIfNotExists(
 | 
			
		||||
                    new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (tbClusterService != null) {
 | 
			
		||||
            tbClusterService.onQueueChange(queue);
 | 
			
		||||
        }
 | 
			
		||||
        tbClusterService.onQueueChange(queue);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void onQueueUpdated(Queue queue, Queue oldQueue) {
 | 
			
		||||
        int oldPartitions = oldQueue.getPartitions();
 | 
			
		||||
        int currentPartitions = queue.getPartitions();
 | 
			
		||||
 | 
			
		||||
        if (currentPartitions != oldPartitions && tbQueueAdmin != null) {
 | 
			
		||||
        if (currentPartitions != oldPartitions) {
 | 
			
		||||
            if (currentPartitions > oldPartitions) {
 | 
			
		||||
                log.info("Added [{}] new partitions to [{}] queue", currentPartitions - oldPartitions, queue.getName());
 | 
			
		||||
                for (int i = oldPartitions; i < currentPartitions; i++) {
 | 
			
		||||
                    tbQueueAdmin.createTopicIfNotExists(
 | 
			
		||||
                            new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName());
 | 
			
		||||
                }
 | 
			
		||||
                if (tbClusterService != null) {
 | 
			
		||||
                    tbClusterService.onQueueChange(queue);
 | 
			
		||||
                }
 | 
			
		||||
                tbClusterService.onQueueChange(queue);
 | 
			
		||||
            } else {
 | 
			
		||||
                log.info("Removed [{}] partitions from [{}] queue", oldPartitions - currentPartitions, queue.getName());
 | 
			
		||||
                if (tbClusterService != null) {
 | 
			
		||||
                    tbClusterService.onQueueChange(queue);
 | 
			
		||||
                }
 | 
			
		||||
                await();
 | 
			
		||||
                for (int i = currentPartitions; i < oldPartitions; i++) {
 | 
			
		||||
                    String fullTopicName = new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName();
 | 
			
		||||
                    log.info("Removed partition [{}]", fullTopicName);
 | 
			
		||||
                    tbQueueAdmin.deleteTopic(
 | 
			
		||||
                            fullTopicName);
 | 
			
		||||
                }
 | 
			
		||||
                tbClusterService.onQueueChange(queue);
 | 
			
		||||
 | 
			
		||||
                scheduler.schedule(() -> {
 | 
			
		||||
                    for (int i = currentPartitions; i < oldPartitions; i++) {
 | 
			
		||||
                        String fullTopicName = new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName();
 | 
			
		||||
                        log.info("Removed partition [{}]", fullTopicName);
 | 
			
		||||
                        tbQueueAdmin.deleteTopic(
 | 
			
		||||
                                fullTopicName);
 | 
			
		||||
                    }
 | 
			
		||||
                }, DELETE_DELAY, TimeUnit.SECONDS);
 | 
			
		||||
            }
 | 
			
		||||
        } else if (!oldQueue.equals(queue) && tbClusterService != null) {
 | 
			
		||||
        } else if (!oldQueue.equals(queue)) {
 | 
			
		||||
            tbClusterService.onQueueChange(queue);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void onQueueDeleted(Queue queue) {
 | 
			
		||||
        if (tbClusterService != null) {
 | 
			
		||||
            tbClusterService.onQueueDelete(queue);
 | 
			
		||||
            await();
 | 
			
		||||
        }
 | 
			
		||||
        tbClusterService.onQueueDelete(queue);
 | 
			
		||||
 | 
			
		||||
//        queueStatsService.deleteQueueStatsByQueueId(tenantId, queueId);
 | 
			
		||||
        if (tbQueueAdmin != null) {
 | 
			
		||||
 | 
			
		||||
        scheduler.schedule(() -> {
 | 
			
		||||
            for (int i = 0; i < queue.getPartitions(); i++) {
 | 
			
		||||
                String fullTopicName = new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName();
 | 
			
		||||
                log.info("Deleting queue [{}]", fullTopicName);
 | 
			
		||||
@ -150,16 +145,12 @@ public class DefaultTbQueueService implements TbQueueService {
 | 
			
		||||
                    log.error("Failed to delete queue [{}]", fullTopicName);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @SneakyThrows
 | 
			
		||||
    private void await() {
 | 
			
		||||
        Thread.sleep(3000);
 | 
			
		||||
        }, DELETE_DELAY, TimeUnit.SECONDS);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void updateQueuesByTenants(List<TenantId> tenantIds, TenantProfile newTenantProfile, TenantProfile oldTenantProfile) {
 | 
			
		||||
    public void updateQueuesByTenants(List<TenantId> tenantIds, TenantProfile newTenantProfile, TenantProfile
 | 
			
		||||
            oldTenantProfile) {
 | 
			
		||||
        boolean oldIsolated = oldTenantProfile != null && oldTenantProfile.isIsolatedTbRuleEngine();
 | 
			
		||||
        boolean newIsolated = newTenantProfile.isIsolatedTbRuleEngine();
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -32,7 +32,7 @@ import java.util.List;
 | 
			
		||||
@TbCoreComponent
 | 
			
		||||
@AllArgsConstructor
 | 
			
		||||
public class DefaultTbTenantProfileService implements TbTenantProfileService {
 | 
			
		||||
    private final TbQueueService queueService;
 | 
			
		||||
    private final TbQueueService tbQueueService;
 | 
			
		||||
    private final TenantProfileService tenantProfileService;
 | 
			
		||||
    private final TenantService tenantService;
 | 
			
		||||
 | 
			
		||||
@ -42,7 +42,7 @@ public class DefaultTbTenantProfileService implements TbTenantProfileService {
 | 
			
		||||
 | 
			
		||||
        if (oldTenantProfile != null && savedTenantProfile.isIsolatedTbRuleEngine()) {
 | 
			
		||||
            List<TenantId> tenantIds = tenantService.findTenantIdsByTenantProfileId(savedTenantProfile.getId());
 | 
			
		||||
            queueService.updateQueuesByTenants(tenantIds, savedTenantProfile, oldTenantProfile);
 | 
			
		||||
            tbQueueService.updateQueuesByTenants(tenantIds, savedTenantProfile, oldTenantProfile);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        return savedTenantProfile;
 | 
			
		||||
 | 
			
		||||
@ -39,7 +39,6 @@ import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
import org.thingsboard.server.dao.entity.AbstractCachedEntityService;
 | 
			
		||||
import org.thingsboard.server.dao.exception.DataValidationException;
 | 
			
		||||
import org.thingsboard.server.dao.queue.QueueService;
 | 
			
		||||
import org.thingsboard.server.dao.service.DataValidator;
 | 
			
		||||
import org.thingsboard.server.dao.service.PaginatedRemover;
 | 
			
		||||
import org.thingsboard.server.dao.service.Validator;
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user