add version control queue factory realizations for other QueueFactories

This commit is contained in:
ivankozka 2022-05-25 17:26:16 +03:00
parent 5bb9bdface
commit b94b590e7c
12 changed files with 456 additions and 15 deletions

View File

@ -37,7 +37,8 @@ public class TbServiceBusQueueConfigs {
private String notificationsProperties;
@Value("${queue.service-bus.queue-properties.js-executor}")
private String jsExecutorProperties;
@Value("${queue.service-bus.queue-properties.version-control:}")
private String vcProperties;
@Getter
private Map<String, String> coreConfigs;
@Getter
@ -48,6 +49,8 @@ public class TbServiceBusQueueConfigs {
private Map<String, String> notificationsConfigs;
@Getter
private Map<String, String> jsExecutorConfigs;
@Getter
private Map<String, String> vcConfigs;
@PostConstruct
private void init() {
@ -56,6 +59,7 @@ public class TbServiceBusQueueConfigs {
transportApiConfigs = getConfigs(transportApiProperties);
notificationsConfigs = getConfigs(notificationsProperties);
jsExecutorConfigs = getConfigs(jsExecutorProperties);
vcConfigs = getConfigs(vcProperties);
}
private Map<String, String> getConfigs(String properties) {

View File

@ -47,6 +47,7 @@ import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.settings.TbQueueVersionControlSettings;
import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate;
import org.thingsboard.server.queue.sqs.TbAwsSqsProducerTemplate;
@ -58,7 +59,7 @@ import java.nio.charset.StandardCharsets;
@Component
@ConditionalOnExpression("'${queue.type:null}'=='aws-sqs' && '${service.type:null}'=='monolith'")
public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory {
public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory, TbVersionControlQueueFactory {
private final NotificationsTopicService notificationsTopicService;
private final TbQueueCoreSettings coreSettings;
@ -67,6 +68,7 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
private final TbQueueTransportApiSettings transportApiSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbAwsSqsSettings sqsSettings;
private final TbQueueVersionControlSettings vcSettings;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueAdmin coreAdmin;
@ -74,6 +76,7 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
private final TbQueueAdmin jsExecutorAdmin;
private final TbQueueAdmin transportApiAdmin;
private final TbQueueAdmin notificationAdmin;
private final TbQueueAdmin vcAdmin;
public AwsSqsMonolithQueueFactory(NotificationsTopicService notificationsTopicService, TbQueueCoreSettings coreSettings,
TbQueueRuleEngineSettings ruleEngineSettings,
@ -81,6 +84,7 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
TbQueueTransportApiSettings transportApiSettings,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbAwsSqsSettings sqsSettings,
TbQueueVersionControlSettings vcSettings,
TbAwsSqsQueueAttributes sqsQueueAttributes,
TbQueueRemoteJsInvokeSettings jsInvokeSettings) {
this.notificationsTopicService = notificationsTopicService;
@ -90,6 +94,7 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
this.transportApiSettings = transportApiSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.sqsSettings = sqsSettings;
this.vcSettings = vcSettings;
this.jsInvokeSettings = jsInvokeSettings;
this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes());
@ -97,6 +102,7 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
this.jsExecutorAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getJsExecutorAttributes());
this.transportApiAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getTransportApiAttributes());
this.notificationAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getNotificationsAttributes());
this.vcAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getVcAttributes());
}
@Override
@ -124,6 +130,13 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, coreSettings.getTopic());
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>> createToVersionControlMsgConsumer() {
return new TbAwsSqsConsumerTemplate<>(vcAdmin, sqsSettings, vcSettings.getTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToVersionControlServiceMsg.parseFrom(msg.getData()), msg.getHeaders())
);
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(Queue configuration) {
return new TbAwsSqsConsumerTemplate<>(ruleEngineAdmin, sqsSettings, configuration.getTopic(),
@ -208,8 +221,7 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>> createVersionControlMsgProducer() {
//TODO: version-control
return null;
return new TbAwsSqsProducerTemplate<>(vcAdmin, sqsSettings, vcSettings.getTopic());
}
@PreDestroy
@ -229,5 +241,8 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
if (vcAdmin != null) {
vcAdmin.destroy();
}
}
}

View File

