pubsub improvements

This commit is contained in:
YevhenBondarenko 2020-04-17 21:05:12 +03:00 committed by Andrew Shvayka
parent 1b9df18c45
commit 2ad4ddf1fb
5 changed files with 126 additions and 54 deletions

View File

@ -45,6 +45,8 @@ import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import javax.annotation.PreDestroy;
@Component @Component
@ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='monolith'") @ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='monolith'")
public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory { public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory {
@ -153,4 +155,23 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
return null; return null;
} }
@PreDestroy
private void destroy() {
if (coreAdmin != null) {
coreAdmin.destroy();
}
if (ruleEngineAdmin != null) {
ruleEngineAdmin.destroy();
}
if (jsExecutorAdmin != null) {
jsExecutorAdmin.destroy();
}
if (transportApiAdmin != null) {
transportApiAdmin.destroy();
}
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
}
} }

View File

@ -42,6 +42,8 @@ import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate;
import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate;
import org.thingsboard.server.queue.pubsub.TbPubSubSettings; import org.thingsboard.server.queue.pubsub.TbPubSubSettings;
import javax.annotation.PreDestroy;
@Component @Component
@ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='tb-core'") @ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='tb-core'")
public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
@ -128,4 +130,20 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
return null; return null;
} }
@PreDestroy
private void destroy() {
if (coreAdmin != null) {
coreAdmin.destroy();
}
if (jsExecutorAdmin != null) {
jsExecutorAdmin.destroy();
}
if (transportApiAdmin != null) {
transportApiAdmin.destroy();
}
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
}
} }

View File

@ -41,6 +41,8 @@ import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import javax.annotation.PreDestroy;
@Component @Component
@ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='tb-rule-engine'") @ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='tb-rule-engine'")
public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
@ -117,4 +119,20 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
return null; return null;
} }
@PreDestroy
private void destroy() {
if (coreAdmin != null) {
coreAdmin.destroy();
}
if (ruleEngineAdmin != null) {
ruleEngineAdmin.destroy();
}
if (jsExecutorAdmin != null) {
jsExecutorAdmin.destroy();
}
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
}
} }

View File

@ -40,6 +40,8 @@ import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import javax.annotation.PreDestroy;
@Component @Component
@ConditionalOnExpression("'${queue.type:null}'=='pubsub' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')") @ConditionalOnExpression("'${queue.type:null}'=='pubsub' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')")
@Slf4j @Slf4j
@ -111,4 +113,20 @@ public class PubSubTransportQueueFactory implements TbTransportQueueFactory {
transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId(), transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders())); msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders()));
} }
@PreDestroy
private void destroy() {
if (coreAdmin != null) {
coreAdmin.destroy();
}
if (ruleEngineAdmin != null) {
ruleEngineAdmin.destroy();
}
if (transportApiAdmin != null) {
transportApiAdmin.destroy();
}
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
}
} }

View File

