From ca193239bab7ba4331a61bd9e061f0f8998bc42c Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Thu, 9 Apr 2020 18:33:44 +0300 Subject: [PATCH] RE Submit Strategies --- .../server/actors/ActorSystemContext.java | 7 +- .../queue/DefaultTbCoreConsumerService.java | 6 +- .../DefaultTbRuleEngineConsumerService.java | 77 ++++++++----- .../queue/ProcessingAttemptContext.java | 89 +++++++++++++++ .../service/queue/TbMsgPackCallback.java | 38 +----- .../queue/TbRuleEngineConsumerStats.java | 7 +- .../AbstractTbRuleEngineSubmitStrategy.java | 71 ++++++++++++ .../BatchTbRuleEngineSubmitStrategy.java | 86 ++++++++++++++ .../BurstTbRuleEngineSubmitStrategy.java | 50 ++++++++ .../service/queue/processing/IdMsgPair.java | 31 +++++ ...lByEntityIdTbRuleEngineSubmitStrategy.java | 108 ++++++++++++++++++ ...riginatorIdTbRuleEngineSubmitStrategy.java | 44 +++++++ ...lByTenantIdTbRuleEngineSubmitStrategy.java | 35 ++++++ .../SequentialTbRuleEngineSubmitStrategy.java | 73 ++++++++++++ .../TbRuleEngineProcessingResult.java | 38 +++--- ...TbRuleEngineProcessingStrategyFactory.java | 10 +- .../TbRuleEngineSubmitStrategy.java | 39 +++++++ .../TbRuleEngineSubmitStrategyFactory.java | 43 +++++++ .../rpc/DefaultTbCoreDeviceRpcService.java | 2 +- .../src/main/resources/thingsboard.yml | 46 +++++--- ...leEngineQueueAckStrategyConfiguration.java | 4 - .../TbRuleEngineQueueConfiguration.java | 3 +- ...ngineQueueSubmitStrategyConfiguration.java | 26 +++++ 23 files changed, 814 insertions(+), 119 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/service/queue/ProcessingAttemptContext.java create mode 100644 application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractTbRuleEngineSubmitStrategy.java create mode 100644 application/src/main/java/org/thingsboard/server/service/queue/processing/BatchTbRuleEngineSubmitStrategy.java create mode 100644 application/src/main/java/org/thingsboard/server/service/queue/processing/BurstTbRuleEngineSubmitStrategy.java create mode 100644 application/src/main/java/org/thingsboard/server/service/queue/processing/IdMsgPair.java create mode 100644 application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByEntityIdTbRuleEngineSubmitStrategy.java create mode 100644 application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByOriginatorIdTbRuleEngineSubmitStrategy.java create mode 100644 application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByTenantIdTbRuleEngineSubmitStrategy.java create mode 100644 application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialTbRuleEngineSubmitStrategy.java create mode 100644 application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineSubmitStrategy.java create mode 100644 application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineSubmitStrategyFactory.java create mode 100644 common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueSubmitStrategyConfiguration.java diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index c266407707..d31cb62b9b 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -44,6 +44,7 @@ import org.thingsboard.server.common.data.Event; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; +import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; @@ -334,7 +335,6 @@ public class ActorSystemContext { @Setter private ActorSystem actorSystem; - @Getter @Setter private ActorRef appActor; @@ -361,6 +361,8 @@ public class ActorSystemContext { config = ConfigFactory.parseResources(AKKA_CONF_FILE_NAME).withFallback(ConfigFactory.load()); } + + public Scheduler getScheduler() { return actorSystem.scheduler(); } @@ -535,4 +537,7 @@ public class ActorSystemContext { return Exception.class.isInstance(error) ? (Exception) error : new Exception(error); } + public void tell(TbActorMsg tbActorMsg, ActorRef sender) { + appActor.tell(tbActorMsg, sender); + } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index 401262a243..263e6eab24 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -137,7 +137,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray()); if (actorMsg.isPresent()) { log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); - actorContext.getAppActor().tell(actorMsg.get(), ActorRef.noSender()); + actorContext.tell(actorMsg.get(), ActorRef.noSender()); } callback.onSuccess(); } @@ -194,7 +194,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService actorMsg = encodingService.decode(toCoreMsg.getComponentLifecycleMsg().toByteArray()); if (actorMsg.isPresent()) { log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); - actorContext.getAppActor().tell(actorMsg.get(), ActorRef.noSender()); + actorContext.tell(actorMsg.get(), ActorRef.noSender()); } callback.onSuccess(); } @@ -259,7 +259,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService>> consumers = new ConcurrentHashMap<>(); private final ConcurrentMap consumerConfigurations = new ConcurrentHashMap<>(); private final ConcurrentMap consumerStats = new ConcurrentHashMap<>(); + private ExecutorService submitExecutor; - public DefaultTbRuleEngineConsumerService(TbRuleEngineProcessingStrategyFactory factory, TbQueueRuleEngineSettings ruleEngineSettings, + public DefaultTbRuleEngineConsumerService(TbRuleEngineProcessingStrategyFactory processingStrategyFactory, + TbRuleEngineSubmitStrategyFactory submitStrategyFactory, + TbQueueRuleEngineSettings ruleEngineSettings, TbRuleEngineQueueFactory tbRuleEngineQueueFactory, RuleEngineStatisticsService statisticsService, ActorSystemContext actorContext, DataDecodingEncodingService encodingService) { super(actorContext, encodingService, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer()); this.statisticsService = statisticsService; this.ruleEngineSettings = ruleEngineSettings; this.tbRuleEngineQueueFactory = tbRuleEngineQueueFactory; - this.factory = factory; + this.submitStrategyFactory = submitStrategyFactory; + this.processingStrategyFactory = processingStrategyFactory; } @PostConstruct @@ -102,6 +111,14 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< consumers.computeIfAbsent(configuration.getName(), queueName -> tbRuleEngineQueueFactory.createToRuleEngineMsgConsumer(configuration)); consumerStats.put(configuration.getName(), new TbRuleEngineConsumerStats(configuration.getName())); } + submitExecutor = Executors.newSingleThreadExecutor(); + } + + @PreDestroy + public void stop() { + if (submitExecutor != null) { + submitExecutor.shutdownNow(); + } } @Override @@ -131,27 +148,18 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< if (msgs.isEmpty()) { continue; } - TbRuleEngineProcessingStrategy strategy = factory.newInstance(configuration.getName(), configuration.getAckStrategy()); - TbRuleEngineProcessingDecision decision = null; - boolean firstAttempt = true; - while (!stopped && (firstAttempt || !decision.isCommit())) { - ConcurrentMap> allMap; - if (firstAttempt) { - allMap = msgs.stream().collect( - Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity())); - firstAttempt = false; - } else { - allMap = decision.getReprocessMap(); - } - ConcurrentMap> successMap = new ConcurrentHashMap<>(); - ConcurrentMap> failedMap = new ConcurrentHashMap<>(); - ConcurrentMap exceptionsMap = new ConcurrentHashMap<>(); - CountDownLatch processingTimeoutLatch = new CountDownLatch(1); - allMap.forEach((id, msg) -> { - log.trace("[{}] Creating main callback for message: {}", id, msg.getValue()); + TbRuleEngineSubmitStrategy submitStrategy = submitStrategyFactory.newInstance(configuration.getName(), configuration.getSubmitStrategy()); + TbRuleEngineProcessingStrategy ackStrategy = processingStrategyFactory.newInstance(configuration.getName(), configuration.getProcessingStrategy()); + + submitStrategy.init(msgs); + + while (!stopped) { + ProcessingAttemptContext ctx = new ProcessingAttemptContext(submitStrategy); + submitStrategy.submitAttempt((id, msg) -> submitExecutor.submit(() -> { + log.trace("[{}] Creating callback for message: {}", id, msg.getValue()); ToRuleEngineMsg toRuleEngineMsg = msg.getValue(); TenantId tenantId = new TenantId(new UUID(toRuleEngineMsg.getTenantIdMSB(), toRuleEngineMsg.getTenantIdLSB())); - TbMsgCallback callback = new TbMsgPackCallback<>(id, tenantId, processingTimeoutLatch, allMap, successMap, failedMap, exceptionsMap); + TbMsgCallback callback = new TbMsgPackCallback(id, tenantId, ctx); try { if (toRuleEngineMsg.getTbMsg() != null && !toRuleEngineMsg.getTbMsg().isEmpty()) { forwardToRuleEngineActor(tenantId, toRuleEngineMsg, callback); @@ -161,17 +169,24 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< } catch (Exception e) { callback.onFailure(new RuleEngineException(e.getMessage())); } - }); + })); boolean timeout = false; - if (!processingTimeoutLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) { + if (!ctx.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) { timeout = true; } - TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(timeout, allMap, successMap, failedMap, exceptionsMap); - decision = strategy.analyze(result); + + TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(timeout, ctx); + TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result); if (statsEnabled) { stats.log(result, decision.isCommit()); } + if (decision.isCommit()) { + submitStrategy.stop(); + break; + } else { + submitStrategy.update(decision.getReprocessMap()); + } } consumer.commit(); } catch (Exception e) { @@ -211,7 +226,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< Optional actorMsg = encodingService.decode(nfMsg.getComponentLifecycleMsg().toByteArray()); if (actorMsg.isPresent()) { log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); - actorContext.getAppActor().tell(actorMsg.get(), ActorRef.noSender()); + actorContext.tell(actorMsg.get(), ActorRef.noSender()); } callback.onSuccess(); } else { @@ -232,7 +247,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< } } msg = new QueueToRuleEngineMsg(tenantId, tbMsg, relationTypes, toRuleEngineMsg.getFailureMessage()); - actorContext.getAppActor().tell(msg, ActorRef.noSender()); + actorContext.tell(msg, ActorRef.noSender()); } @Scheduled(fixedDelayString = "${queue.rule-engine.stats.print-interval-ms}") diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ProcessingAttemptContext.java b/application/src/main/java/org/thingsboard/server/service/queue/ProcessingAttemptContext.java new file mode 100644 index 0000000000..2073ff8c21 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/ProcessingAttemptContext.java @@ -0,0 +1,89 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.queue; + +import lombok.Getter; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.queue.RuleEngineException; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy; + +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class ProcessingAttemptContext { + + private final TbRuleEngineSubmitStrategy submitStrategy; + + private final CountDownLatch processingTimeoutLatch = new CountDownLatch(1); + @Getter + private final ConcurrentMap> pendingMap; + @Getter + private final ConcurrentMap> successMap = new ConcurrentHashMap<>(); + @Getter + private final ConcurrentMap> failedMap = new ConcurrentHashMap<>(); + @Getter + private final ConcurrentMap exceptionsMap = new ConcurrentHashMap<>(); + + public ProcessingAttemptContext(TbRuleEngineSubmitStrategy submitStrategy) { + this.submitStrategy = submitStrategy; + this.pendingMap = submitStrategy.getPendingMap(); + } + + public boolean await(long packProcessingTimeout, TimeUnit milliseconds) throws InterruptedException { + return processingTimeoutLatch.await(packProcessingTimeout, milliseconds); + } + + public void onSuccess(UUID id) { + TbProtoQueueMsg msg; + boolean empty = false; + synchronized (pendingMap) { + msg = pendingMap.remove(id); + if (msg != null) { + empty = pendingMap.isEmpty(); + } + } + if (msg != null) { + successMap.put(id, msg); + } + submitStrategy.onSuccess(id); + if (empty) { + processingTimeoutLatch.countDown(); + } + } + + public void onFailure(TenantId tenantId, UUID id, RuleEngineException e) { + TbProtoQueueMsg msg; + boolean empty = false; + synchronized (pendingMap) { + msg = pendingMap.remove(id); + if (msg != null) { + empty = pendingMap.isEmpty(); + } + } + if (msg != null) { + failedMap.put(id, msg); + exceptionsMap.putIfAbsent(tenantId, e); + } + if (empty) { + processingTimeoutLatch.countDown(); + } + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java index 0470db4cd7..2a6b6a658d 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java @@ -27,52 +27,26 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @Slf4j -public class TbMsgPackCallback implements TbMsgCallback { - private final CountDownLatch processingTimeoutLatch; - private final ConcurrentMap ackMap; - private final ConcurrentMap successMap; - private final ConcurrentMap failedMap; +public class TbMsgPackCallback implements TbMsgCallback { private final UUID id; private final TenantId tenantId; - private final ConcurrentMap firstExceptions; + private final ProcessingAttemptContext ctx; - public TbMsgPackCallback(UUID id, TenantId tenantId, - CountDownLatch processingTimeoutLatch, - ConcurrentMap ackMap, - ConcurrentMap successMap, - ConcurrentMap failedMap, - ConcurrentMap firstExceptions) { + public TbMsgPackCallback(UUID id, TenantId tenantId, ProcessingAttemptContext ctx) { this.id = id; this.tenantId = tenantId; - this.processingTimeoutLatch = processingTimeoutLatch; - this.ackMap = ackMap; - this.successMap = successMap; - this.failedMap = failedMap; - this.firstExceptions = firstExceptions; + this.ctx = ctx; } @Override public void onSuccess() { log.trace("[{}] ON SUCCESS", id); - T msg = ackMap.remove(id); - if (msg != null) { - successMap.put(id, msg); - } - if (msg != null && ackMap.isEmpty()) { - processingTimeoutLatch.countDown(); - } + ctx.onSuccess(id); } @Override public void onFailure(RuleEngineException e) { log.trace("[{}] ON FAILURE", id, e); - T msg = ackMap.remove(id); - if (msg != null) { - failedMap.put(id, msg); - firstExceptions.putIfAbsent(tenantId, e); - } - if (ackMap.isEmpty()) { - processingTimeoutLatch.countDown(); - } + ctx.onFailure(tenantId, id, e); } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java b/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java index 22f4edbbba..40017d2b40 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java @@ -19,7 +19,6 @@ import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.RuleEngineException; -import org.thingsboard.server.common.msg.queue.RuleNodeException; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult; @@ -77,7 +76,7 @@ public class TbRuleEngineConsumerStats { public void log(TbRuleEngineProcessingResult msg, boolean finalIterationForPack) { int success = msg.getSuccessMap().size(); int pending = msg.getPendingMap().size(); - int failed = msg.getFailureMap().size(); + int failed = msg.getFailedMap().size(); totalMsgCounter.addAndGet(success + pending + failed); successMsgCounter.addAndGet(success); msg.getSuccessMap().values().forEach(m -> getTenantStats(m).logSuccess()); @@ -89,7 +88,7 @@ public class TbRuleEngineConsumerStats { msg.getPendingMap().values().forEach(m -> getTenantStats(m).logTimeout()); } if (failed > 0) { - msg.getFailureMap().values().forEach(m -> getTenantStats(m).logFailed()); + msg.getFailedMap().values().forEach(m -> getTenantStats(m).logFailed()); } failedIterationsCounter.incrementAndGet(); } else { @@ -103,7 +102,7 @@ public class TbRuleEngineConsumerStats { msg.getPendingMap().values().forEach(m -> getTenantStats(m).logTmpTimeout()); } if (failed > 0) { - msg.getFailureMap().values().forEach(m -> getTenantStats(m).logTmpFailed()); + msg.getFailedMap().values().forEach(m -> getTenantStats(m).logTmpFailed()); } } msg.getExceptionsMap().forEach(tenantExceptions::putIfAbsent); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractTbRuleEngineSubmitStrategy.java new file mode 100644 index 0000000000..bef733ec22 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractTbRuleEngineSubmitStrategy.java @@ -0,0 +1,71 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.queue.processing; + +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; + +public abstract class AbstractTbRuleEngineSubmitStrategy implements TbRuleEngineSubmitStrategy { + + protected final String queueName; + protected List orderedMsgList; + private volatile boolean stopped; + + public AbstractTbRuleEngineSubmitStrategy(String queueName) { + this.queueName = queueName; + } + + protected abstract void doOnSuccess(UUID id); + + @Override + public void init(List> msgs) { + orderedMsgList = msgs.stream().map(msg -> new IdMsgPair(UUID.randomUUID(), msg)).collect(Collectors.toList()); + } + + @Override + public ConcurrentMap> getPendingMap() { + return orderedMsgList.stream().collect(Collectors.toConcurrentMap(pair -> pair.uuid, pair -> pair.msg)); + } + + @Override + public void update(ConcurrentMap> reprocessMap) { + List newOrderedMsgList = new ArrayList<>(reprocessMap.size()); + for (IdMsgPair pair : orderedMsgList) { + if (reprocessMap.containsKey(pair.uuid)) { + newOrderedMsgList.add(pair); + } + } + orderedMsgList = newOrderedMsgList; + } + + @Override + public void onSuccess(UUID id) { + if (!stopped) { + doOnSuccess(id); + } + } + + @Override + public void stop() { + stopped = true; + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/BatchTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/BatchTbRuleEngineSubmitStrategy.java new file mode 100644 index 0000000000..d0b1f7f99a --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/BatchTbRuleEngineSubmitStrategy.java @@ -0,0 +1,86 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.queue.processing; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; + +@Slf4j +public class BatchTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitStrategy { + + private final int batchSize; + private final AtomicInteger packIdx = new AtomicInteger(0); + private final Map> pendingPack = new LinkedHashMap<>(); + private volatile BiConsumer> msgConsumer; + + public BatchTbRuleEngineSubmitStrategy(String queueName, int batchSize) { + super(queueName); + this.batchSize = batchSize; + } + + @Override + public void submitAttempt(BiConsumer> msgConsumer) { + this.msgConsumer = msgConsumer; + submitNext(); + } + + @Override + public void update(ConcurrentMap> reprocessMap) { + super.update(reprocessMap); + packIdx.set(0); + } + + @Override + protected void doOnSuccess(UUID id) { + boolean endOfPendingPack; + synchronized (pendingPack) { + TbProtoQueueMsg msg = pendingPack.remove(id); + endOfPendingPack = msg != null && pendingPack.isEmpty(); + } + if (endOfPendingPack) { + packIdx.incrementAndGet(); + submitNext(); + } + } + + private void submitNext() { + int listSize = orderedMsgList.size(); + int startIdx = Math.min(packIdx.get() * batchSize, listSize); + int endIdx = Math.min(startIdx + batchSize, listSize); + synchronized (pendingPack) { + pendingPack.clear(); + for (int i = startIdx; i < endIdx; i++) { + IdMsgPair pair = orderedMsgList.get(i); + pendingPack.put(pair.uuid, pair.msg); + } + } + int submitSize = pendingPack.size(); + if (log.isInfoEnabled() && submitSize > 0) { + log.info("[{}] submitting [{}] messages to rule engine", queueName, submitSize); + } + pendingPack.forEach(msgConsumer); + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/BurstTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/BurstTbRuleEngineSubmitStrategy.java new file mode 100644 index 0000000000..ffd1dd49d1 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/BurstTbRuleEngineSubmitStrategy.java @@ -0,0 +1,50 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.queue.processing; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +@Slf4j +public class BurstTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitStrategy { + + public BurstTbRuleEngineSubmitStrategy(String queueName) { + super(queueName); + } + + @Override + public void submitAttempt(BiConsumer> msgConsumer) { + if (log.isInfoEnabled()) { + log.info("[{}] submitting [{}] messages to rule engine", queueName, orderedMsgList.size()); + } + orderedMsgList.forEach(pair -> msgConsumer.accept(pair.uuid, pair.msg)); + } + + @Override + protected void doOnSuccess(UUID id) { + + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/IdMsgPair.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/IdMsgPair.java new file mode 100644 index 0000000000..2b2c203ec5 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/IdMsgPair.java @@ -0,0 +1,31 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.queue.processing; + +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; + +import java.util.UUID; + +public class IdMsgPair { + final UUID uuid; + final TbProtoQueueMsg msg; + + public IdMsgPair(UUID uuid, TbProtoQueueMsg msg) { + this.uuid = uuid; + this.msg = msg; + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByEntityIdTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByEntityIdTbRuleEngineSubmitStrategy.java new file mode 100644 index 0000000000..ae5993cb1c --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByEntityIdTbRuleEngineSubmitStrategy.java @@ -0,0 +1,108 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.queue.processing; + +import com.google.protobuf.InvalidProtocolBufferException; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityIdFactory; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.gen.MsgProtos; +import org.thingsboard.server.common.msg.queue.TbMsgCallback; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; + +@Slf4j +public abstract class SequentialByEntityIdTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitStrategy { + + private volatile BiConsumer> msgConsumer; + private volatile ConcurrentMap msgToEntityIdMap = new ConcurrentHashMap<>(); + private volatile ConcurrentMap> entityIdToListMap = new ConcurrentHashMap<>(); + + public SequentialByEntityIdTbRuleEngineSubmitStrategy(String queueName) { + super(queueName); + } + + @Override + public void init(List> msgs) { + super.init(msgs); + initMaps(); + } + + @Override + public void submitAttempt(BiConsumer> msgConsumer) { + this.msgConsumer = msgConsumer; + entityIdToListMap.forEach((entityId, queue) -> { + IdMsgPair msg = queue.peek(); + if (msg != null) { + msgConsumer.accept(msg.uuid, msg.msg); + } + }); + } + + @Override + public void update(ConcurrentMap> reprocessMap) { + super.update(reprocessMap); + initMaps(); + } + + @Override + protected void doOnSuccess(UUID id) { + EntityId entityId = msgToEntityIdMap.get(id); + if (entityId != null) { + Queue queue = entityIdToListMap.get(entityId); + if (queue != null) { + IdMsgPair next = null; + synchronized (queue) { + IdMsgPair expected = queue.peek(); + if (expected != null && expected.uuid.equals(id)) { + queue.poll(); + next = queue.peek(); + } + } + if (next != null) { + msgConsumer.accept(next.uuid, next.msg); + } + } + } + } + + private void initMaps() { + msgToEntityIdMap.clear(); + entityIdToListMap.clear(); + for (IdMsgPair pair : orderedMsgList) { + EntityId entityId = getEntityId(pair.msg.getValue()); + if (entityId != null) { + msgToEntityIdMap.put(pair.uuid, entityId); + entityIdToListMap.computeIfAbsent(entityId, id -> new LinkedList<>()).add(pair); + } + } + } + + protected abstract EntityId getEntityId(TransportProtos.ToRuleEngineMsg msg); + +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByOriginatorIdTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByOriginatorIdTbRuleEngineSubmitStrategy.java new file mode 100644 index 0000000000..cd8a97e82c --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByOriginatorIdTbRuleEngineSubmitStrategy.java @@ -0,0 +1,44 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.queue.processing; + +import com.google.protobuf.InvalidProtocolBufferException; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityIdFactory; +import org.thingsboard.server.common.msg.gen.MsgProtos; +import org.thingsboard.server.gen.transport.TransportProtos; + +import java.util.UUID; + +@Slf4j +public class SequentialByOriginatorIdTbRuleEngineSubmitStrategy extends SequentialByEntityIdTbRuleEngineSubmitStrategy { + + public SequentialByOriginatorIdTbRuleEngineSubmitStrategy(String queueName) { + super(queueName); + } + + @Override + protected EntityId getEntityId(TransportProtos.ToRuleEngineMsg msg) { + try { + MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(msg.getTbMsg()); + return EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); + } catch (InvalidProtocolBufferException e) { + log.warn("[{}] Failed to parse TbMsg: {}", queueName, msg); + return null; + } + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByTenantIdTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByTenantIdTbRuleEngineSubmitStrategy.java new file mode 100644 index 0000000000..b258c6db1b --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByTenantIdTbRuleEngineSubmitStrategy.java @@ -0,0 +1,35 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.queue.processing; + +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.gen.transport.TransportProtos; + +import java.util.UUID; + +public class SequentialByTenantIdTbRuleEngineSubmitStrategy extends SequentialByEntityIdTbRuleEngineSubmitStrategy { + + public SequentialByTenantIdTbRuleEngineSubmitStrategy(String queueName) { + super(queueName); + } + + @Override + protected EntityId getEntityId(TransportProtos.ToRuleEngineMsg msg) { + return new TenantId(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB())); + + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialTbRuleEngineSubmitStrategy.java new file mode 100644 index 0000000000..ef45b983fc --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialTbRuleEngineSubmitStrategy.java @@ -0,0 +1,73 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.queue.processing; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; + +@Slf4j +public class SequentialTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitStrategy { + + private final AtomicInteger msgIdx = new AtomicInteger(0); + private volatile BiConsumer> msgConsumer; + private volatile UUID expectedMsgId; + + public SequentialTbRuleEngineSubmitStrategy(String queueName) { + super(queueName); + } + + @Override + public void submitAttempt(BiConsumer> msgConsumer) { + this.msgConsumer = msgConsumer; + msgIdx.set(0); + submitNext(); + } + + @Override + public void update(ConcurrentMap> reprocessMap) { + super.update(reprocessMap); + } + + @Override + protected void doOnSuccess(UUID id) { + if (expectedMsgId.equals(id)) { + msgIdx.incrementAndGet(); + submitNext(); + } + } + + private void submitNext() { + int listSize = orderedMsgList.size(); + int idx = msgIdx.get(); + if (idx < listSize) { + IdMsgPair pair = orderedMsgList.get(idx); + expectedMsgId = pair.uuid; + if (log.isInfoEnabled()) { + log.info("[{}] submitting [{}] message to rule engine", queueName, pair.msg); + } + msgConsumer.accept(pair.uuid, pair.msg); + } + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingResult.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingResult.java index e67936e294..8e0fcaa74a 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingResult.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingResult.java @@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.RuleEngineException; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.service.queue.ProcessingAttemptContext; import java.util.UUID; import java.util.concurrent.ConcurrentMap; @@ -31,24 +32,27 @@ public class TbRuleEngineProcessingResult { @Getter private final boolean timeout; @Getter - private final ConcurrentMap> pendingMap; - @Getter - private final ConcurrentMap> successMap; - @Getter - private final ConcurrentMap> failureMap; - @Getter - private final ConcurrentMap exceptionsMap; + private final ProcessingAttemptContext ctx; - public TbRuleEngineProcessingResult(boolean timeout, - ConcurrentMap> pendingMap, - ConcurrentMap> successMap, - ConcurrentMap> failureMap, - ConcurrentMap exceptionsMap) { + public TbRuleEngineProcessingResult(boolean timeout, ProcessingAttemptContext ctx) { this.timeout = timeout; - this.pendingMap = pendingMap; - this.successMap = successMap; - this.failureMap = failureMap; - this.exceptionsMap = exceptionsMap; - this.success = !timeout && pendingMap.isEmpty() && failureMap.isEmpty(); + this.ctx = ctx; + this.success = !timeout && ctx.getPendingMap().isEmpty() && ctx.getFailedMap().isEmpty(); + } + + public ConcurrentMap> getPendingMap() { + return ctx.getPendingMap(); + } + + public ConcurrentMap> getSuccessMap() { + return ctx.getSuccessMap(); + } + + public ConcurrentMap> getFailedMap() { + return ctx.getFailedMap(); + } + + public ConcurrentMap getExceptionsMap() { + return ctx.getExceptionsMap(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java index 1096bfbf7e..b6579b8dcb 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java @@ -16,12 +16,10 @@ package org.thingsboard.server.service.queue.processing; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.settings.TbRuleEngineQueueAckStrategyConfiguration; -import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -77,10 +75,10 @@ public class TbRuleEngineProcessingStrategyFactory { return new TbRuleEngineProcessingDecision(true, null); } else { if (retryCount == 0) { - initialTotalCount = result.getPendingMap().size() + result.getFailureMap().size() + result.getSuccessMap().size(); + initialTotalCount = result.getPendingMap().size() + result.getFailedMap().size() + result.getSuccessMap().size(); } retryCount++; - double failedCount = result.getFailureMap().size() + result.getPendingMap().size(); + double failedCount = result.getFailedMap().size() + result.getPendingMap().size(); if (maxRetries > 0 && retryCount > maxRetries) { log.info("[{}] Skip reprocess of the rule engine pack due to max retries", queueName); return new TbRuleEngineProcessingDecision(true, null); @@ -90,7 +88,7 @@ public class TbRuleEngineProcessingStrategyFactory { } else { ConcurrentMap> toReprocess = new ConcurrentHashMap<>(initialTotalCount); if (retryFailed) { - result.getFailureMap().forEach(toReprocess::put); + result.getFailedMap().forEach(toReprocess::put); } if (retryTimeout) { result.getPendingMap().forEach(toReprocess::put); @@ -125,7 +123,7 @@ public class TbRuleEngineProcessingStrategyFactory { @Override public TbRuleEngineProcessingDecision analyze(TbRuleEngineProcessingResult result) { - log.info("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailureMap().size(), result.getPendingMap().size()); + log.info("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size()); return new TbRuleEngineProcessingDecision(true, null); } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineSubmitStrategy.java new file mode 100644 index 0000000000..7b22da97db --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineSubmitStrategy.java @@ -0,0 +1,39 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.queue.processing; + +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.function.BiConsumer; + +public interface TbRuleEngineSubmitStrategy { + + void init(List> msgs); + + ConcurrentMap> getPendingMap(); + + void submitAttempt(BiConsumer> msgConsumer); + + void update(ConcurrentMap> reprocessMap); + + void onSuccess(UUID id); + + void stop(); +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineSubmitStrategyFactory.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineSubmitStrategyFactory.java new file mode 100644 index 0000000000..f5a7457c17 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineSubmitStrategyFactory.java @@ -0,0 +1,43 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.queue.processing; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.thingsboard.server.queue.settings.TbRuleEngineQueueSubmitStrategyConfiguration; + +@Component +@Slf4j +public class TbRuleEngineSubmitStrategyFactory { + + public TbRuleEngineSubmitStrategy newInstance(String name, TbRuleEngineQueueSubmitStrategyConfiguration configuration) { + switch (configuration.getType()) { + case "BURST": + return new BurstTbRuleEngineSubmitStrategy(name); + case "BATCH": + return new BatchTbRuleEngineSubmitStrategy(name, configuration.getBatchSize()); + case "SEQUENTIAL_WITHIN_ORIGINATOR": + return new SequentialByOriginatorIdTbRuleEngineSubmitStrategy(name); + case "SEQUENTIAL_WITHIN_TENANT": + return new SequentialByTenantIdTbRuleEngineSubmitStrategy(name); + case "SEQUENTIAL": + return new SequentialTbRuleEngineSubmitStrategy(name); + default: + throw new RuntimeException("TbRuleEngineProcessingStrategy with type " + configuration.getType() + " is not supported!"); + } + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java index ce5065a64d..6d70056af0 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java @@ -123,7 +123,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { log.trace("[{}][{}] Processing local rpc call to device actor [{}]", request.getTenantId(), request.getId(), request.getDeviceId()); UUID requestId = request.getId(); localToDeviceRpcRequests.put(requestId, rpcMsg); - actorContext.getAppActor().tell(rpcMsg, ActorRef.noSender()); + actorContext.tell(rpcMsg, ActorRef.noSender()); scheduleToDeviceTimeout(request, requestId); } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index f08d2baf46..4c4955fea6 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -578,27 +578,35 @@ queue: print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:10000}" queues: # TODO 2.5: specify correct ENV variable names. - name: "Main" - topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb.rule-engine.main}" - poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" - partitions: "${TB_QUEUE_RULE_ENGINE_PARTITIONS:10}" - pack-processing-timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:60000}" - ack-strategy: - type: "${TB_QUEUE_RULE_ENGINE_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT + topic: "${TB_QUEUE_RE_MAIN_TOPIC:tb.rule-engine.main}" + poll-interval: "${TB_QUEUE_RE_MAIN_POLL_INTERVAL_MS:25}" + partitions: "${TB_QUEUE_RE_MAIN_PARTITIONS:10}" + pack-processing-timeout: "${TB_QUEUE_RE_MAIN_PACK_PROCESSING_TIMEOUT_MS:60000}" + submit-strategy: + type: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_WITHIN_ORIGINATOR, SEQUENTIAL_WITHIN_TENANT, SEQUENTIAL + # For BATCH only + batch-size: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_BATCH_SIZE:1000}" # Maximum number of messages in batch + processing-strategy: + type: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT # For RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT - retries: "${TB_QUEUE_RULE_ENGINE_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited - failure-percentage: "${TB_QUEUE_RULE_ENGINE_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; - pause-between-retries: "${TB_QUEUE_RULE_ENGINE_STRATEGY_RETRY_PAUSE:3}"# Time in seconds to wait in consumer thread before retries; - - name: "${TB_QUEUE_RULE_ENGINE_HP_QUEUE_NAME:HighPriority}" - topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb.rule-engine.hp}" - poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" - partitions: "${TB_QUEUE_RULE_ENGINE_PARTITIONS:3}" - pack-processing-timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:60000}" - ack-strategy: - type: "${TB_QUEUE_RULE_ENGINE_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT + retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited + failure-percentage: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; + pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRY_PAUSE:3}"# Time in seconds to wait in consumer thread before retries; + - name: "${TB_QUEUE_RE_HP_QUEUE_NAME:HighPriority}" + topic: "${TB_QUEUE_RE_HP_TOPIC:tb.rule-engine.hp}" + poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}" + partitions: "${TB_QUEUE_RE_HP_PARTITIONS:3}" + pack-processing-timeout: "${TB_QUEUE_RE_HP_PACK_PROCESSING_TIMEOUT_MS:60000}" + submit-strategy: + type: "${TB_QUEUE_RE_HP_SUBMIT_STRATEGY_TYPE:SEQUENTIAL_WITHIN_ORIGINATOR}" # BURST, BATCH, SEQUENTIAL_WITHIN_ORIGINATOR, SEQUENTIAL_WITHIN_TENANT, SEQUENTIAL + # For BATCH only + batch-size: "${TB_QUEUE_RE_HP_SUBMIT_STRATEGY_BATCH_SIZE:100}" # Maximum number of messages in batch + processing-strategy: + type: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT # For RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT - retries: "${TB_QUEUE_RULE_ENGINE_STRATEGY_RETRIES:0}" # Number of retries, 0 is unlimited - failure-percentage: "${TB_QUEUE_RULE_ENGINE_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; - pause-between-retries: "${TB_QUEUE_RULE_ENGINE_STRATEGY_RETRY_PAUSE:1}"# Time in seconds to wait in consumer thread before retries; + retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRIES:0}" # Number of retries, 0 is unlimited + failure-percentage: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; + pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries; transport: # For high priority notifications that require minimum latency and processing time notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}" diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueAckStrategyConfiguration.java b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueAckStrategyConfiguration.java index c1a8fd883d..0d21c59c9c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueAckStrategyConfiguration.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueAckStrategyConfiguration.java @@ -20,13 +20,9 @@ import lombok.Data; @Data public class TbRuleEngineQueueAckStrategyConfiguration { -// @Value("${type}") private String type; -// @Value("${retries:3}") private int retries; -// @Value("${failure_percentage:0}") private double failurePercentage; -// @Value("${pause_between_retries:3}") private long pauseBetweenRetries; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueConfiguration.java b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueConfiguration.java index f89a615d7d..c5a24f20c6 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueConfiguration.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueConfiguration.java @@ -25,6 +25,7 @@ public class TbRuleEngineQueueConfiguration { private int pollInterval; private int partitions; private String packProcessingTimeout; - private TbRuleEngineQueueAckStrategyConfiguration ackStrategy; + private TbRuleEngineQueueSubmitStrategyConfiguration submitStrategy; + private TbRuleEngineQueueAckStrategyConfiguration processingStrategy; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueSubmitStrategyConfiguration.java b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueSubmitStrategyConfiguration.java new file mode 100644 index 0000000000..e39dc0c322 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueSubmitStrategyConfiguration.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.settings; + +import lombok.Data; + +@Data +public class TbRuleEngineQueueSubmitStrategyConfiguration { + + private String type; + private int batchSize; + +}