@ -0,0 +1,91 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.provider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueVersionControlSettings;
import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate;
import org.thingsboard.server.queue.sqs.TbAwsSqsProducerTemplate;
import org.thingsboard.server.queue.sqs.TbAwsSqsQueueAttributes;
import org.thingsboard.server.queue.sqs.TbAwsSqsSettings;
import javax.annotation.PreDestroy;
@Component
@ConditionalOnExpression("'${queue.type:null}'=='aws-sqs' && '${service.type:null}'=='tb-vc-executor'")
public class AwsSqsTbVersionControlQueueFactory implements TbVersionControlQueueFactory {
private final TbAwsSqsSettings sqsSettings;
private final TbQueueCoreSettings coreSettings;
private final TbQueueVersionControlSettings vcSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin notificationAdmin;
private final TbQueueAdmin vcAdmin;
public AwsSqsTbVersionControlQueueFactory(TbAwsSqsSettings sqsSettings,
TbQueueCoreSettings coreSettings,
TbQueueVersionControlSettings vcSettings,
TbAwsSqsQueueAttributes sqsQueueAttributes
) {
this.sqsSettings = sqsSettings;
this.coreSettings = coreSettings;
this.vcSettings = vcSettings;
this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes());
this.notificationAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getNotificationsAttributes());
this.vcAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getVcAttributes());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getUsageStatsTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, coreSettings.getTopic());
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>> createToVersionControlMsgConsumer() {
return new TbAwsSqsConsumerTemplate<>(vcAdmin, sqsSettings, vcSettings.getTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToVersionControlServiceMsg.parseFrom(msg.getData()), msg.getHeaders())
);
}
@PreDestroy
private void destroy() {
if (coreAdmin != null) {
coreAdmin.destroy();
}
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
if (vcAdmin != null) {
vcAdmin.destroy();
}
}
}

View File

@ -52,13 +52,14 @@ import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.settings.TbQueueVersionControlSettings;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
@Component
@ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='monolith'")
public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory {
public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory, TbVersionControlQueueFactory {
private final TbPubSubSettings pubSubSettings;
private final TbQueueCoreSettings coreSettings;
@ -68,12 +69,14 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
private final NotificationsTopicService notificationsTopicService;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueVersionControlSettings vcSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
private final TbQueueAdmin jsExecutorAdmin;
private final TbQueueAdmin transportApiAdmin;
private final TbQueueAdmin notificationAdmin;
private final TbQueueAdmin vcAdmin;
public PubSubMonolithQueueFactory(TbPubSubSettings pubSubSettings,
TbQueueCoreSettings coreSettings,
@ -83,7 +86,8 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
NotificationsTopicService notificationsTopicService,
TbServiceInfoProvider serviceInfoProvider,
TbPubSubSubscriptionSettings pubSubSubscriptionSettings,
TbQueueRemoteJsInvokeSettings jsInvokeSettings) {
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbQueueVersionControlSettings vcSettings) {
this.pubSubSettings = pubSubSettings;
this.coreSettings = coreSettings;
this.ruleEngineSettings = ruleEngineSettings;
@ -91,12 +95,15 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
this.transportNotificationSettings = transportNotificationSettings;
this.notificationsTopicService = notificationsTopicService;
this.serviceInfoProvider = serviceInfoProvider;
this.vcSettings = vcSettings;
this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings());
this.ruleEngineAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getRuleEngineSettings());
this.jsExecutorAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getJsExecutorSettings());
this.transportApiAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getTransportApiSettings());
this.notificationAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getNotificationsSettings());
this.vcAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getVcSettings());
this.jsInvokeSettings = jsInvokeSettings;
}
@ -126,6 +133,13 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, coreSettings.getTopic());
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>> createToVersionControlMsgConsumer() {
return new TbPubSubConsumerTemplate<>(vcAdmin, pubSubSettings, vcSettings.getTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToVersionControlServiceMsg.parseFrom(msg.getData()), msg.getHeaders())
);
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(Queue configuration) {
return new TbPubSubConsumerTemplate<>(ruleEngineAdmin, pubSubSettings, configuration.getTopic(),
@ -210,8 +224,7 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>> createVersionControlMsgProducer() {
//TODO: version-control
return null;
return new TbPubSubProducerTemplate<>(vcAdmin, pubSubSettings, vcSettings.getTopic());
}
@PreDestroy
@ -231,5 +244,8 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
if (vcAdmin != null) {
vcAdmin.destroy();
}
}
}

View File

