added loging and refactored
This commit is contained in:
		
							parent
							
								
									a0a0d5dcf9
								
							
						
					
					
						commit
						7f31069285
					
				@ -17,16 +17,6 @@
 | 
			
		||||
ALTER TABLE device_profile
 | 
			
		||||
    ADD COLUMN IF NOT EXISTS default_queue_id uuid;
 | 
			
		||||
 | 
			
		||||
DO
 | 
			
		||||
$$
 | 
			
		||||
    BEGIN
 | 
			
		||||
        IF NOT EXISTS(SELECT 1 FROM pg_constraint WHERE conname = 'fk_default_queue_device_profile') THEN
 | 
			
		||||
            ALTER TABLE device_profile
 | 
			
		||||
                ADD CONSTRAINT fk_default_queue_device_profile FOREIGN KEY (default_queue_id) REFERENCES queue (id);
 | 
			
		||||
        END IF;
 | 
			
		||||
    END;
 | 
			
		||||
$$;
 | 
			
		||||
 | 
			
		||||
DO
 | 
			
		||||
$$
 | 
			
		||||
    BEGIN
 | 
			
		||||
@ -45,5 +35,15 @@ $$
 | 
			
		||||
    END
 | 
			
		||||
$$;
 | 
			
		||||
 | 
			
		||||
DO
 | 
			
		||||
$$
 | 
			
		||||
    BEGIN
 | 
			
		||||
        IF NOT EXISTS(SELECT 1 FROM pg_constraint WHERE conname = 'fk_default_queue_device_profile') THEN
 | 
			
		||||
            ALTER TABLE device_profile
 | 
			
		||||
                ADD CONSTRAINT fk_default_queue_device_profile FOREIGN KEY (default_queue_id) REFERENCES queue (id);
 | 
			
		||||
        END IF;
 | 
			
		||||
    END;
 | 
			
		||||
$$;
 | 
			
		||||
 | 
			
		||||
ALTER TABLE device_profile
 | 
			
		||||
    DROP COLUMN IF EXISTS default_queue_name;
 | 
			
		||||
 | 
			
		||||
