From 8d5c38b743a91c2ad3739c25c47f93ece5480f08 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Fri, 1 May 2020 17:43:13 +0300 Subject: [PATCH] Queue refactoring --- .../processing/AbstractConsumerService.java | 1 - .../BatchTbRuleEngineSubmitStrategy.java | 5 +- .../BurstTbRuleEngineSubmitStrategy.java | 10 +- ...lByEntityIdTbRuleEngineSubmitStrategy.java | 8 - ...lByTenantIdTbRuleEngineSubmitStrategy.java | 1 - .../SequentialTbRuleEngineSubmitStrategy.java | 6 +- ...TbRuleEngineProcessingStrategyFactory.java | 8 +- .../TbServiceBusConsumerTemplate.java | 132 ++++++---------- ...stractParallelTbQueueConsumerTemplate.java | 53 +++++++ .../AbstractTbQueueConsumerTemplate.java | 20 ++- .../queue/kafka/TbKafkaConsumerTemplate.java | 3 +- .../pubsub/TbPubSubConsumerTemplate.java | 108 +++++-------- .../rabbitmq/TbRabbitMqConsumerTemplate.java | 107 ++++--------- .../queue/sqs/TbAwsSqsConsumerTemplate.java | 142 ++++++------------ 14 files changed, 243 insertions(+), 361 deletions(-) create mode 100644 common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractParallelTbQueueConsumerTemplate.java diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java index c2705fcbdc..4007c9e17d 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java @@ -23,7 +23,6 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; -import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionChangeEvent; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/BatchTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/BatchTbRuleEngineSubmitStrategy.java index d0b1f7f99a..b9741d2433 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/BatchTbRuleEngineSubmitStrategy.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/BatchTbRuleEngineSubmitStrategy.java @@ -23,7 +23,6 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; @@ -77,8 +76,8 @@ public class BatchTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitS } } int submitSize = pendingPack.size(); - if (log.isInfoEnabled() && submitSize > 0) { - log.info("[{}] submitting [{}] messages to rule engine", queueName, submitSize); + if (log.isDebugEnabled() && submitSize > 0) { + log.debug("[{}] submitting [{}] messages to rule engine", queueName, submitSize); } pendingPack.forEach(msgConsumer); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/BurstTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/BurstTbRuleEngineSubmitStrategy.java index ffd1dd49d1..3420933d3a 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/BurstTbRuleEngineSubmitStrategy.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/BurstTbRuleEngineSubmitStrategy.java @@ -19,14 +19,8 @@ import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import java.util.ArrayList; -import java.util.List; import java.util.UUID; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; -import java.util.function.Function; -import java.util.stream.Collectors; @Slf4j public class BurstTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitStrategy { @@ -37,8 +31,8 @@ public class BurstTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitS @Override public void submitAttempt(BiConsumer> msgConsumer) { - if (log.isInfoEnabled()) { - log.info("[{}] submitting [{}] messages to rule engine", queueName, orderedMsgList.size()); + if (log.isDebugEnabled()) { + log.debug("[{}] submitting [{}] messages to rule engine", queueName, orderedMsgList.size()); } orderedMsgList.forEach(pair -> msgConsumer.accept(pair.uuid, pair.msg)); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByEntityIdTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByEntityIdTbRuleEngineSubmitStrategy.java index ae5993cb1c..473810b86c 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByEntityIdTbRuleEngineSubmitStrategy.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByEntityIdTbRuleEngineSubmitStrategy.java @@ -15,26 +15,18 @@ */ package org.thingsboard.server.service.queue.processing; -import com.google.protobuf.InvalidProtocolBufferException; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.EntityIdFactory; -import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.common.msg.gen.MsgProtos; -import org.thingsboard.server.common.msg.queue.TbMsgCallback; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; -import java.util.stream.Collectors; @Slf4j public abstract class SequentialByEntityIdTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitStrategy { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByTenantIdTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByTenantIdTbRuleEngineSubmitStrategy.java index b258c6db1b..37e9419edd 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByTenantIdTbRuleEngineSubmitStrategy.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByTenantIdTbRuleEngineSubmitStrategy.java @@ -30,6 +30,5 @@ public class SequentialByTenantIdTbRuleEngineSubmitStrategy extends SequentialBy @Override protected EntityId getEntityId(TransportProtos.ToRuleEngineMsg msg) { return new TenantId(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB())); - } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialTbRuleEngineSubmitStrategy.java index ef45b983fc..125a1d8ef8 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialTbRuleEngineSubmitStrategy.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialTbRuleEngineSubmitStrategy.java @@ -19,8 +19,6 @@ import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import java.util.LinkedHashMap; -import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; @@ -63,8 +61,8 @@ public class SequentialTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSu if (idx < listSize) { IdMsgPair pair = orderedMsgList.get(idx); expectedMsgId = pair.uuid; - if (log.isInfoEnabled()) { - log.info("[{}] submitting [{}] message to rule engine", queueName, pair.msg); + if (log.isDebugEnabled()) { + log.debug("[{}] submitting [{}] message to rule engine", queueName, pair.msg); } msgConsumer.accept(pair.uuid, pair.msg); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java index bbf283e962..80b0523a81 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java @@ -82,10 +82,10 @@ public class TbRuleEngineProcessingStrategyFactory { retryCount++; double failedCount = result.getFailedMap().size() + result.getPendingMap().size(); if (maxRetries > 0 && retryCount > maxRetries) { - log.info("[{}] Skip reprocess of the rule engine pack due to max retries", queueName); + log.debug("[{}] Skip reprocess of the rule engine pack due to max retries", queueName); return new TbRuleEngineProcessingDecision(true, null); } else if (maxAllowedFailurePercentage > 0 && (failedCount / initialTotalCount) > maxAllowedFailurePercentage) { - log.info("[{}] Skip reprocess of the rule engine pack due to max allowed failure percentage", queueName); + log.debug("[{}] Skip reprocess of the rule engine pack due to max allowed failure percentage", queueName); return new TbRuleEngineProcessingDecision(true, null); } else { ConcurrentMap> toReprocess = new ConcurrentHashMap<>(initialTotalCount); @@ -98,7 +98,7 @@ public class TbRuleEngineProcessingStrategyFactory { if (retrySuccessful) { result.getSuccessMap().forEach(toReprocess::put); } - log.info("[{}] Going to reprocess {} messages", queueName, toReprocess.size()); + log.debug("[{}] Going to reprocess {} messages", queueName, toReprocess.size()); if (log.isTraceEnabled()) { toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); } @@ -126,7 +126,7 @@ public class TbRuleEngineProcessingStrategyFactory { @Override public TbRuleEngineProcessingDecision analyze(TbRuleEngineProcessingResult result) { if (!result.isSuccess()) { - log.info("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size()); + log.debug("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size()); } if (log.isTraceEnabled()) { result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusConsumerTemplate.java index cca599d59a..4db5ade728 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusConsumerTemplate.java @@ -31,9 +31,9 @@ import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.springframework.util.CollectionUtils; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.TbQueueAdmin; -import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.TbQueueMsgDecoder; +import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate; import org.thingsboard.server.queue.common.DefaultTbQueueMsg; import java.time.Duration; @@ -50,100 +50,70 @@ import java.util.stream.Collectors; import java.util.stream.Stream; @Slf4j -public class TbServiceBusConsumerTemplate implements TbQueueConsumer { +public class TbServiceBusConsumerTemplate extends AbstractTbQueueConsumerTemplate { private final TbQueueAdmin admin; - private final String topic; private final TbQueueMsgDecoder decoder; private final TbServiceBusSettings serviceBusSettings; private final Gson gson = new Gson(); private Set receivers; - private volatile Set partitions; - private volatile boolean subscribed; - private volatile boolean stopped = false; private Map> pendingMessages = new ConcurrentHashMap<>(); private volatile int messagesPerQueue; public TbServiceBusConsumerTemplate(TbQueueAdmin admin, TbServiceBusSettings serviceBusSettings, String topic, TbQueueMsgDecoder decoder) { + super(topic); this.admin = admin; this.decoder = decoder; - this.topic = topic; this.serviceBusSettings = serviceBusSettings; } @Override - public String getTopic() { - return topic; - } - - @Override - public void subscribe() { - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); - subscribed = false; - } - - @Override - public void subscribe(Set partitions) { - this.partitions = partitions; - subscribed = false; - } - - @Override - public void unsubscribe() { - stopped = true; - receivers.forEach(CoreMessageReceiver::closeAsync); - } - - @Override - public List poll(long durationInMillis) { - if (!subscribed && partitions == null) { - try { - Thread.sleep(durationInMillis); - } catch (InterruptedException e) { - log.debug("Failed to await subscription", e); - } - } else { - if (!subscribed) { - createReceivers(); - messagesPerQueue = receivers.size() / partitions.size(); - subscribed = true; - } - - List>> messageFutures = - receivers.stream() - .map(receiver -> receiver - .receiveAsync(messagesPerQueue, Duration.ofMillis(durationInMillis)) - .whenComplete((messages, err) -> { - if (!CollectionUtils.isEmpty(messages)) { - pendingMessages.put(receiver, messages); - } else if (err != null) { - log.error("Failed to receive messages.", err); - } - })) - .collect(Collectors.toList()); - try { - return fromList(messageFutures) - .get() - .stream() - .flatMap(messages -> CollectionUtils.isEmpty(messages) ? Stream.empty() : messages.stream()) - .map(message -> { - try { - return decode(message); - } catch (InvalidProtocolBufferException e) { - log.error("Failed to parse message.", e); - throw new RuntimeException("Failed to parse message.", e); - } - }).collect(Collectors.toList()); - } catch (InterruptedException | ExecutionException e) { - if (stopped) { - log.info("[{}] Service Bus consumer is stopped.", topic); - } else { - log.error("Failed to receive messages", e); - } + protected List doPoll(long durationInMillis) { + List>> messageFutures = + receivers.stream() + .map(receiver -> receiver + .receiveAsync(messagesPerQueue, Duration.ofMillis(durationInMillis)) + .whenComplete((messages, err) -> { + if (!CollectionUtils.isEmpty(messages)) { + pendingMessages.put(receiver, messages); + } else if (err != null) { + log.error("Failed to receive messages.", err); + } + })) + .collect(Collectors.toList()); + try { + return fromList(messageFutures) + .get() + .stream() + .flatMap(messages -> CollectionUtils.isEmpty(messages) ? Stream.empty() : messages.stream()) + .collect(Collectors.toList()); + } catch (InterruptedException | ExecutionException e) { + if (stopped) { + log.info("[{}] Service Bus consumer is stopped.", getTopic()); + } else { + log.error("Failed to receive messages", e); } + return Collections.emptyList(); } - return Collections.emptyList(); + } + + @Override + protected void doSubscribe(List topicNames) { + createReceivers(); + messagesPerQueue = receivers.size() / partitions.size(); + } + + @Override + protected void doCommit() { + pendingMessages.forEach((receiver, msgs) -> + msgs.forEach(msg -> receiver.completeMessageAsync(msg.getDeliveryTag(), TransactionContext.NULL_TXN))); + pendingMessages.clear(); + } + + @Override + protected void doUnsubscribe() { + receivers.forEach(CoreMessageReceiver::closeAsync); } private void createReceivers() { @@ -167,7 +137,7 @@ public class TbServiceBusConsumerTemplate implements TbQue receivers = new HashSet<>(fromList(receiverFutures).get()); } catch (InterruptedException | ExecutionException e) { if (stopped) { - log.info("[{}] Service Bus consumer is stopped.", topic); + log.info("[{}] Service Bus consumer is stopped.", getTopic()); } else { log.error("Failed to create receivers", e); } @@ -196,13 +166,7 @@ public class TbServiceBusConsumerTemplate implements TbQue } @Override - public void commit() { - pendingMessages.forEach((receiver, msgs) -> - msgs.forEach(msg -> receiver.completeMessageAsync(msg.getDeliveryTag(), TransactionContext.NULL_TXN))); - pendingMessages.clear(); - } - - private T decode(MessageWithDeliveryTag data) throws InvalidProtocolBufferException { + protected T decode(MessageWithDeliveryTag data) throws InvalidProtocolBufferException { DefaultTbQueueMsg msg = gson.fromJson(new String(((Data) data.getMessage().getBody()).getValue().getArray()), DefaultTbQueueMsg.class); return decoder.decode(msg); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractParallelTbQueueConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractParallelTbQueueConsumerTemplate.java new file mode 100644 index 0000000000..bb83a79250 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractParallelTbQueueConsumerTemplate.java @@ -0,0 +1,53 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.common; + +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.queue.TbQueueMsg; + +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +@Slf4j +public abstract class AbstractParallelTbQueueConsumerTemplate extends AbstractTbQueueConsumerTemplate { + + protected ListeningExecutorService consumerExecutor; + + public AbstractParallelTbQueueConsumerTemplate(String topic) { + super(topic); + } + + protected void initNewExecutor(int threadPoolSize) { + if (consumerExecutor != null) { + consumerExecutor.shutdown(); + try { + consumerExecutor.awaitTermination(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + log.trace("Interrupted while waiting for consumer executor to stop"); + } + } + consumerExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadPoolSize)); + } + + protected void shutdownExecutor() { + if (consumerExecutor != null) { + consumerExecutor.shutdownNow(); + } + } + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java index 084d10fca9..c8cc545601 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java @@ -28,11 +28,13 @@ import java.util.List; import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; @Slf4j public abstract class AbstractTbQueueConsumerTemplate implements TbQueueConsumer { private volatile boolean subscribed; + protected volatile boolean stopped = false; protected volatile Set partitions; protected final Lock consumerLock = new ReentrantLock(); @@ -74,10 +76,12 @@ public abstract class AbstractTbQueueConsumerTemplate i log.debug("Failed to await subscription", e); } } else { + long pollStartTs = System.currentTimeMillis(); consumerLock.lock(); try { if (!subscribed) { - doSubscribe(); + List topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); + doSubscribe(topicNames); subscribed = true; } @@ -95,6 +99,17 @@ public abstract class AbstractTbQueueConsumerTemplate i } }); return result; + } else { + long pollDuration = System.currentTimeMillis() - pollStartTs; + if (pollDuration < durationInMillis) { + try { + Thread.sleep(durationInMillis - pollDuration); + } catch (InterruptedException e) { + if (!stopped) { + log.error("Failed to wait.", e); + } + } + } } } finally { consumerLock.unlock(); @@ -115,6 +130,7 @@ public abstract class AbstractTbQueueConsumerTemplate i @Override public void unsubscribe() { + stopped = true; consumerLock.lock(); try { doUnsubscribe(); @@ -127,7 +143,7 @@ public abstract class AbstractTbQueueConsumerTemplate i abstract protected T decode(R record) throws IOException; - abstract protected void doSubscribe(); + abstract protected void doSubscribe(List topicNames); abstract protected void doCommit(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java index fea31854df..75635de7a4 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java @@ -69,8 +69,7 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue } @Override - protected void doSubscribe() { - List topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); + protected void doSubscribe( List topicNames) { topicNames.forEach(admin::createTopicIfNotExists); consumer.subscribe(topicNames); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java index 5dc795739a..7302d19ff7 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.queue.pubsub; +import com.amazonaws.services.sqs.model.Message; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub; @@ -35,11 +36,14 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.TbQueueMsgDecoder; +import org.thingsboard.server.queue.common.AbstractParallelTbQueueConsumerTemplate; +import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate; import org.thingsboard.server.queue.common.DefaultTbQueueMsg; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedHashSet; import java.util.List; import java.util.Objects; import java.util.Set; @@ -47,10 +51,11 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @Slf4j -public class TbPubSubConsumerTemplate implements TbQueueConsumer { +public class TbPubSubConsumerTemplate extends AbstractParallelTbQueueConsumerTemplate { private final Gson gson = new Gson(); private final TbQueueAdmin admin; @@ -58,23 +63,18 @@ public class TbPubSubConsumerTemplate implements TbQueueCo private final TbQueueMsgDecoder decoder; private final TbPubSubSettings pubSubSettings; - private volatile boolean subscribed; - private volatile Set partitions; private volatile Set subscriptionNames; private final List acknowledgeRequests = new CopyOnWriteArrayList<>(); - private ExecutorService consumerExecutor; private final SubscriberStub subscriber; - private volatile boolean stopped; - private volatile int messagesPerTopic; public TbPubSubConsumerTemplate(TbQueueAdmin admin, TbPubSubSettings pubSubSettings, String topic, TbQueueMsgDecoder decoder) { + super(topic); this.admin = admin; this.pubSubSettings = pubSubSettings; this.topic = topic; this.decoder = decoder; - try { SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder() @@ -84,91 +84,52 @@ public class TbPubSubConsumerTemplate implements TbQueueCo .setMaxInboundMessageSize(pubSubSettings.getMaxMsgSize()) .build()) .build(); - this.subscriber = GrpcSubscriberStub.create(subscriberStubSettings); } catch (IOException e) { log.error("Failed to create subscriber.", e); throw new RuntimeException("Failed to create subscriber.", e); } - stopped = false; } @Override - public String getTopic() { - return topic; - } - - @Override - public void subscribe() { - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); - subscribed = false; - } - - @Override - public void subscribe(Set partitions) { - this.partitions = partitions; - subscribed = false; - } - - @Override - public void unsubscribe() { - stopped = true; - if (consumerExecutor != null) { - consumerExecutor.shutdownNow(); - } - - if (subscriber != null) { - subscriber.close(); - } - } - - @Override - public List poll(long durationInMillis) { - if (!subscribed && partitions == null) { - try { - Thread.sleep(durationInMillis); - } catch (InterruptedException e) { - log.debug("Failed to await subscription", e); + protected List doPoll(long durationInMillis) { + try { + List messages = receiveMessages(); + if (!messages.isEmpty()) { + return messages.stream().map(ReceivedMessage::getMessage).collect(Collectors.toList()); } - } else { - if (!subscribed) { - subscriptionNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toSet()); - subscriptionNames.forEach(admin::createTopicIfNotExists); - consumerExecutor = Executors.newFixedThreadPool(subscriptionNames.size()); - messagesPerTopic = pubSubSettings.getMaxMessages() / subscriptionNames.size(); - subscribed = true; - } - List messages; - try { - messages = receiveMessages(); - if (!messages.isEmpty()) { - List result = new ArrayList<>(); - messages.forEach(msg -> { - try { - result.add(decode(msg.getMessage())); - } catch (InvalidProtocolBufferException e) { - log.error("Failed decode record: [{}]", msg); - } - }); - return result; - } - } catch (ExecutionException | InterruptedException e) { - if (stopped) { - log.info("[{}] Pub/Sub consumer is stopped.", topic); - } else { - log.error("Failed to receive messages", e); - } + } catch (ExecutionException | InterruptedException e) { + if (stopped) { + log.info("[{}] Pub/Sub consumer is stopped.", topic); + } else { + log.error("Failed to receive messages", e); } } return Collections.emptyList(); } @Override - public void commit() { + protected void doSubscribe(List topicNames) { + subscriptionNames = new LinkedHashSet<>(topicNames); + subscriptionNames.forEach(admin::createTopicIfNotExists); + initNewExecutor(subscriptionNames.size() + 1); + messagesPerTopic = pubSubSettings.getMaxMessages() / subscriptionNames.size(); + } + + @Override + protected void doCommit() { acknowledgeRequests.forEach(subscriber.acknowledgeCallable()::futureCall); acknowledgeRequests.clear(); } + @Override + protected void doUnsubscribe() { + if (subscriber != null) { + subscriber.close(); + } + shutdownExecutor(); + } + private List receiveMessages() throws ExecutionException, InterruptedException { List>> result = subscriptionNames.stream().map(subscriptionId -> { String subscriptionName = ProjectSubscriptionName.format(pubSubSettings.getProjectId(), subscriptionId); @@ -211,6 +172,7 @@ public class TbPubSubConsumerTemplate implements TbQueueCo return transform.get(); } + @Override public T decode(PubsubMessage message) throws InvalidProtocolBufferException { DefaultTbQueueMsg msg = gson.fromJson(message.getData().toStringUtf8(), DefaultTbQueueMsg.class); return decoder.decode(msg); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqConsumerTemplate.java index 25d7719163..45dc9d6a05 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqConsumerTemplate.java @@ -23,9 +23,9 @@ import com.rabbitmq.client.GetResponse; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.TbQueueAdmin; -import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.TbQueueMsgDecoder; +import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate; import org.thingsboard.server.queue.common.DefaultTbQueueMsg; import java.io.IOException; @@ -37,33 +37,26 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @Slf4j -public class TbRabbitMqConsumerTemplate implements TbQueueConsumer { +public class TbRabbitMqConsumerTemplate extends AbstractTbQueueConsumerTemplate { private final Gson gson = new Gson(); private final TbQueueAdmin admin; - private final String topic; private final TbQueueMsgDecoder decoder; - private final TbRabbitMqSettings rabbitMqSettings; private final Channel channel; private final Connection connection; - private volatile Set partitions; - private volatile boolean subscribed; private volatile Set queues; - private volatile boolean stopped; public TbRabbitMqConsumerTemplate(TbQueueAdmin admin, TbRabbitMqSettings rabbitMqSettings, String topic, TbQueueMsgDecoder decoder) { + super(topic); this.admin = admin; this.decoder = decoder; - this.topic = topic; - this.rabbitMqSettings = rabbitMqSettings; try { connection = rabbitMqSettings.getConnectionFactory().newConnection(); } catch (IOException | TimeoutException e) { log.error("Failed to create connection.", e); throw new RuntimeException("Failed to create connection.", e); } - try { channel = connection.createChannel(); } catch (IOException e) { @@ -74,25 +67,42 @@ public class TbRabbitMqConsumerTemplate implements TbQueue } @Override - public String getTopic() { - return topic; + protected List doPoll(long durationInMillis) { + List result = queues.stream() + .map(queue -> { + try { + return channel.basicGet(queue, false); + } catch (IOException e) { + log.error("Failed to get messages from queue: [{}]", queue); + throw new RuntimeException("Failed to get messages from queue.", e); + } + }).filter(Objects::nonNull).collect(Collectors.toList()); + if (result.size() > 0) { + return result; + } else { + return Collections.emptyList(); + } } @Override - public void subscribe() { - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); - subscribed = false; + protected void doSubscribe(List topicNames) { + queues = partitions.stream() + .map(TopicPartitionInfo::getFullTopicName) + .collect(Collectors.toSet()); + queues.forEach(admin::createTopicIfNotExists); } @Override - public void subscribe(Set partitions) { - this.partitions = partitions; - subscribed = false; + protected void doCommit() { + try { + channel.basicAck(0, true); + } catch (IOException e) { + log.error("Failed to ack messages.", e); + } } @Override - public void unsubscribe() { - stopped = true; + protected void doUnsubscribe() { if (channel != null) { try { channel.close(); @@ -109,63 +119,6 @@ public class TbRabbitMqConsumerTemplate implements TbQueue } } - @Override - public List poll(long durationInMillis) { - if (!subscribed && partitions == null) { - try { - Thread.sleep(durationInMillis); - } catch (InterruptedException e) { - log.debug("Failed to await subscription", e); - } - } else { - if (!subscribed) { - queues = partitions.stream() - .map(TopicPartitionInfo::getFullTopicName) - .collect(Collectors.toSet()); - - queues.forEach(admin::createTopicIfNotExists); - subscribed = true; - } - - List result = queues.stream() - .map(queue -> { - try { - return channel.basicGet(queue, false); - } catch (IOException e) { - log.error("Failed to get messages from queue: [{}]", queue); - throw new RuntimeException("Failed to get messages from queue.", e); - } - }).filter(Objects::nonNull).map(message -> { - try { - return decode(message); - } catch (InvalidProtocolBufferException e) { - log.error("Failed to decode message: [{}].", message); - throw new RuntimeException("Failed to decode message.", e); - } - }).collect(Collectors.toList()); - if (result.size() > 0) { - return result; - } - } - try { - Thread.sleep(durationInMillis); - } catch (InterruptedException e) { - if (!stopped) { - log.error("Failed to wait.", e); - } - } - return Collections.emptyList(); - } - - @Override - public void commit() { - try { - channel.basicAck(0, true); - } catch (IOException e) { - log.error("Failed to ack messages.", e); - } - } - public T decode(GetResponse message) throws InvalidProtocolBufferException { DefaultTbQueueMsg msg = gson.fromJson(new String(message.getBody()), DefaultTbQueueMsg.class); return decoder.decode(msg); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsConsumerTemplate.java index 3e71388844..317dd93902 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsConsumerTemplate.java @@ -25,21 +25,17 @@ import com.amazonaws.services.sqs.model.Message; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.Gson; import com.google.protobuf.InvalidProtocolBufferException; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.TbQueueAdmin; -import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.TbQueueMsgDecoder; +import org.thingsboard.server.queue.common.AbstractParallelTbQueueConsumerTemplate; import org.thingsboard.server.queue.common.DefaultTbQueueMsg; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -47,34 +43,28 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @Slf4j -public class TbAwsSqsConsumerTemplate implements TbQueueConsumer { +public class TbAwsSqsConsumerTemplate extends AbstractParallelTbQueueConsumerTemplate { private static final int MAX_NUM_MSGS = 10; private final Gson gson = new Gson(); private final TbQueueAdmin admin; private final AmazonSQS sqsClient; - private final String topic; private final TbQueueMsgDecoder decoder; private final TbAwsSqsSettings sqsSettings; private final List pendingMessages = new CopyOnWriteArrayList<>(); private volatile Set queueUrls; - private volatile Set partitions; - private ListeningExecutorService consumerExecutor; - private volatile boolean subscribed; - private volatile boolean stopped = false; public TbAwsSqsConsumerTemplate(TbQueueAdmin admin, TbAwsSqsSettings sqsSettings, String topic, TbQueueMsgDecoder decoder) { + super(topic); this.admin = admin; this.decoder = decoder; - this.topic = topic; this.sqsSettings = sqsSettings; AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey()); @@ -87,81 +77,64 @@ public class TbAwsSqsConsumerTemplate implements TbQueueCo } @Override - public String getTopic() { - return topic; + protected void doSubscribe(List topicNames) { + queueUrls = topicNames.stream().map(this::getQueueUrl).collect(Collectors.toSet()); + initNewExecutor(queueUrls.size() * sqsSettings.getThreadsPerTopic() + 1); } @Override - public void subscribe() { - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); - subscribed = false; + protected List doPoll(long durationInMillis) { + if (!pendingMessages.isEmpty()) { + log.warn("Present {} non committed messages.", pendingMessages.size()); + return Collections.emptyList(); + } + int duration = (int) TimeUnit.MILLISECONDS.toSeconds(durationInMillis); + List>> futureList = queueUrls + .stream() + .map(url -> poll(url, duration)) + .collect(Collectors.toList()); + ListenableFuture>> futureResult = Futures.allAsList(futureList); + try { + return futureResult.get().stream() + .flatMap(List::stream) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } catch (InterruptedException | ExecutionException e) { + if (stopped) { + log.info("[{}] Aws SQS consumer is stopped.", getTopic()); + } else { + log.error("Failed to pool messages.", e); + } + return Collections.emptyList(); + } } @Override - public void subscribe(Set partitions) { - this.partitions = partitions; - subscribed = false; + public T decode(Message message) throws InvalidProtocolBufferException { + DefaultTbQueueMsg msg = gson.fromJson(message.getBody(), DefaultTbQueueMsg.class); + return decoder.decode(msg); } @Override - public void unsubscribe() { + protected void doCommit() { + pendingMessages.forEach(msg -> + consumerExecutor.submit(() -> { + List entries = msg.getMessages() + .stream() + .map(message -> new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getReceiptHandle())) + .collect(Collectors.toList()); + sqsClient.deleteMessageBatch(msg.getUrl(), entries); + })); + pendingMessages.clear(); + } + + @Override + protected void doUnsubscribe() { stopped = true; - if (sqsClient != null) { sqsClient.shutdown(); } - if (consumerExecutor != null) { - consumerExecutor.shutdownNow(); - } - } - - @Override - public List poll(long durationInMillis) { - if (!subscribed && partitions == null) { - try { - Thread.sleep(durationInMillis); - } catch (InterruptedException e) { - log.debug("Failed to await subscription", e); - } - } else { - if (!subscribed) { - List topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); - queueUrls = topicNames.stream().map(this::getQueueUrl).collect(Collectors.toSet()); - consumerExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(queueUrls.size() * sqsSettings.getThreadsPerTopic() + 1)); - subscribed = true; - } - - if (!pendingMessages.isEmpty()) { - log.warn("Present {} non committed messages.", pendingMessages.size()); - return Collections.emptyList(); - } - - List>> futureList = queueUrls - .stream() - .map(url -> poll(url, (int) TimeUnit.MILLISECONDS.toSeconds(durationInMillis))) - .collect(Collectors.toList()); - ListenableFuture>> futureResult = Futures.allAsList(futureList); - try { - return futureResult.get().stream() - .flatMap(List::stream) - .map(msg -> { - try { - return decode(msg); - } catch (IOException e) { - log.error("Failed to decode message: [{}]", msg); - return null; - } - }).filter(Objects::nonNull) - .collect(Collectors.toList()); - } catch (InterruptedException | ExecutionException e) { - if (stopped) { - log.info("[{}] Aws SQS consumer is stopped.", topic); - } else { - log.error("Failed to pool messages.", e); - } - } - } - return Collections.emptyList(); + shutdownExecutor(); } private ListenableFuture> poll(String url, int waitTimeSeconds) { @@ -194,25 +167,6 @@ public class TbAwsSqsConsumerTemplate implements TbQueueCo }, consumerExecutor); } - @Override - public void commit() { - pendingMessages.forEach(msg -> - consumerExecutor.submit(() -> { - List entries = msg.getMessages() - .stream() - .map(message -> new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getReceiptHandle())) - .collect(Collectors.toList()); - sqsClient.deleteMessageBatch(msg.getUrl(), entries); - })); - - pendingMessages.clear(); - } - - public T decode(Message message) throws InvalidProtocolBufferException { - DefaultTbQueueMsg msg = gson.fromJson(message.getBody(), DefaultTbQueueMsg.class); - return decoder.decode(msg); - } - @Data private static class AwsSqsMsgWrapper { private final String url;