@ -0,0 +1,90 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.provider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.pubsub.TbPubSubAdmin;
import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate;
import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate;
import org.thingsboard.server.queue.pubsub.TbPubSubSettings;
import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueVersionControlSettings;
import javax.annotation.PreDestroy;
@Component
@ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='tb-vc-executor'")
public class PubSubTbVersionControlQueueFactory implements TbVersionControlQueueFactory {
private final TbPubSubSettings pubSubSettings;
private final TbQueueCoreSettings coreSettings;
private final TbQueueVersionControlSettings vcSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin notificationAdmin;
private final TbQueueAdmin vcAdmin;
public PubSubTbVersionControlQueueFactory(TbPubSubSettings pubSubSettings,
TbQueueCoreSettings coreSettings,
TbQueueVersionControlSettings vcSettings,
TbPubSubSubscriptionSettings pubSubSubscriptionSettings
) {
this.pubSubSettings = pubSubSettings;
this.coreSettings = coreSettings;
this.vcSettings = vcSettings;
this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings());
this.notificationAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getNotificationsSettings());
this.vcAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getVcSettings());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getUsageStatsTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, coreSettings.getTopic());
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>> createToVersionControlMsgConsumer() {
return new TbPubSubConsumerTemplate<>(vcAdmin, pubSubSettings, vcSettings.getTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToVersionControlServiceMsg.parseFrom(msg.getData()), msg.getHeaders())
);
}
@PreDestroy
private void destroy() {
if (coreAdmin != null) {
coreAdmin.destroy();
}
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
if (vcAdmin != null) {
vcAdmin.destroy();
}
}
}

View File

@ -52,13 +52,14 @@ import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.settings.TbQueueVersionControlSettings;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
@Component
@ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='monolith'")
public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory {
public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory, TbVersionControlQueueFactory {
private final NotificationsTopicService notificationsTopicService;
private final TbQueueCoreSettings coreSettings;
@ -68,12 +69,14 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbRabbitMqSettings rabbitMqSettings;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueVersionControlSettings vcSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
private final TbQueueAdmin jsExecutorAdmin;
private final TbQueueAdmin transportApiAdmin;
private final TbQueueAdmin notificationAdmin;
private final TbQueueAdmin vcAdmin;
public RabbitMqMonolithQueueFactory(NotificationsTopicService notificationsTopicService, TbQueueCoreSettings coreSettings,
TbQueueRuleEngineSettings ruleEngineSettings,
@ -82,7 +85,8 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
TbQueueTransportNotificationSettings transportNotificationSettings,
TbRabbitMqSettings rabbitMqSettings,
TbRabbitMqQueueArguments queueArguments,
TbQueueRemoteJsInvokeSettings jsInvokeSettings) {
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbQueueVersionControlSettings vcSettings) {
this.notificationsTopicService = notificationsTopicService;
this.coreSettings = coreSettings;
this.serviceInfoProvider = serviceInfoProvider;
@ -91,12 +95,14 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
this.transportNotificationSettings = transportNotificationSettings;
this.rabbitMqSettings = rabbitMqSettings;
this.jsInvokeSettings = jsInvokeSettings;
this.vcSettings = vcSettings;
this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs());
this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs());
this.jsExecutorAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getJsExecutorArgs());
this.transportApiAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getTransportApiArgs());
this.notificationAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getNotificationsArgs());
this.vcAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getVcArgs());
}
@Override
@ -124,6 +130,13 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
return new TbRabbitMqProducerTemplate<>(notificationAdmin, rabbitMqSettings, coreSettings.getTopic());
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>> createToVersionControlMsgConsumer() {
return new TbRabbitMqConsumerTemplate<>(vcAdmin, rabbitMqSettings, vcSettings.getTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToVersionControlServiceMsg.parseFrom(msg.getData()), msg.getHeaders())
);
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(Queue configuration) {
return new TbRabbitMqConsumerTemplate<>(ruleEngineAdmin, rabbitMqSettings, configuration.getTopic(),
@ -208,8 +221,7 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>> createVersionControlMsgProducer() {
//TODO: version-control
return null;
return new TbRabbitMqProducerTemplate<>(vcAdmin, rabbitMqSettings, vcSettings.getTopic());
}
@PreDestroy
@ -229,5 +241,8 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
if (vcAdmin != null) {
vcAdmin.destroy();
}
}
}

View File

