From 1b9df18c4592ea16d35b0bf0eace8e1f20a2ca43 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Fri, 17 Apr 2020 20:30:56 +0300 Subject: [PATCH] created TbPubSubSubscriptionSettings --- .../src/main/resources/thingsboard.yml | 7 +- .../provider/PubSubMonolithQueueFactory.java | 40 +++++++---- .../provider/PubSubTbCoreQueueFactory.java | 36 ++++++---- .../PubSubTbRuleEngineQueueFactory.java | 32 ++++++--- .../provider/PubSubTransportQueueFactory.java | 36 ++++++---- .../server/queue/pubsub/TbPubSubAdmin.java | 35 +++++++-- .../server/queue/pubsub/TbPubSubSettings.java | 3 - .../pubsub/TbPubSubSubscriptionSettings.java | 71 +++++++++++++++++++ .../src/main/resources/tb-coap-transport.yml | 7 +- .../src/main/resources/tb-http-transport.yml | 7 +- .../src/main/resources/tb-mqtt-transport.yml | 7 +- 11 files changed, 219 insertions(+), 62 deletions(-) create mode 100644 common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSubscriptionSettings.java diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index e1296275cd..0cae3252ff 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -543,9 +543,14 @@ queue: pubsub: project_id: "${TB_QUEUE_PUBSUB_PROJECT_ID:YOUR_PROJECT_ID}" service_account: "${TB_QUEUE_PUBSUB_SERVICE_ACCOUNT:YOUR_SERVICE_ACCOUNT}" - ack_deadline: "${TB_QUEUE_PUBSUB_ACK_DEADLINE:30}" #In seconds. If messages wont commit in this time, messages will poll again max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" #in bytes max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" + queue-properties: + rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" + core: "${TB_QUEUE_PUBSUB_CORE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" + transport-api: "${TB_QUEUE_PUBSUB_TA_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" + notifications: "${TB_QUEUE_PUBSUB_NOTIFICATIONS_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" + js-executor: "${TB_QUEUE_PUBSUB_JE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" service_bus: namespace_name: "${TB_QUEUE_SERVICE_BUS_NAMESPACE_NAME:YOUR_NAMESPACE_NAME}" sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}" diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java index b1afc61dbd..ba806c5daf 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java @@ -38,6 +38,7 @@ import org.thingsboard.server.queue.pubsub.TbPubSubAdmin; import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate; import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; import org.thingsboard.server.queue.pubsub.TbPubSubSettings; +import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; @@ -53,88 +54,99 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng private final TbQueueRuleEngineSettings ruleEngineSettings; private final TbQueueTransportApiSettings transportApiSettings; private final TbQueueTransportNotificationSettings transportNotificationSettings; - private final TbQueueAdmin admin; private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueAdmin coreAdmin; + private final TbQueueAdmin ruleEngineAdmin; + private final TbQueueAdmin jsExecutorAdmin; + private final TbQueueAdmin transportApiAdmin; + private final TbQueueAdmin notificationAdmin; + public PubSubMonolithQueueFactory(TbPubSubSettings pubSubSettings, TbQueueCoreSettings coreSettings, TbQueueRuleEngineSettings ruleEngineSettings, TbQueueTransportApiSettings transportApiSettings, TbQueueTransportNotificationSettings transportNotificationSettings, PartitionService partitionService, - TbServiceInfoProvider serviceInfoProvider) { + TbServiceInfoProvider serviceInfoProvider, + TbPubSubSubscriptionSettings pubSubSubscriptionSettings) { this.pubSubSettings = pubSubSettings; this.coreSettings = coreSettings; this.ruleEngineSettings = ruleEngineSettings; this.transportApiSettings = transportApiSettings; this.transportNotificationSettings = transportNotificationSettings; - this.admin = new TbPubSubAdmin(pubSubSettings); this.partitionService = partitionService; this.serviceInfoProvider = serviceInfoProvider; + + this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings()); + this.ruleEngineAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getRuleEngineSettings()); + this.jsExecutorAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getJsExecutorSettings()); + this.transportApiAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getTransportApiSettings()); + this.notificationAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getNotificationsSettings()); } @Override public TbQueueProducer> createTransportNotificationsMsgProducer() { - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, transportNotificationSettings.getNotificationsTopic()); + return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, transportNotificationSettings.getNotificationsTopic()); } @Override public TbQueueProducer> createRuleEngineMsgProducer() { - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, ruleEngineSettings.getTopic()); + return new TbPubSubProducerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic()); } @Override public TbQueueProducer> createRuleEngineNotificationsMsgProducer() { - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, ruleEngineSettings.getTopic()); + return new TbPubSubProducerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic()); } @Override public TbQueueProducer> createTbCoreMsgProducer() { - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic()); + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic()); } @Override public TbQueueProducer> createTbCoreNotificationsMsgProducer() { - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic()); + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic()); } @Override public TbQueueConsumer> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { - return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, ruleEngineSettings.getTopic(), + return new TbPubSubConsumerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueConsumer> createToRuleEngineNotificationsMsgConsumer() { - return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, + return new TbPubSubConsumerTemplate<>(notificationAdmin, pubSubSettings, partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueConsumer> createToCoreMsgConsumer() { - return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, coreSettings.getTopic(), + return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueConsumer> createToCoreNotificationsMsgConsumer() { - return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, + return new TbPubSubConsumerTemplate<>(notificationAdmin, pubSubSettings, partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueConsumer> createTransportApiRequestConsumer() { - return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, transportApiSettings.getRequestsTopic(), + return new TbPubSubConsumerTemplate<>(transportApiAdmin, pubSubSettings, transportApiSettings.getRequestsTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueProducer> createTransportApiResponseProducer() { - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, transportApiSettings.getResponsesTopic()); + return new TbPubSubProducerTemplate<>(transportApiAdmin, pubSubSettings, transportApiSettings.getResponsesTopic()); } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java index edfcdc3d8e..eaffce241f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java @@ -30,6 +30,8 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; +import org.thingsboard.server.queue.pubsub.TbPubSubAdmin; +import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; @@ -47,71 +49,79 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { private final TbPubSubSettings pubSubSettings; private final TbQueueCoreSettings coreSettings; private final TbQueueTransportApiSettings transportApiSettings; - private final TbQueueAdmin admin; private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueAdmin coreAdmin; + private final TbQueueAdmin jsExecutorAdmin; + private final TbQueueAdmin transportApiAdmin; + private final TbQueueAdmin notificationAdmin; + public PubSubTbCoreQueueFactory(TbPubSubSettings pubSubSettings, TbQueueCoreSettings coreSettings, TbQueueTransportApiSettings transportApiSettings, - TbQueueAdmin admin, PartitionService partitionService, - TbServiceInfoProvider serviceInfoProvider) { + TbServiceInfoProvider serviceInfoProvider, + TbPubSubSubscriptionSettings pubSubSubscriptionSettings) { this.pubSubSettings = pubSubSettings; this.coreSettings = coreSettings; this.transportApiSettings = transportApiSettings; - this.admin = admin; this.partitionService = partitionService; this.serviceInfoProvider = serviceInfoProvider; + + this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings()); + this.jsExecutorAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getJsExecutorSettings()); + this.transportApiAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getTransportApiSettings()); + this.notificationAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getNotificationsSettings()); } @Override public TbQueueProducer> createTransportNotificationsMsgProducer() { - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic()); + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic()); } @Override public TbQueueProducer> createRuleEngineMsgProducer() { - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic()); + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic()); } @Override public TbQueueProducer> createRuleEngineNotificationsMsgProducer() { - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic()); + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic()); } @Override public TbQueueProducer> createTbCoreMsgProducer() { - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic()); + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic()); } @Override public TbQueueProducer> createTbCoreNotificationsMsgProducer() { - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic()); + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic()); } @Override public TbQueueConsumer> createToCoreMsgConsumer() { - return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, coreSettings.getTopic(), + return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueConsumer> createToCoreNotificationsMsgConsumer() { - return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, + return new TbPubSubConsumerTemplate<>(notificationAdmin, pubSubSettings, partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueConsumer> createTransportApiRequestConsumer() { - return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, transportApiSettings.getRequestsTopic(), + return new TbPubSubConsumerTemplate<>(transportApiAdmin, pubSubSettings, transportApiSettings.getRequestsTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueProducer> createTransportApiResponseProducer() { - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic()); + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic()); } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java index 101f335536..3206f03250 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java @@ -32,9 +32,11 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.pubsub.TbPubSubAdmin; import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate; import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; import org.thingsboard.server.queue.pubsub.TbPubSubSettings; +import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; @@ -46,59 +48,67 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory private final TbPubSubSettings pubSubSettings; private final TbQueueCoreSettings coreSettings; private final TbQueueRuleEngineSettings ruleEngineSettings; - private final TbQueueAdmin admin; private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueAdmin coreAdmin; + private final TbQueueAdmin ruleEngineAdmin; + private final TbQueueAdmin jsExecutorAdmin; + private final TbQueueAdmin notificationAdmin; + public PubSubTbRuleEngineQueueFactory(TbPubSubSettings pubSubSettings, TbQueueCoreSettings coreSettings, TbQueueRuleEngineSettings ruleEngineSettings, - TbQueueAdmin admin, PartitionService partitionService, - TbServiceInfoProvider serviceInfoProvider) { + TbServiceInfoProvider serviceInfoProvider, + TbPubSubSubscriptionSettings pubSubSubscriptionSettings) { this.pubSubSettings = pubSubSettings; this.coreSettings = coreSettings; this.ruleEngineSettings = ruleEngineSettings; - this.admin = admin; this.partitionService = partitionService; this.serviceInfoProvider = serviceInfoProvider; + + this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings()); + this.ruleEngineAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getRuleEngineSettings()); + this.jsExecutorAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getJsExecutorSettings()); + this.notificationAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getNotificationsSettings()); } @Override public TbQueueProducer> createTransportNotificationsMsgProducer() { - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic()); + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic()); } @Override public TbQueueProducer> createRuleEngineMsgProducer() { - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic()); + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic()); } @Override public TbQueueProducer> createRuleEngineNotificationsMsgProducer() { - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, ruleEngineSettings.getTopic()); + return new TbPubSubProducerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic()); } @Override public TbQueueProducer> createTbCoreMsgProducer() { - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic()); + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic()); } @Override public TbQueueProducer> createTbCoreNotificationsMsgProducer() { - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic()); + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic()); } @Override public TbQueueConsumer> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { - return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, ruleEngineSettings.getTopic(), + return new TbPubSubConsumerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueConsumer> createToRuleEngineNotificationsMsgConsumer() { - return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, + return new TbPubSubConsumerTemplate<>(notificationAdmin, pubSubSettings, partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTransportQueueFactory.java index f15faa11da..2a5de01f86 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTransportQueueFactory.java @@ -25,12 +25,8 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestM import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; -import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; -import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; -import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; -import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; @@ -38,6 +34,11 @@ import org.thingsboard.server.queue.pubsub.TbPubSubAdmin; import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate; import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; import org.thingsboard.server.queue.pubsub.TbPubSubSettings; +import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings; +import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; +import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; +import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; @Component @ConditionalOnExpression("'${queue.type:null}'=='pubsub' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')") @@ -50,33 +51,42 @@ public class PubSubTransportQueueFactory implements TbTransportQueueFactory { private final TbQueueRuleEngineSettings ruleEngineSettings; private final TbQueueTransportApiSettings transportApiSettings; private final TbQueueTransportNotificationSettings transportNotificationSettings; - private final TbQueueAdmin admin; + + private final TbQueueAdmin coreAdmin; + private final TbQueueAdmin ruleEngineAdmin; + private final TbQueueAdmin transportApiAdmin; + private final TbQueueAdmin notificationAdmin; public PubSubTransportQueueFactory(TbPubSubSettings pubSubSettings, TbServiceInfoProvider serviceInfoProvider, TbQueueCoreSettings coreSettings, TbQueueRuleEngineSettings ruleEngineSettings, TbQueueTransportApiSettings transportApiSettings, - TbQueueTransportNotificationSettings transportNotificationSettings) { + TbQueueTransportNotificationSettings transportNotificationSettings, + TbPubSubSubscriptionSettings pubSubSubscriptionSettings) { this.pubSubSettings = pubSubSettings; this.serviceInfoProvider = serviceInfoProvider; this.coreSettings = coreSettings; this.ruleEngineSettings = ruleEngineSettings; this.transportApiSettings = transportApiSettings; this.transportNotificationSettings = transportNotificationSettings; - this.admin = new TbPubSubAdmin(pubSubSettings); + + this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings()); + this.ruleEngineAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getRuleEngineSettings()); + this.transportApiAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getTransportApiSettings()); + this.notificationAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getNotificationsSettings()); } @Override public TbQueueRequestTemplate, TbProtoQueueMsg> createTransportApiRequestTemplate() { - TbQueueProducer> producer = new TbPubSubProducerTemplate<>(admin, pubSubSettings, transportApiSettings.getRequestsTopic()); - TbQueueConsumer> consumer = new TbPubSubConsumerTemplate<>(admin, pubSubSettings, + TbQueueProducer> producer = new TbPubSubProducerTemplate<>(transportApiAdmin, pubSubSettings, transportApiSettings.getRequestsTopic()); + TbQueueConsumer> consumer = new TbPubSubConsumerTemplate<>(transportApiAdmin, pubSubSettings, transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId(), msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders())); DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder , TbProtoQueueMsg> templateBuilder = DefaultTbQueueRequestTemplate.builder(); - templateBuilder.queueAdmin(admin); + templateBuilder.queueAdmin(transportApiAdmin); templateBuilder.requestTemplate(producer); templateBuilder.responseTemplate(consumer); templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests()); @@ -87,17 +97,17 @@ public class PubSubTransportQueueFactory implements TbTransportQueueFactory { @Override public TbQueueProducer> createRuleEngineMsgProducer() { - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, ruleEngineSettings.getTopic()); + return new TbPubSubProducerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic()); } @Override public TbQueueProducer> createTbCoreMsgProducer() { - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic()); + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic()); } @Override public TbQueueConsumer> createTransportNotificationsConsumer() { - return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, + return new TbPubSubConsumerTemplate<>(notificationAdmin, pubSubSettings, transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId(), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubAdmin.java index cd575818f2..f3b6d3f10c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubAdmin.java @@ -19,32 +19,37 @@ import com.google.cloud.pubsub.v1.SubscriptionAdminClient; import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; import com.google.cloud.pubsub.v1.TopicAdminClient; import com.google.cloud.pubsub.v1.TopicAdminSettings; +import com.google.protobuf.Duration; import com.google.pubsub.v1.ListSubscriptionsRequest; import com.google.pubsub.v1.ListTopicsRequest; import com.google.pubsub.v1.ProjectName; import com.google.pubsub.v1.ProjectSubscriptionName; import com.google.pubsub.v1.ProjectTopicName; -import com.google.pubsub.v1.PushConfig; import com.google.pubsub.v1.Subscription; import com.google.pubsub.v1.Topic; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.queue.TbQueueAdmin; import java.io.IOException; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @Slf4j public class TbPubSubAdmin implements TbQueueAdmin { + private static final String ACK_DEADLINE = "ackDeadlineInSec"; + private static final String MESSAGE_RETENTION = "messageRetentionInSec"; private final TbPubSubSettings pubSubSettings; private final SubscriptionAdminSettings subscriptionAdminSettings; private final TopicAdminSettings topicAdminSettings; private final Set topicSet = ConcurrentHashMap.newKeySet(); private final Set subscriptionSet = ConcurrentHashMap.newKeySet(); + private final Map subscriptionProperties; - public TbPubSubAdmin(TbPubSubSettings pubSubSettings) { + public TbPubSubAdmin(TbPubSubSettings pubSubSettings, Map subscriptionSettings) { this.pubSubSettings = pubSubSettings; + this.subscriptionProperties = subscriptionSettings; try { topicAdminSettings = TopicAdminSettings.newBuilder().setCredentialsProvider(pubSubSettings.getCredentialsProvider()).build(); @@ -149,8 +154,15 @@ public class TbPubSubAdmin implements TbQueueAdmin { } } - subscriptionAdminClient.createSubscription( - subscriptionName, topicName, PushConfig.getDefaultInstance(), pubSubSettings.getAckDeadline()).getName(); + Subscription.Builder subscriptionBuilder = Subscription + .newBuilder() + .setName(subscriptionName.toString()) + .setTopic(topicName.toString()); + + setAckDeadline(subscriptionBuilder); + setMessageRetention(subscriptionBuilder); + + subscriptionAdminClient.createSubscription(subscriptionBuilder.build()); subscriptionSet.add(subscriptionName.toString()); log.info("Created new subscription: [{}]", subscriptionName.toString()); } catch (IOException e) { @@ -159,4 +171,19 @@ public class TbPubSubAdmin implements TbQueueAdmin { } } + private void setAckDeadline(Subscription.Builder builder) { + if (subscriptionProperties.containsKey(ACK_DEADLINE)) { + builder.setAckDeadlineSeconds(Integer.parseInt(subscriptionProperties.get(ACK_DEADLINE))); + } + } + + private void setMessageRetention(Subscription.Builder builder) { + if (subscriptionProperties.containsKey(MESSAGE_RETENTION)) { + Duration duration = Duration + .newBuilder() + .setSeconds(Long.parseLong(subscriptionProperties.get(MESSAGE_RETENTION))) + .build(); + builder.setMessageRetentionDuration(duration); + } + } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSettings.java index 851a2b3245..d6c6d0e271 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSettings.java @@ -40,9 +40,6 @@ public class TbPubSubSettings { @Value("${queue.pubsub.service_account}") private String serviceAccount; - @Value("${queue.pubsub.ack_deadline}") - private int ackDeadline; - @Value("${queue.pubsub.max_msg_size}") private int maxMsgSize; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSubscriptionSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSubscriptionSettings.java new file mode 100644 index 0000000000..e98f51ec7c --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSubscriptionSettings.java @@ -0,0 +1,71 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.pubsub; + +import lombok.Getter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.HashMap; +import java.util.Map; + +@Component +@ConditionalOnExpression("'${queue.type:null}'=='pubsub'") +public class TbPubSubSubscriptionSettings { + @Value("${queue.pubsub.queue-properties.core}") + private String coreProperties; + @Value("${queue.pubsub.queue-properties.rule-engine}") + private String ruleEngineProperties; + @Value("${queue.pubsub.queue-properties.transport-api}") + private String transportApiProperties; + @Value("${queue.pubsub.queue-properties.notifications}") + private String notificationsProperties; + @Value("${queue.pubsub.queue-properties.js-executor}") + private String jsExecutorProperties; + + @Getter + private Map coreSettings; + @Getter + private Map ruleEngineSettings; + @Getter + private Map transportApiSettings; + @Getter + private Map notificationsSettings; + @Getter + private Map jsExecutorSettings; + + @PostConstruct + private void init() { + coreSettings = getSettings(coreProperties); + ruleEngineSettings = getSettings(ruleEngineProperties); + transportApiSettings = getSettings(transportApiProperties); + notificationsSettings = getSettings(notificationsProperties); + jsExecutorSettings = getSettings(jsExecutorProperties); + } + + private Map getSettings(String properties) { + Map configs = new HashMap<>(); + for (String property : properties.split(";")) { + int delimiterPosition = property.indexOf(":"); + String key = property.substring(0, delimiterPosition); + String value = property.substring(delimiterPosition + 1); + configs.put(key, value); + } + return configs; + } +} diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index 78dc191e2c..c3e646c77c 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -72,9 +72,14 @@ queue: pubsub: project_id: "${TB_QUEUE_PUBSUB_PROJECT_ID:YOUR_PROJECT_ID}" service_account: "${TB_QUEUE_PUBSUB_SERVICE_ACCOUNT:YOUR_SERVICE_ACCOUNT}" - ack_deadline: "${TB_QUEUE_PUBSUB_ACK_DEADLINE:30}" #In seconds. If messages wont commit in this time, messages will poll again max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" #in bytes max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" + queue-properties: + rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" + core: "${TB_QUEUE_PUBSUB_CORE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" + transport-api: "${TB_QUEUE_PUBSUB_TA_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" + notifications: "${TB_QUEUE_PUBSUB_NOTIFICATIONS_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" + js-executor: "${TB_QUEUE_PUBSUB_JE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" service_bus: namespace_name: "${TB_QUEUE_SERVICE_BUS_NAMESPACE_NAME:YOUR_NAMESPACE_NAME}" sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index 82b548a456..0d48bda2b9 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -73,9 +73,14 @@ queue: pubsub: project_id: "${TB_QUEUE_PUBSUB_PROJECT_ID:YOUR_PROJECT_ID}" service_account: "${TB_QUEUE_PUBSUB_SERVICE_ACCOUNT:YOUR_SERVICE_ACCOUNT}" - ack_deadline: "${TB_QUEUE_PUBSUB_ACK_DEADLINE:30}" #In seconds. If messages wont commit in this time, messages will poll again max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" #in bytes max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" + queue-properties: + rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" + core: "${TB_QUEUE_PUBSUB_CORE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" + transport-api: "${TB_QUEUE_PUBSUB_TA_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" + notifications: "${TB_QUEUE_PUBSUB_NOTIFICATIONS_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" + js-executor: "${TB_QUEUE_PUBSUB_JE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" service_bus: namespace_name: "${TB_QUEUE_SERVICE_BUS_NAMESPACE_NAME:YOUR_NAMESPACE_NAME}" sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index e3faf9cb87..4249757db1 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -103,9 +103,14 @@ queue: pubsub: project_id: "${TB_QUEUE_PUBSUB_PROJECT_ID:YOUR_PROJECT_ID}" service_account: "${TB_QUEUE_PUBSUB_SERVICE_ACCOUNT:YOUR_SERVICE_ACCOUNT}" - ack_deadline: "${TB_QUEUE_PUBSUB_ACK_DEADLINE:30}" #In seconds. If messages wont commit in this time, messages will poll again max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" #in bytes max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" + queue-properties: + rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" + core: "${TB_QUEUE_PUBSUB_CORE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" + transport-api: "${TB_QUEUE_PUBSUB_TA_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" + notifications: "${TB_QUEUE_PUBSUB_NOTIFICATIONS_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" + js-executor: "${TB_QUEUE_PUBSUB_JE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" service_bus: namespace_name: "${TB_QUEUE_SERVICE_BUS_NAMESPACE_NAME:YOUR_NAMESPACE_NAME}" sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}"