@ -40,9 +40,10 @@ public class TbPubSubAdmin implements TbQueueAdmin {
private static final String ACK_DEADLINE = "ackDeadlineInSec"; private static final String ACK_DEADLINE = "ackDeadlineInSec";
private static final String MESSAGE_RETENTION = "messageRetentionInSec"; private static final String MESSAGE_RETENTION = "messageRetentionInSec";
private final TopicAdminClient topicAdminClient;
private final SubscriptionAdminClient subscriptionAdminClient;
private final TbPubSubSettings pubSubSettings; private final TbPubSubSettings pubSubSettings;
private final SubscriptionAdminSettings subscriptionAdminSettings;
private final TopicAdminSettings topicAdminSettings;
private final Set<String> topicSet = ConcurrentHashMap.newKeySet(); private final Set<String> topicSet = ConcurrentHashMap.newKeySet();
private final Set<String> subscriptionSet = ConcurrentHashMap.newKeySet(); private final Set<String> subscriptionSet = ConcurrentHashMap.newKeySet();
private final Map<String, String> subscriptionProperties; private final Map<String, String> subscriptionProperties;
@ -51,6 +52,7 @@ public class TbPubSubAdmin implements TbQueueAdmin {
this.pubSubSettings = pubSubSettings; this.pubSubSettings = pubSubSettings;
this.subscriptionProperties = subscriptionSettings; this.subscriptionProperties = subscriptionSettings;
TopicAdminSettings topicAdminSettings;
try { try {
topicAdminSettings = TopicAdminSettings.newBuilder().setCredentialsProvider(pubSubSettings.getCredentialsProvider()).build(); topicAdminSettings = TopicAdminSettings.newBuilder().setCredentialsProvider(pubSubSettings.getCredentialsProvider()).build();
} catch (IOException e) { } catch (IOException e) {
@ -58,6 +60,7 @@ public class TbPubSubAdmin implements TbQueueAdmin {
throw new RuntimeException("Failed to create TopicAdminSettings."); throw new RuntimeException("Failed to create TopicAdminSettings.");
} }
SubscriptionAdminSettings subscriptionAdminSettings;
try { try {
subscriptionAdminSettings = SubscriptionAdminSettings.newBuilder().setCredentialsProvider(pubSubSettings.getCredentialsProvider()).build(); subscriptionAdminSettings = SubscriptionAdminSettings.newBuilder().setCredentialsProvider(pubSubSettings.getCredentialsProvider()).build();
} catch (IOException e) { } catch (IOException e) {
@ -65,7 +68,9 @@ public class TbPubSubAdmin implements TbQueueAdmin {
throw new RuntimeException("Failed to create SubscriptionAdminSettings."); throw new RuntimeException("Failed to create SubscriptionAdminSettings.");
} }
try (TopicAdminClient topicAdminClient = TopicAdminClient.create(topicAdminSettings)) { try {
topicAdminClient = TopicAdminClient.create(topicAdminSettings);
ListTopicsRequest listTopicsRequest = ListTopicsRequest listTopicsRequest =
ListTopicsRequest.newBuilder().setProject(ProjectName.format(pubSubSettings.getProjectId())).build(); ListTopicsRequest.newBuilder().setProject(ProjectName.format(pubSubSettings.getProjectId())).build();
TopicAdminClient.ListTopicsPagedResponse response = topicAdminClient.listTopics(listTopicsRequest); TopicAdminClient.ListTopicsPagedResponse response = topicAdminClient.listTopics(listTopicsRequest);
@ -77,7 +82,8 @@ public class TbPubSubAdmin implements TbQueueAdmin {
throw new RuntimeException("Failed to get topics.", e); throw new RuntimeException("Failed to get topics.", e);
} }
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create(subscriptionAdminSettings)) { try {
subscriptionAdminClient = SubscriptionAdminClient.create(subscriptionAdminSettings);
ListSubscriptionsRequest listSubscriptionsRequest = ListSubscriptionsRequest listSubscriptionsRequest =
ListSubscriptionsRequest.newBuilder() ListSubscriptionsRequest.newBuilder()
@ -104,31 +110,21 @@ public class TbPubSubAdmin implements TbQueueAdmin {
return; return;
} }
try (TopicAdminClient topicAdminClient = TopicAdminClient.create(topicAdminSettings)) { ListTopicsRequest listTopicsRequest =
ListTopicsRequest listTopicsRequest = ListTopicsRequest.newBuilder().setProject(ProjectName.format(pubSubSettings.getProjectId())).build();
ListTopicsRequest.newBuilder().setProject(ProjectName.format(pubSubSettings.getProjectId())).build(); TopicAdminClient.ListTopicsPagedResponse response = topicAdminClient.listTopics(listTopicsRequest);
TopicAdminClient.ListTopicsPagedResponse response = topicAdminClient.listTopics(listTopicsRequest); for (Topic topic : response.iterateAll()) {
for (Topic topic : response.iterateAll()) { if (topic.getName().contains(topicName.toString())) {
if (topic.getName().contains(topicName.toString())) { topicSet.add(topic.getName());
topicSet.add(topic.getName()); createSubscriptionIfNotExists(partition, topicName);
createSubscriptionIfNotExists(partition, topicName); return;
return;
}
} }
topicAdminClient.createTopic(topicName);
topicSet.add(topicName.toString());
log.info("Created new topic: [{}]", topicName.toString());
createSubscriptionIfNotExists(partition, topicName);
} catch (IOException e) {
log.error("Failed to create topic: [{}].", topicName.toString(), e);
throw new RuntimeException("Failed to create topic.", e);
} }
}
@Override
public void destroy() {
topicAdminClient.createTopic(topicName);
topicSet.add(topicName.toString());
log.info("Created new topic: [{}]", topicName.toString());
createSubscriptionIfNotExists(partition, topicName);
} }
private void createSubscriptionIfNotExists(String partition, ProjectTopicName topicName) { private void createSubscriptionIfNotExists(String partition, ProjectTopicName topicName) {
@ -139,36 +135,27 @@ public class TbPubSubAdmin implements TbQueueAdmin {
return; return;
} }
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create(subscriptionAdminSettings)) { ListSubscriptionsRequest listSubscriptionsRequest =
ListSubscriptionsRequest listSubscriptionsRequest = ListSubscriptionsRequest.newBuilder().setProject(ProjectName.of(pubSubSettings.getProjectId()).toString()).build();
ListSubscriptionsRequest.newBuilder() SubscriptionAdminClient.ListSubscriptionsPagedResponse response = subscriptionAdminClient.listSubscriptions(listSubscriptionsRequest);
.setProject(ProjectName.of(pubSubSettings.getProjectId()).toString()) for (Subscription subscription : response.iterateAll()) {
.build(); if (subscription.getName().equals(subscriptionName.toString())) {
SubscriptionAdminClient.ListSubscriptionsPagedResponse response = subscriptionSet.add(subscription.getName());
subscriptionAdminClient.listSubscriptions(listSubscriptionsRequest); return;
for (Subscription subscription : response.iterateAll()) {
if (subscription.getName().equals(subscriptionName.toString())) {
subscriptionSet.add(subscription.getName());
return;
}
} }
Subscription.Builder subscriptionBuilder = Subscription
.newBuilder()
.setName(subscriptionName.toString())
.setTopic(topicName.toString());
setAckDeadline(subscriptionBuilder);
setMessageRetention(subscriptionBuilder);
subscriptionAdminClient.createSubscription(subscriptionBuilder.build());
subscriptionSet.add(subscriptionName.toString());
log.info("Created new subscription: [{}]", subscriptionName.toString());
} catch (IOException e) {
log.error("Failed to create subscription: [{}].", subscriptionName.toString(), e);
throw new RuntimeException("Failed to create subscription.", e);
} }
Subscription.Builder subscriptionBuilder = Subscription
.newBuilder()
.setName(subscriptionName.toString())
.setTopic(topicName.toString());
setAckDeadline(subscriptionBuilder);
setMessageRetention(subscriptionBuilder);
subscriptionAdminClient.createSubscription(subscriptionBuilder.build());
subscriptionSet.add(subscriptionName.toString());
log.info("Created new subscription: [{}]", subscriptionName.toString());
} }
private void setAckDeadline(Subscription.Builder builder) { private void setAckDeadline(Subscription.Builder builder) {
@ -186,4 +173,14 @@ public class TbPubSubAdmin implements TbQueueAdmin {
builder.setMessageRetentionDuration(duration); builder.setMessageRetentionDuration(duration);
} }
} }
@Override
public void destroy() {
if (topicAdminClient != null) {
topicAdminClient.close();
}
if (subscriptionAdminClient != null) {
subscriptionAdminClient.close();
}
}
} }