@ -0,0 +1,90 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.provider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.rabbitmq.TbRabbitMqAdmin;
import org.thingsboard.server.queue.rabbitmq.TbRabbitMqConsumerTemplate;
import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate;
import org.thingsboard.server.queue.rabbitmq.TbRabbitMqQueueArguments;
import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueVersionControlSettings;
import javax.annotation.PreDestroy;
@Component
@ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='tb-vc-executor'")
public class RabbitMqTbVersionControlQueueFactory implements TbVersionControlQueueFactory {
private final TbRabbitMqSettings rabbitMqSettings;
private final TbQueueCoreSettings coreSettings;
private final TbQueueVersionControlSettings vcSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin notificationAdmin;
private final TbQueueAdmin vcAdmin;
public RabbitMqTbVersionControlQueueFactory(TbRabbitMqSettings rabbitMqSettings,
TbQueueCoreSettings coreSettings,
TbQueueVersionControlSettings vcSettings,
TbRabbitMqQueueArguments queueArguments
) {
this.rabbitMqSettings = rabbitMqSettings;
this.coreSettings = coreSettings;
this.vcSettings = vcSettings;
this.coreAdmin = new TbRabbitMqAdmin(this.rabbitMqSettings, queueArguments.getCoreArgs());
this.notificationAdmin = new TbRabbitMqAdmin(this.rabbitMqSettings, queueArguments.getNotificationsArgs());
this.vcAdmin = new TbRabbitMqAdmin(this.rabbitMqSettings, queueArguments.getVcArgs());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getUsageStatsTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbRabbitMqProducerTemplate<>(notificationAdmin, rabbitMqSettings, coreSettings.getTopic());
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>> createToVersionControlMsgConsumer() {
return new TbRabbitMqConsumerTemplate<>(vcAdmin, rabbitMqSettings, vcSettings.getTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToVersionControlServiceMsg.parseFrom(msg.getData()), msg.getHeaders())
);
}
@PreDestroy
private void destroy() {
if (coreAdmin != null) {
coreAdmin.destroy();
}
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
if (vcAdmin != null) {
vcAdmin.destroy();
}
}
}

View File

@ -51,13 +51,14 @@ import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.settings.TbQueueVersionControlSettings;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
@Component
@ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='monolith'")
public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory {
public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory, TbVersionControlQueueFactory {
private final NotificationsTopicService notificationsTopicService;
private final TbQueueCoreSettings coreSettings;
@ -67,12 +68,14 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbServiceBusSettings serviceBusSettings;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueVersionControlSettings vcSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
private final TbQueueAdmin jsExecutorAdmin;
private final TbQueueAdmin transportApiAdmin;
private final TbQueueAdmin notificationAdmin;
private final TbQueueAdmin vcAdmin;
public ServiceBusMonolithQueueFactory(NotificationsTopicService notificationsTopicService, TbQueueCoreSettings coreSettings,
TbQueueRuleEngineSettings ruleEngineSettings,
@ -81,6 +84,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul
TbQueueTransportNotificationSettings transportNotificationSettings,
TbServiceBusSettings serviceBusSettings,
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbQueueVersionControlSettings vcSettings,
TbServiceBusQueueConfigs serviceBusQueueConfigs) {
this.notificationsTopicService = notificationsTopicService;
this.coreSettings = coreSettings;
@ -90,12 +94,14 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul
this.transportNotificationSettings = transportNotificationSettings;
this.serviceBusSettings = serviceBusSettings;
this.jsInvokeSettings = jsInvokeSettings;
this.vcSettings = vcSettings;
this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs());
this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs());
this.jsExecutorAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getJsExecutorConfigs());
this.transportApiAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getTransportApiConfigs());
this.notificationAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getNotificationsConfigs());
this.vcAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getVcConfigs());
}
@Override
@ -123,6 +129,13 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul
return new TbServiceBusProducerTemplate<>(notificationAdmin, serviceBusSettings, coreSettings.getTopic());
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>> createToVersionControlMsgConsumer() {
return new TbServiceBusConsumerTemplate<>(vcAdmin, serviceBusSettings, vcSettings.getTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToVersionControlServiceMsg.parseFrom(msg.getData()), msg.getHeaders())
);
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(Queue configuration) {
return new TbServiceBusConsumerTemplate<>(ruleEngineAdmin, serviceBusSettings, configuration.getTopic(),
@ -207,8 +220,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>> createVersionControlMsgProducer() {
//TODO: version-control
return null;
return new TbServiceBusProducerTemplate<>(vcAdmin, serviceBusSettings, vcSettings.getTopic());
}
@PreDestroy
@ -228,5 +240,8 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
if (vcAdmin != null) {
vcAdmin.destroy();
}
}
}

View File

