diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 0cae3252ff..0b2c40acef 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -566,6 +566,12 @@ queue: automatic_recovery_enabled: "${TB_QUEUE_RABBIT_MQ_AUTOMATIC_RECOVERY_ENABLED:false}" connection_timeout: "${TB_QUEUE_RABBIT_MQ_CONNECTION_TIMEOUT:60000}" handshake_timeout: "${TB_QUEUE_RABBIT_MQ_HANDSHAKE_TIMEOUT:10000}" + queue-properties: + rule-engine: "${TB_QUEUE_RABBIT_MQ_RE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" + core: "${TB_QUEUE_RABBIT_MQ_CORE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" + transport-api: "${TB_QUEUE_RABBIT_MQ_TA_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" + notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" + js-executor: "${TB_QUEUE_RABBIT_MQ_JE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" partitions: hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}" diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTransportQueueFactory.java index c9ba3ec15f..351cd4058c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTransportQueueFactory.java @@ -30,6 +30,7 @@ import org.thingsboard.server.queue.TbQueueRequestTemplate; import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; @@ -47,8 +48,10 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory { private final TbQueueTransportApiSettings transportApiSettings; private final TbQueueTransportNotificationSettings transportNotificationSettings; private final TbAwsSqsSettings sqsSettings; + private final TbQueueCoreSettings coreSettings; private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueAdmin coreAdmin; private final TbQueueAdmin transportApiAdmin; private final TbQueueAdmin notificationAdmin; @@ -56,12 +59,15 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory { TbQueueTransportNotificationSettings transportNotificationSettings, TbAwsSqsSettings sqsSettings, TbServiceInfoProvider serviceInfoProvider, + TbQueueCoreSettings coreSettings, TbAwsSqsQueueAttributes sqsQueueAttributes) { this.transportApiSettings = transportApiSettings; this.transportNotificationSettings = transportNotificationSettings; this.sqsSettings = sqsSettings; this.serviceInfoProvider = serviceInfoProvider; + this.coreSettings = coreSettings; + this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes()); this.transportApiAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getTransportApiAttributes()); this.notificationAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getNotificationsAttributes()); } @@ -94,7 +100,7 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory { @Override public TbQueueProducer> createTbCoreMsgProducer() { - return new TbAwsSqsProducerTemplate<>(transportApiAdmin, sqsSettings, transportApiSettings.getRequestsTopic()); + return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic()); } @Override @@ -105,6 +111,9 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory { @PreDestroy private void destroy() { + if (coreAdmin != null) { + coreAdmin.destroy(); + } if (transportApiAdmin != null) { transportApiAdmin.destroy(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java index c03f5e52d8..2855e577ee 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java @@ -32,9 +32,9 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer; import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer; +import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; -import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; @Component @ConditionalOnExpression("'${queue.type:null}'=='in-memory' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')") @@ -43,13 +43,16 @@ public class InMemoryTbTransportQueueFactory implements TbTransportQueueFactory private final TbQueueTransportApiSettings transportApiSettings; private final TbQueueTransportNotificationSettings transportNotificationSettings; private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueCoreSettings coreSettings; public InMemoryTbTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings, TbQueueTransportNotificationSettings transportNotificationSettings, - TbServiceInfoProvider serviceInfoProvider) { + TbServiceInfoProvider serviceInfoProvider, + TbQueueCoreSettings coreSettings) { this.transportApiSettings = transportApiSettings; this.transportNotificationSettings = transportNotificationSettings; this.serviceInfoProvider = serviceInfoProvider; + this.coreSettings = coreSettings; } @Override @@ -86,7 +89,7 @@ public class InMemoryTbTransportQueueFactory implements TbTransportQueueFactory @Override public TbQueueProducer> createTbCoreMsgProducer() { - return new InMemoryTbQueueProducer<>(transportApiSettings.getRequestsTopic()); + return new InMemoryTbQueueProducer<>(coreSettings.getTopic()); } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqMonolithQueueFactory.java index ff4a69e2e6..72f45e5276 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqMonolithQueueFactory.java @@ -28,8 +28,10 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqAdmin; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqConsumerTemplate; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate; +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqQueueArguments; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; @@ -37,6 +39,8 @@ import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; +import javax.annotation.PreDestroy; + @Component @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='monolith'") public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory { @@ -48,7 +52,12 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE private final TbQueueTransportApiSettings transportApiSettings; private final TbQueueTransportNotificationSettings transportNotificationSettings; private final TbRabbitMqSettings rabbitMqSettings; - private final TbQueueAdmin admin; + + private final TbQueueAdmin coreAdmin; + private final TbQueueAdmin ruleEngineAdmin; + private final TbQueueAdmin jsExecutorAdmin; + private final TbQueueAdmin transportApiAdmin; + private final TbQueueAdmin notificationAdmin; public RabbitMqMonolithQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings, TbQueueRuleEngineSettings ruleEngineSettings, @@ -56,7 +65,7 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE TbQueueTransportApiSettings transportApiSettings, TbQueueTransportNotificationSettings transportNotificationSettings, TbRabbitMqSettings rabbitMqSettings, - TbQueueAdmin admin) { + TbRabbitMqQueueArguments queueArguments) { this.partitionService = partitionService; this.coreSettings = coreSettings; this.serviceInfoProvider = serviceInfoProvider; @@ -64,73 +73,97 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE this.transportApiSettings = transportApiSettings; this.transportNotificationSettings = transportNotificationSettings; this.rabbitMqSettings = rabbitMqSettings; - this.admin = admin; + + this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs()); + this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs()); + this.jsExecutorAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getJsExecutorArgs()); + this.transportApiAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getTransportApiArgs()); + this.notificationAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getNotificationsArgs()); } @Override public TbQueueProducer> createTransportNotificationsMsgProducer() { - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, transportNotificationSettings.getNotificationsTopic()); + return new TbRabbitMqProducerTemplate<>(notificationAdmin, rabbitMqSettings, transportNotificationSettings.getNotificationsTopic()); } @Override public TbQueueProducer> createRuleEngineMsgProducer() { - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, ruleEngineSettings.getTopic()); + return new TbRabbitMqProducerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic()); } @Override public TbQueueProducer> createRuleEngineNotificationsMsgProducer() { - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, ruleEngineSettings.getTopic()); + return new TbRabbitMqProducerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic()); } @Override public TbQueueProducer> createTbCoreMsgProducer() { - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic()); + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic()); } @Override public TbQueueProducer> createTbCoreNotificationsMsgProducer() { - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic()); + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic()); } @Override public TbQueueConsumer> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { - return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, ruleEngineSettings.getTopic(), + return new TbRabbitMqConsumerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueConsumer> createToRuleEngineNotificationsMsgConsumer() { - return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, + return new TbRabbitMqConsumerTemplate<>(notificationAdmin, rabbitMqSettings, partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(), msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueConsumer> createToCoreMsgConsumer() { - return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic(), + return new TbRabbitMqConsumerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueConsumer> createToCoreNotificationsMsgConsumer() { - return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, + return new TbRabbitMqConsumerTemplate<>(notificationAdmin, rabbitMqSettings, partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(), msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueConsumer> createTransportApiRequestConsumer() { - return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, transportApiSettings.getRequestsTopic(), + return new TbRabbitMqConsumerTemplate<>(transportApiAdmin, rabbitMqSettings, transportApiSettings.getRequestsTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueProducer> createTransportApiResponseProducer() { - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, transportApiSettings.getResponsesTopic()); + return new TbRabbitMqProducerTemplate<>(transportApiAdmin, rabbitMqSettings, transportApiSettings.getResponsesTopic()); } @Override public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { return null; } + + @PreDestroy + private void destroy() { + if (coreAdmin != null) { + coreAdmin.destroy(); + } + if (ruleEngineAdmin != null) { + ruleEngineAdmin.destroy(); + } + if (jsExecutorAdmin != null) { + jsExecutorAdmin.destroy(); + } + if (transportApiAdmin != null) { + transportApiAdmin.destroy(); + } + if (notificationAdmin != null) { + notificationAdmin.destroy(); + } + } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java index 5708e0738c..d84bbedbd2 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java @@ -34,13 +34,17 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqAdmin; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqConsumerTemplate; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate; +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqQueueArguments; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; +import javax.annotation.PreDestroy; + @Component @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='tb-core'") public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { @@ -51,7 +55,12 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { private final TbQueueTransportApiSettings transportApiSettings; private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; - private final TbQueueAdmin admin; + + private final TbQueueAdmin coreAdmin; + private final TbQueueAdmin ruleEngineAdmin; + private final TbQueueAdmin jsExecutorAdmin; + private final TbQueueAdmin transportApiAdmin; + private final TbQueueAdmin notificationAdmin; public RabbitMqTbCoreQueueFactory(TbRabbitMqSettings rabbitMqSettings, TbQueueCoreSettings coreSettings, @@ -59,67 +68,91 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { TbQueueRuleEngineSettings ruleEngineSettings, PartitionService partitionService, TbServiceInfoProvider serviceInfoProvider, - TbQueueAdmin admin) { + TbRabbitMqQueueArguments queueArguments) { this.rabbitMqSettings = rabbitMqSettings; this.coreSettings = coreSettings; this.transportApiSettings = transportApiSettings; this.ruleEngineSettings = ruleEngineSettings; this.partitionService = partitionService; this.serviceInfoProvider = serviceInfoProvider; - this.admin = admin; + + this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs()); + this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs()); + this.jsExecutorAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getJsExecutorArgs()); + this.transportApiAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getTransportApiArgs()); + this.notificationAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getNotificationsArgs()); } @Override public TbQueueProducer> createTransportNotificationsMsgProducer() { - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic()); + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic()); } @Override public TbQueueProducer> createRuleEngineMsgProducer() { - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic()); + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic()); } @Override public TbQueueProducer> createRuleEngineNotificationsMsgProducer() { - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, ruleEngineSettings.getTopic()); + return new TbRabbitMqProducerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic()); } @Override public TbQueueProducer> createTbCoreMsgProducer() { - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic()); + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic()); } @Override public TbQueueProducer> createTbCoreNotificationsMsgProducer() { - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic()); + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic()); } @Override public TbQueueConsumer> createToCoreMsgConsumer() { - return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic(), + return new TbRabbitMqConsumerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueConsumer> createToCoreNotificationsMsgConsumer() { - return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, + return new TbRabbitMqConsumerTemplate<>(notificationAdmin, rabbitMqSettings, partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueConsumer> createTransportApiRequestConsumer() { - return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, transportApiSettings.getRequestsTopic(), + return new TbRabbitMqConsumerTemplate<>(transportApiAdmin, rabbitMqSettings, transportApiSettings.getRequestsTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueProducer> createTransportApiResponseProducer() { - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic()); + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic()); } @Override public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { return null; } + + @PreDestroy + private void destroy() { + if (coreAdmin != null) { + coreAdmin.destroy(); + } + if (ruleEngineAdmin != null) { + ruleEngineAdmin.destroy(); + } + if (jsExecutorAdmin != null) { + jsExecutorAdmin.destroy(); + } + if (transportApiAdmin != null) { + transportApiAdmin.destroy(); + } + if (notificationAdmin != null) { + notificationAdmin.destroy(); + } + } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbRuleEngineQueueFactory.java index e2755938d9..a18f427fab 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbRuleEngineQueueFactory.java @@ -32,13 +32,17 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqAdmin; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqConsumerTemplate; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate; +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqQueueArguments; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; +import javax.annotation.PreDestroy; + @Component @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='tb-rule-engine'") public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { @@ -48,55 +52,63 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor private final TbServiceInfoProvider serviceInfoProvider; private final TbQueueRuleEngineSettings ruleEngineSettings; private final TbRabbitMqSettings rabbitMqSettings; - private final TbQueueAdmin admin; + + private final TbQueueAdmin coreAdmin; + private final TbQueueAdmin ruleEngineAdmin; + private final TbQueueAdmin jsExecutorAdmin; + private final TbQueueAdmin notificationAdmin; public RabbitMqTbRuleEngineQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings, TbQueueRuleEngineSettings ruleEngineSettings, TbServiceInfoProvider serviceInfoProvider, TbRabbitMqSettings rabbitMqSettings, - TbQueueAdmin admin) { + TbRabbitMqQueueArguments queueArguments) { this.partitionService = partitionService; this.coreSettings = coreSettings; this.serviceInfoProvider = serviceInfoProvider; this.ruleEngineSettings = ruleEngineSettings; this.rabbitMqSettings = rabbitMqSettings; - this.admin = admin; + + this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs()); + this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs()); + this.jsExecutorAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getJsExecutorArgs()); + this.notificationAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getNotificationsArgs()); } @Override public TbQueueProducer> createTransportNotificationsMsgProducer() { - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic()); + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic()); } @Override public TbQueueProducer> createRuleEngineMsgProducer() { - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic()); + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic()); } @Override public TbQueueProducer> createRuleEngineNotificationsMsgProducer() { - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, ruleEngineSettings.getTopic()); + return new TbRabbitMqProducerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic()); } @Override public TbQueueProducer> createTbCoreMsgProducer() { - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic()); + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic()); } @Override public TbQueueProducer> createTbCoreNotificationsMsgProducer() { - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic()); + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic()); } @Override public TbQueueConsumer> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { - return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, ruleEngineSettings.getTopic(), + return new TbRabbitMqConsumerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueConsumer> createToRuleEngineNotificationsMsgConsumer() { - return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, + return new TbRabbitMqConsumerTemplate<>(notificationAdmin, rabbitMqSettings, partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); } @@ -105,4 +117,20 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { return null; } + + @PreDestroy + private void destroy() { + if (coreAdmin != null) { + coreAdmin.destroy(); + } + if (ruleEngineAdmin != null) { + ruleEngineAdmin.destroy(); + } + if (jsExecutorAdmin != null) { + jsExecutorAdmin.destroy(); + } + if (notificationAdmin != null) { + notificationAdmin.destroy(); + } + } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTransportQueueFactory.java index dc6154255c..841e004b3f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTransportQueueFactory.java @@ -30,12 +30,17 @@ import org.thingsboard.server.queue.TbQueueRequestTemplate; import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqAdmin; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqConsumerTemplate; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate; +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqQueueArguments; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings; +import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; +import javax.annotation.PreDestroy; + @Component @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')") @Slf4j @@ -43,34 +48,45 @@ public class RabbitMqTransportQueueFactory implements TbTransportQueueFactory { private final TbQueueTransportApiSettings transportApiSettings; private final TbQueueTransportNotificationSettings transportNotificationSettings; private final TbRabbitMqSettings rabbitMqSettings; - private final TbQueueAdmin admin; private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueCoreSettings coreSettings; + + private final TbQueueAdmin coreAdmin; + private final TbQueueAdmin ruleEngineAdmin; + private final TbQueueAdmin transportApiAdmin; + private final TbQueueAdmin notificationAdmin; public RabbitMqTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings, TbQueueTransportNotificationSettings transportNotificationSettings, TbRabbitMqSettings rabbitMqSettings, TbServiceInfoProvider serviceInfoProvider, - TbQueueAdmin admin) { + TbQueueCoreSettings coreSettings, + TbRabbitMqQueueArguments queueArguments) { this.transportApiSettings = transportApiSettings; this.transportNotificationSettings = transportNotificationSettings; this.rabbitMqSettings = rabbitMqSettings; - this.admin = admin; this.serviceInfoProvider = serviceInfoProvider; + this.coreSettings = coreSettings; + + this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs()); + this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs()); + this.transportApiAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getTransportApiArgs()); + this.notificationAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getNotificationsArgs()); } @Override public TbQueueRequestTemplate, TbProtoQueueMsg> createTransportApiRequestTemplate() { TbRabbitMqProducerTemplate> producerTemplate = - new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, transportApiSettings.getRequestsTopic()); + new TbRabbitMqProducerTemplate<>(transportApiAdmin, rabbitMqSettings, transportApiSettings.getRequestsTopic()); TbRabbitMqConsumerTemplate> consumerTemplate = - new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, + new TbRabbitMqConsumerTemplate<>(transportApiAdmin, rabbitMqSettings, transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId(), msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders())); DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder , TbProtoQueueMsg> templateBuilder = DefaultTbQueueRequestTemplate.builder(); - templateBuilder.queueAdmin(admin); + templateBuilder.queueAdmin(transportApiAdmin); templateBuilder.requestTemplate(producerTemplate); templateBuilder.responseTemplate(consumerTemplate); templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests()); @@ -81,17 +97,33 @@ public class RabbitMqTransportQueueFactory implements TbTransportQueueFactory { @Override public TbQueueProducer> createRuleEngineMsgProducer() { - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, transportApiSettings.getRequestsTopic()); + return new TbRabbitMqProducerTemplate<>(transportApiAdmin, rabbitMqSettings, transportApiSettings.getRequestsTopic()); } @Override public TbQueueProducer> createTbCoreMsgProducer() { - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, transportApiSettings.getRequestsTopic()); + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic()); } @Override public TbQueueConsumer> createTransportNotificationsConsumer() { - return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId(), + return new TbRabbitMqConsumerTemplate<>(notificationAdmin, rabbitMqSettings, transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId(), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders())); } + + @PreDestroy + private void destroy() { + if (coreAdmin != null) { + coreAdmin.destroy(); + } + if (ruleEngineAdmin != null) { + ruleEngineAdmin.destroy(); + } + if (transportApiAdmin != null) { + transportApiAdmin.destroy(); + } + if (notificationAdmin != null) { + notificationAdmin.destroy(); + } + } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTransportQueueFactory.java index d38a4389a3..33c5277ca5 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTransportQueueFactory.java @@ -29,6 +29,7 @@ import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; +import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; @@ -41,17 +42,20 @@ public class ServiceBusTransportQueueFactory implements TbTransportQueueFactory private final TbServiceBusSettings serviceBusSettings; private final TbQueueAdmin admin; private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueCoreSettings coreSettings; public ServiceBusTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings, TbQueueTransportNotificationSettings transportNotificationSettings, TbServiceBusSettings serviceBusSettings, TbServiceInfoProvider serviceInfoProvider, + TbQueueCoreSettings coreSettings, TbQueueAdmin admin) { this.transportApiSettings = transportApiSettings; this.transportNotificationSettings = transportNotificationSettings; this.serviceBusSettings = serviceBusSettings; this.admin = admin; this.serviceInfoProvider = serviceInfoProvider; + this.coreSettings = coreSettings; } @Override @@ -82,7 +86,7 @@ public class ServiceBusTransportQueueFactory implements TbTransportQueueFactory @Override public TbQueueProducer> createTbCoreMsgProducer() { - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic()); + return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqAdmin.java index 9a7e5db342..3bef6deb84 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqAdmin.java @@ -18,24 +18,23 @@ package org.thingsboard.server.queue.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; -import org.springframework.stereotype.Component; import org.thingsboard.server.queue.TbQueueAdmin; import java.io.IOException; +import java.util.Map; import java.util.concurrent.TimeoutException; @Slf4j -@Component -@ConditionalOnExpression("'${queue.type:null}'=='rabbitmq'") public class TbRabbitMqAdmin implements TbQueueAdmin { private final TbRabbitMqSettings rabbitMqSettings; private final Channel channel; private final Connection connection; + private final Map arguments; - public TbRabbitMqAdmin(TbRabbitMqSettings rabbitMqSettings) { + public TbRabbitMqAdmin(TbRabbitMqSettings rabbitMqSettings, Map arguments) { this.rabbitMqSettings = rabbitMqSettings; + this.arguments = arguments; try { connection = rabbitMqSettings.getConnectionFactory().newConnection(); @@ -55,7 +54,7 @@ public class TbRabbitMqAdmin implements TbQueueAdmin { @Override public void createTopicIfNotExists(String topic) { try { - channel.queueDeclare(topic, false, false, false, null); + channel.queueDeclare(topic, false, false, false, arguments); } catch (IOException e) { log.error("Failed to bind queue: [{}]", topic, e); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqQueueArguments.java b/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqQueueArguments.java new file mode 100644 index 0000000000..eb73e7dce6 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqQueueArguments.java @@ -0,0 +1,98 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.rabbitmq; + +import lombok.Getter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Pattern; + +@Component +@ConditionalOnExpression("'${queue.type:null}'=='rabbitmq'") +public class TbRabbitMqQueueArguments { + @Value("${queue.rabbitmq.queue-properties.core}") + private String coreProperties; + @Value("${queue.rabbitmq.queue-properties.rule-engine}") + private String ruleEngineProperties; + @Value("${queue.rabbitmq.queue-properties.transport-api}") + private String transportApiProperties; + @Value("${queue.rabbitmq.queue-properties.notifications}") + private String notificationsProperties; + @Value("${queue.rabbitmq.queue-properties.js-executor}") + private String jsExecutorProperties; + + @Getter + private Map coreArgs; + @Getter + private Map ruleEngineArgs; + @Getter + private Map transportApiArgs; + @Getter + private Map notificationsArgs; + @Getter + private Map jsExecutorArgs; + + @PostConstruct + private void init() { + coreArgs = getArgs(coreProperties); + ruleEngineArgs = getArgs(ruleEngineProperties); + transportApiArgs = getArgs(transportApiProperties); + notificationsArgs = getArgs(notificationsProperties); + jsExecutorArgs = getArgs(jsExecutorProperties); + } + + private Map getArgs(String properties) { + Map configs = new HashMap<>(); + for (String property : properties.split(";")) { + int delimiterPosition = property.indexOf(":"); + String key = property.substring(0, delimiterPosition); + String strValue = property.substring(delimiterPosition + 1); + configs.put(key, getObjectValue(strValue)); + } + return configs; + } + + private Object getObjectValue(String str) { + if (str.equalsIgnoreCase("true") || str.equalsIgnoreCase("false")) { + return Boolean.valueOf(str); + } else if (isNumeric(str)) { + return getNumericValue(str); + } + return str; + } + + private Object getNumericValue(String str) { + if (str.contains(".")) { + return Double.valueOf(str); + } else { + return Long.valueOf(str); + } + } + + private static final Pattern PATTERN = Pattern.compile("-?\\d+(\\.\\d+)?"); + + public boolean isNumeric(String strNum) { + if (strNum == null) { + return false; + } + return PATTERN.matcher(strNum).matches(); + } +} diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index c3e646c77c..c3dcb76508 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -95,6 +95,12 @@ queue: automatic_recovery_enabled: "${TB_QUEUE_RABBIT_MQ_AUTOMATIC_RECOVERY_ENABLED:false}" connection_timeout: "${TB_QUEUE_RABBIT_MQ_CONNECTION_TIMEOUT:60000}" handshake_timeout: "${TB_QUEUE_RABBIT_MQ_HANDSHAKE_TIMEOUT:10000}" + queue-properties: + rule-engine: "${TB_QUEUE_RABBIT_MQ_RE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" + core: "${TB_QUEUE_RABBIT_MQ_CORE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" + transport-api: "${TB_QUEUE_RABBIT_MQ_TA_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" + notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" + js-executor: "${TB_QUEUE_RABBIT_MQ_JE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" partitions: hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index 0d48bda2b9..baac6e977f 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -96,6 +96,12 @@ queue: automatic_recovery_enabled: "${TB_QUEUE_RABBIT_MQ_AUTOMATIC_RECOVERY_ENABLED:false}" connection_timeout: "${TB_QUEUE_RABBIT_MQ_CONNECTION_TIMEOUT:60000}" handshake_timeout: "${TB_QUEUE_RABBIT_MQ_HANDSHAKE_TIMEOUT:10000}" + queue-properties: + rule-engine: "${TB_QUEUE_RABBIT_MQ_RE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" + core: "${TB_QUEUE_RABBIT_MQ_CORE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" + transport-api: "${TB_QUEUE_RABBIT_MQ_TA_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" + notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" + js-executor: "${TB_QUEUE_RABBIT_MQ_JE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" partitions: hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index 4249757db1..e920891d66 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -126,6 +126,12 @@ queue: automatic_recovery_enabled: "${TB_QUEUE_RABBIT_MQ_AUTOMATIC_RECOVERY_ENABLED:false}" connection_timeout: "${TB_QUEUE_RABBIT_MQ_CONNECTION_TIMEOUT:60000}" handshake_timeout: "${TB_QUEUE_RABBIT_MQ_HANDSHAKE_TIMEOUT:10000}" + queue-properties: + rule-engine: "${TB_QUEUE_RABBIT_MQ_RE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" + core: "${TB_QUEUE_RABBIT_MQ_CORE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" + transport-api: "${TB_QUEUE_RABBIT_MQ_TA_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" + notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" + js-executor: "${TB_QUEUE_RABBIT_MQ_JE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" partitions: hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}"