@ -79,14 +79,14 @@ public class DefaultTbQueueService implements TbQueueService {
 | 
			
		||||
    public void deleteQueue(TenantId tenantId, QueueId queueId) {
 | 
			
		||||
        Queue queue = queueService.findQueueById(tenantId, queueId);
 | 
			
		||||
        queueService.deleteQueue(tenantId, queueId);
 | 
			
		||||
        onQueueDeleted(tenantId, queue);
 | 
			
		||||
        onQueueDeleted(queue);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void deleteQueueByQueueName(TenantId tenantId, String queueName) {
 | 
			
		||||
        Queue queue = queueService.findQueueByTenantIdAndNameInternal(tenantId, queueName);
 | 
			
		||||
        queueService.deleteQueue(tenantId, queue.getId());
 | 
			
		||||
        onQueueDeleted(tenantId, queue);
 | 
			
		||||
        onQueueDeleted(queue);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void onQueueCreated(Queue queue) {
 | 
			
		||||
@ -123,8 +123,10 @@ public class DefaultTbQueueService implements TbQueueService {
 | 
			
		||||
                }
 | 
			
		||||
                await();
 | 
			
		||||
                for (int i = currentPartitions; i < oldPartitions; i++) {
 | 
			
		||||
                    String fullTopicName = new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName();
 | 
			
		||||
                    log.info("Removed partition [{}]", fullTopicName);
 | 
			
		||||
                    tbQueueAdmin.deleteTopic(
 | 
			
		||||
                            new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName());
 | 
			
		||||
                            fullTopicName);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        } else if (!oldQueue.equals(queue) && tbClusterService != null) {
 | 
			
		||||
@ -132,7 +134,7 @@ public class DefaultTbQueueService implements TbQueueService {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void onQueueDeleted(TenantId tenantId, Queue queue) {
 | 
			
		||||
    private void onQueueDeleted(Queue queue) {
 | 
			
		||||
        if (tbClusterService != null) {
 | 
			
		||||
            tbClusterService.onQueueDelete(queue);
 | 
			
		||||
            await();
 | 
			
		||||
@ -141,7 +143,7 @@ public class DefaultTbQueueService implements TbQueueService {
 | 
			
		||||
        if (tbQueueAdmin != null) {
 | 
			
		||||
            for (int i = 0; i < queue.getPartitions(); i++) {
 | 
			
		||||
                String fullTopicName = new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName();
 | 
			
		||||
                log.debug("Deleting queue [{}]", fullTopicName);
 | 
			
		||||
                log.info("Deleting queue [{}]", fullTopicName);
 | 
			
		||||
                try {
 | 
			
		||||
                    tbQueueAdmin.deleteTopic(fullTopicName);
 | 
			
		||||
                } catch (Exception e) {
 | 
			
		||||
 | 
			
		||||
@ -133,7 +133,6 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
 | 
			
		||||
        this.statsFactory = statsFactory;
 | 
			
		||||
        this.serviceInfoProvider = serviceInfoProvider;
 | 
			
		||||
        this.queueService = queueService;
 | 
			
		||||
//        this.tenantId = actorContext.getServiceInfoProvider().getIsolatedTenant().orElse(TenantId.SYS_TENANT_ID);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @PostConstruct
 | 
			
		||||
@ -406,6 +405,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private synchronized void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg) {
 | 
			
		||||
        log.info("Received queue update msg: [{}]", queueUpdateMsg);
 | 
			
		||||
        String queueName = queueUpdateMsg.getQueueName();
 | 
			
		||||
        TenantId tenantId = new TenantId(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB()));
 | 
			
		||||
        QueueId queueId = new QueueId(new UUID(queueUpdateMsg.getQueueIdMSB(), queueUpdateMsg.getQueueIdLSB()));
 | 
			
		||||
@ -439,6 +439,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void deleteQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg) {
 | 
			
		||||
        log.info("Received queue delete msg: [{}]", queueDeleteMsg);
 | 
			
		||||
        TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB()));
 | 
			
		||||
        QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId);
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -24,4 +24,4 @@ public class ProcessingStrategy {
 | 
			
		||||
    private double failurePercentage;
 | 
			
		||||
    private long pauseBetweenRetries;
 | 
			
		||||
    private long maxPauseBetweenRetries;
 | 
			
		||||
}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -18,6 +18,7 @@ package org.thingsboard.server.queue.settings;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
 | 
			
		||||
@Data
 | 
			
		||||
@Deprecated
 | 
			
		||||
public class TbRuleEngineQueueAckStrategyConfiguration {
 | 
			
		||||
 | 
			
		||||
    private String type;
 | 
			
		||||
 | 
			
		||||
@ -18,6 +18,7 @@ package org.thingsboard.server.queue.settings;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
 | 
			
		||||
@Data
 | 
			
		||||
@Deprecated
 | 
			
		||||
public class TbRuleEngineQueueConfiguration {
 | 
			
		||||
 | 
			
		||||
    private String name;
 | 
			
		||||
 | 
			
		||||
@ -18,6 +18,7 @@ package org.thingsboard.server.queue.settings;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
 | 
			
		||||
@Data
 | 
			
		||||
@Deprecated
 | 
			
		||||
public class TbRuleEngineQueueSubmitStrategyConfiguration {
 | 
			
		||||
 | 
			
		||||
    private String type;
 | 
			
		||||
 | 
			
		||||
@ -18,12 +18,8 @@ package org.thingsboard.server.mqtt;
 | 
			
		||||
import org.springframework.boot.SpringApplication;
 | 
			
		||||
import org.springframework.boot.SpringBootConfiguration;
 | 
			
		||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 | 
			
		||||
import org.springframework.boot.autoconfigure.cassandra.CassandraAutoConfiguration;
 | 
			
		||||
import org.springframework.boot.autoconfigure.data.cassandra.CassandraDataAutoConfiguration;
 | 
			
		||||
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
 | 
			
		||||
import org.springframework.context.annotation.ComponentScan;
 | 
			
		||||
import org.springframework.scheduling.annotation.EnableAsync;
 | 
			
		||||
import org.springframework.scheduling.annotation.EnableScheduling;
 | 
			
		||||
 | 
			
		||||
import java.util.Arrays;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user