@ -0,0 +1,90 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.provider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusAdmin;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueVersionControlSettings;
import javax.annotation.PreDestroy;
@Component
@ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='tb-vc-executor'")
public class ServiceBusTbVersionControlQueueFactory implements TbVersionControlQueueFactory {
private final TbServiceBusSettings serviceBusSettings;
private final TbQueueCoreSettings coreSettings;
private final TbQueueVersionControlSettings vcSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin notificationAdmin;
private final TbQueueAdmin vcAdmin;
public ServiceBusTbVersionControlQueueFactory(TbServiceBusSettings serviceBusSettings,
TbQueueCoreSettings coreSettings,
TbQueueVersionControlSettings vcSettings,
TbServiceBusQueueConfigs serviceBusQueueConfigs
) {
this.serviceBusSettings = serviceBusSettings;
this.coreSettings = coreSettings;
this.vcSettings = vcSettings;
this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs());
this.notificationAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getNotificationsConfigs());
this.vcAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getVcConfigs());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getUsageStatsTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbServiceBusProducerTemplate<>(notificationAdmin, serviceBusSettings, coreSettings.getTopic());
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>> createToVersionControlMsgConsumer() {
return new TbServiceBusConsumerTemplate<>(vcAdmin, serviceBusSettings, vcSettings.getTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToVersionControlServiceMsg.parseFrom(msg.getData()), msg.getHeaders())
);
}
@PreDestroy
private void destroy() {
if (coreAdmin != null) {
coreAdmin.destroy();
}
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
if (vcAdmin != null) {
vcAdmin.destroy();
}
}
}

View File

@ -37,6 +37,8 @@ public class TbPubSubSubscriptionSettings {
private String notificationsProperties;
@Value("${queue.pubsub.queue-properties.js-executor}")
private String jsExecutorProperties;
@Value("${queue.pubsub.queue-properties.version-control:}")
private String vcProperties;
@Getter
private Map<String, String> coreSettings;
@ -48,6 +50,8 @@ public class TbPubSubSubscriptionSettings {
private Map<String, String> notificationsSettings;
@Getter
private Map<String, String> jsExecutorSettings;
@Getter
private Map<String, String> vcSettings;
@PostConstruct
private void init() {
@ -56,6 +60,7 @@ public class TbPubSubSubscriptionSettings {
transportApiSettings = getSettings(transportApiProperties);
notificationsSettings = getSettings(notificationsProperties);
jsExecutorSettings = getSettings(jsExecutorProperties);
vcSettings = getSettings(vcProperties);
}
private Map<String, String> getSettings(String properties) {

View File

@ -38,6 +38,8 @@ public class TbRabbitMqQueueArguments {
private String notificationsProperties;
@Value("${queue.rabbitmq.queue-properties.js-executor}")
private String jsExecutorProperties;
@Value("${queue.rabbitmq.queue-properties.version-control:}")
private String vcProperties;
@Getter
private Map<String, Object> coreArgs;
@ -49,6 +51,8 @@ public class TbRabbitMqQueueArguments {
private Map<String, Object> notificationsArgs;
@Getter
private Map<String, Object> jsExecutorArgs;
@Getter
private Map<String, Object> vcArgs;
@PostConstruct
private void init() {
@ -57,6 +61,7 @@ public class TbRabbitMqQueueArguments {
transportApiArgs = getArgs(transportApiProperties);
notificationsArgs = getArgs(notificationsProperties);
jsExecutorArgs = getArgs(jsExecutorProperties);
vcArgs = getArgs(vcProperties);
}
private Map<String, Object> getArgs(String properties) {

View File

@ -38,6 +38,8 @@ public class TbAwsSqsQueueAttributes {
private String notificationsProperties;
@Value("${queue.aws-sqs.queue-properties.js-executor}")
private String jsExecutorProperties;
@Value("${queue.aws-sqs.queue-properties.version-control:}")
private String vcProperties;
@Getter
private Map<String, String> coreAttributes;
@ -49,6 +51,8 @@ public class TbAwsSqsQueueAttributes {
private Map<String, String> notificationsAttributes;
@Getter
private Map<String, String> jsExecutorAttributes;
@Getter
private Map<String, String> vcAttributes;
private final Map<String, String> defaultAttributes = new HashMap<>();
@ -61,6 +65,7 @@ public class TbAwsSqsQueueAttributes {
transportApiAttributes = getConfigs(transportApiProperties);
notificationsAttributes = getConfigs(notificationsProperties);
jsExecutorAttributes = getConfigs(jsExecutorProperties);
vcAttributes = getConfigs(vcProperties);
}
private Map<String, String> getConfigs(String properties) {