From 07bdcac0fe2b8b5165cd7823fa53deaba26ea45a Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Tue, 24 Mar 2020 19:18:27 +0200 Subject: [PATCH] Refactoring of Rule Engine API --- .../server/actors/ActorSystemContext.java | 19 ++- .../actors/ruleChain/DefaultTbContext.java | 16 ++- .../RuleChainActorMessageProcessor.java | 10 +- .../server/controller/BaseController.java | 19 ++- .../queue/DefaultTbCoreConsumerService.java | 16 ++- .../DefaultTbRuleEngineConsumerService.java | 67 ++++++---- .../server/service/queue/MsgPackCallback.java | 28 +++-- .../TbRuleEngineProcessingDecision.java | 16 +++ .../TbRuleEngineProcessingResult.java | 33 +++++ .../TbRuleEngineProcessingStrategy.java | 7 ++ ...TbRuleEngineProcessingStrategyFactory.java | 118 ++++++++++++++++++ .../state/DefaultDeviceStateService.java | 10 +- .../DefaultSubscriptionManagerService.java | 2 +- .../SubscriptionManagerService.java | 3 +- .../DefaultTelemetrySubscriptionService.java | 2 +- .../src/main/resources/thingsboard.yml | 74 ++++++----- .../thingsboard/server/common/msg/TbMsg.java | 6 +- .../service/DefaultTransportService.java | 8 +- 18 files changed, 363 insertions(+), 91 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingDecision.java create mode 100644 application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingResult.java create mode 100644 application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategy.java create mode 100644 application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index e5158377cd..64a713b336 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -63,8 +63,13 @@ import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.user.UserService; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.TbQueueProducer; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.ServiceType; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.discovery.TopicPartitionInfo; import org.thingsboard.server.queue.provider.TbRuleEngineQueueProvider; import org.thingsboard.server.service.component.ComponentDiscoveryService; import org.thingsboard.server.service.encoding.DataDecodingEncodingService; @@ -154,7 +159,6 @@ public class ActorSystemContext { private RuleChainService ruleChainService; @Autowired - @Getter private PartitionService partitionService; @Autowired @@ -402,6 +406,17 @@ public class ActorSystemContext { return mapper.createObjectNode().put("server", serviceId).put("method", method).put("error", body); } + public TbQueueProducer> getTbCoreMsgProducer() { + return ruleEngineQueueProvider.getTbCoreMsgProducer(); + } + + public TbQueueProducer> getRuleEngineMsgProducer() { + return ruleEngineQueueProvider.getRuleEngineMsgProducer(); + } + + public TopicPartitionInfo resolve(ServiceType serviceType, TenantId tenantId, EntityId entityId) { + return partitionService.resolve(serviceType, tenantId, entityId); + } public String getServerAddress() { return serviceInfoProvider.getServiceId(); diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index a5b5254174..c9273e0a11 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -65,7 +65,10 @@ import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.user.UserService; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.ServiceType; +import org.thingsboard.server.queue.discovery.TopicPartitionInfo; import org.thingsboard.server.service.script.RuleNodeJsScriptEngine; import scala.concurrent.duration.Duration; @@ -119,7 +122,7 @@ class DefaultTbContext implements TbContext { @Override public boolean isLocalEntity(EntityId entityId) { - return mainCtx.getPartitionService().resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), entityId).isMyPartition(); + return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), entityId).isMyPartition(); } private void scheduleMsgWithDelay(Object msg, long delayInMs, ActorRef target) { @@ -151,8 +154,13 @@ class DefaultTbContext implements TbContext { } @Override - public void sendTbMsgToRuleEngine(TbMsg msg) { - mainCtx.getActorService().onMsg(new SendToClusterMsg(msg.getOriginator(), new QueueToRuleEngineMsg(getTenantId(), msg))); + public void sendTbMsgToRuleEngine(TbMsg tbMsg) { + TenantId tenantId = getTenantId(); + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, tenantId, tbMsg.getOriginator()); + TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder().setTbMsg(TbMsg.toByteString(tbMsg)) + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build(); + mainCtx.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), null); } public TbMsg customerCreatedMsg(Customer customer, RuleNodeId ruleNodeId) { diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index b3c71b3b62..13d370e037 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -68,7 +68,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor nodeActors; private final Map> nodeRoutes; private final RuleChainService service; - private final PartitionService partitionService; private final TbQueueProducer> producer; private RuleNodeId firstId; @@ -83,7 +82,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor(); this.nodeRoutes = new HashMap<>(); this.service = systemContext.getRuleChainService(); - this.partitionService = systemContext.getPartitionService(); this.producer = systemContext.getRuleEngineQueueProvider().getRuleEngineMsgProducer(); } @@ -234,7 +232,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor relations = nodeRoutes.get(originatorNodeId).stream() .filter(r -> contains(envelope.getRelationTypes(), r.getType())) @@ -242,9 +240,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor(newMsg.getId(), toQueueMsg), callbackWrapper); } diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java index e09b8ca673..bc88544243 100644 --- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java +++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java @@ -93,6 +93,12 @@ import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.exception.ThingsboardErrorResponseHandler; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.ServiceType; +import org.thingsboard.server.queue.discovery.TopicPartitionInfo; +import org.thingsboard.server.queue.provider.TbCoreQueueProvider; import org.thingsboard.server.service.component.ComponentDiscoveryService; import org.thingsboard.server.service.security.model.SecurityUser; import org.thingsboard.server.service.security.permission.AccessControlService; @@ -185,6 +191,12 @@ public abstract class BaseController { @Autowired protected ClaimDevicesService claimDevicesService; + @Autowired + protected PartitionService partitionService; + + @Autowired + protected TbCoreQueueProvider coreQueueProvider; + @Value("${server.log_controller_error_stack_trace}") @Getter private boolean logControllerErrorStackTrace; @@ -662,7 +674,12 @@ public abstract class BaseController { TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), msgType, entityId, metaData, TbMsgDataType.JSON , json.writeValueAsString(entityNode) , null, null, null); - actorService.onMsg(new SendToClusterMsg(entityId, new QueueToRuleEngineMsg(user.getTenantId(), tbMsg))); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, user.getTenantId(), entityId); + TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() + .setTenantIdMSB(user.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(user.getTenantId().getId().getLeastSignificantBits()) + .setTbMsg(TbMsg.toByteString(tbMsg)).build(); + coreQueueProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), null); } catch (Exception e) { log.warn("[{}] Failed to push entity action to rule engine: {}", entityId, actionType, e); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index c081fc526a..30148dfe08 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -45,6 +45,7 @@ import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.List; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -104,11 +105,13 @@ public class DefaultTbCoreConsumerService implements TbCoreConsumerService { if (msgs.isEmpty()) { continue; } - ConcurrentMap> ackMap = msgs.stream().collect( + ConcurrentMap> pendingMap = msgs.stream().collect( Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity())); + ConcurrentMap> successMap = new ConcurrentHashMap<>(); + ConcurrentMap> failedMap = new ConcurrentHashMap<>(); CountDownLatch processingTimeoutLatch = new CountDownLatch(1); - ackMap.forEach((id, msg) -> { - TbMsgCallback callback = new MsgPackCallback<>(id, processingTimeoutLatch, ackMap); + pendingMap.forEach((id, msg) -> { + TbMsgCallback callback = new MsgPackCallback<>(id, processingTimeoutLatch, pendingMap, successMap, failedMap); try { ToCoreMsg toCoreMsg = msg.getValue(); if (toCoreMsg.hasToDeviceActorMsg()) { @@ -130,7 +133,8 @@ public class DefaultTbCoreConsumerService implements TbCoreConsumerService { } }); if (!processingTimeoutLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) { - ackMap.forEach((id, msg) -> log.warn("[{}] Timeout to process message: {}", id, msg.getValue())); + pendingMap.forEach((id, msg) -> log.warn("[{}] Timeout to process message: {}", id, msg.getValue())); + failedMap.forEach((id, msg) -> log.warn("[{}] Failed to process message: {}", id, msg.getValue())); } consumer.commit(); } catch (Exception e) { @@ -182,7 +186,7 @@ public class DefaultTbCoreConsumerService implements TbCoreConsumerService { subscriptionManagerService.cancelSubscription(closeProto.getSessionId(), closeProto.getSubscriptionId(), callback); } else if (msg.hasTsUpdate()) { TransportProtos.TbTimeSeriesUpdateProto proto = msg.getTsUpdate(); - subscriptionManagerService.onTimeseriesDataUpdate( + subscriptionManagerService.onTimeSeriesUpdate( new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())), TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()), TbSubscriptionUtils.toTsKvEntityList(proto.getDataList()), callback); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index 1f84c7cc98..986d858043 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -36,11 +36,16 @@ import org.thingsboard.server.queue.discovery.PartitionChangeEvent; import org.thingsboard.server.queue.discovery.ServiceType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.provider.TbRuleEngineQueueProvider; +import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingDecision; +import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult; +import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategy; +import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategyFactory; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.List; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -64,10 +69,12 @@ public class DefaultTbRuleEngineConsumerService implements TbRuleEngineConsumerS private final ActorSystemContext actorContext; private final TbQueueConsumer> consumer; private final TbCoreConsumerStats stats = new TbCoreConsumerStats(); + private final TbRuleEngineProcessingStrategyFactory factory; private volatile ExecutorService mainConsumerExecutor; private volatile boolean stopped = false; - public DefaultTbRuleEngineConsumerService(TbRuleEngineQueueProvider tbRuleEngineQueueProvider, ActorSystemContext actorContext) { + public DefaultTbRuleEngineConsumerService(TbRuleEngineProcessingStrategyFactory factory, TbRuleEngineQueueProvider tbRuleEngineQueueProvider, ActorSystemContext actorContext) { + this.factory = factory; this.consumer = tbRuleEngineQueueProvider.getToRuleEngineMsgConsumer(); this.actorContext = actorContext; } @@ -75,6 +82,7 @@ public class DefaultTbRuleEngineConsumerService implements TbRuleEngineConsumerS @PostConstruct public void init() { this.mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-core-consumer")); + this.factory.newInstance(); } @Override @@ -94,29 +102,46 @@ public class DefaultTbRuleEngineConsumerService implements TbRuleEngineConsumerS if (msgs.isEmpty()) { continue; } - ConcurrentMap> ackMap = msgs.stream().collect( - Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity())); - CountDownLatch processingTimeoutLatch = new CountDownLatch(1); - ackMap.forEach((id, msg) -> { - TbMsgCallback callback = new MsgPackCallback<>(id, processingTimeoutLatch, ackMap); - try { - TransportProtos.ToRuleEngineMsg toRuleEngineMsg = msg.getValue(); - TenantId tenantId = new TenantId(new UUID(toRuleEngineMsg.getTenantIdMSB(), toRuleEngineMsg.getTenantIdLSB())); - if (toRuleEngineMsg.getTbMsg() != null && !toRuleEngineMsg.getTbMsg().isEmpty()) { - forwardToRuleEngineActor(tenantId, toRuleEngineMsg.getTbMsg(), callback); - } else { - callback.onSuccess(); - } - } catch (Throwable e) { - callback.onFailure(e); + TbRuleEngineProcessingStrategy strategy = factory.newInstance(); + TbRuleEngineProcessingDecision decision = null; + boolean firstAttempt = true; + while (!stopped && (firstAttempt || !decision.isCommit())) { + ConcurrentMap> allMap; + if (firstAttempt) { + allMap = msgs.stream().collect( + Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity())); + firstAttempt = false; + } else { + allMap = decision.getReprocessMap(); } - }); - if (!processingTimeoutLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) { - ackMap.forEach((id, msg) -> log.warn("[{}] Timeout to process message: {}", id, msg.getValue())); + ConcurrentMap> successMap = new ConcurrentHashMap<>(); + ConcurrentMap> failedMap = new ConcurrentHashMap<>(); + + CountDownLatch processingTimeoutLatch = new CountDownLatch(1); + allMap.forEach((id, msg) -> { + TbMsgCallback callback = new MsgPackCallback<>(id, processingTimeoutLatch, allMap, successMap, failedMap); + try { + TransportProtos.ToRuleEngineMsg toRuleEngineMsg = msg.getValue(); + TenantId tenantId = new TenantId(new UUID(toRuleEngineMsg.getTenantIdMSB(), toRuleEngineMsg.getTenantIdLSB())); + if (toRuleEngineMsg.getTbMsg() != null && !toRuleEngineMsg.getTbMsg().isEmpty()) { + forwardToRuleEngineActor(tenantId, toRuleEngineMsg.getTbMsg(), callback); + } else { + callback.onSuccess(); + } + } catch (Throwable e) { + callback.onFailure(e); + } + }); + + boolean timeout = false; + if (!processingTimeoutLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) { + timeout = true; + } + decision = strategy.analyze(new TbRuleEngineProcessingResult(timeout, allMap, successMap, failedMap)); } consumer.commit(); } catch (Exception e) { - log.warn("Failed to obtain messages from queue.", e); + log.warn("Failed to process messages from queue.", e); try { Thread.sleep(pollDuration); } catch (InterruptedException e2) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/MsgPackCallback.java b/application/src/main/java/org/thingsboard/server/service/queue/MsgPackCallback.java index d616bbef16..420c61d5d7 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/MsgPackCallback.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/MsgPackCallback.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,25 +20,37 @@ import org.thingsboard.server.common.msg.queue.TbMsgCallback; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @Slf4j -public class MsgPackCallback implements TbMsgCallback { +public class MsgPackCallback implements TbMsgCallback { private final CountDownLatch processingTimeoutLatch; - private final ConcurrentMap> ackMap; + private final ConcurrentMap ackMap; + private final ConcurrentMap successMap; + private final ConcurrentMap failedMap; private final UUID id; - public MsgPackCallback(UUID id, CountDownLatch processingTimeoutLatch, ConcurrentMap> ackMap) { + public MsgPackCallback(UUID id, CountDownLatch processingTimeoutLatch, + ConcurrentMap ackMap, + ConcurrentMap successMap, + ConcurrentMap failedMap) { this.id = id; this.processingTimeoutLatch = processingTimeoutLatch; this.ackMap = ackMap; + this.successMap = successMap; + this.failedMap = failedMap; } @Override public void onSuccess() { log.trace("[{}] ON SUCCESS", id); - if (ackMap.remove(id) != null && ackMap.isEmpty()) { + T msg = ackMap.remove(id); + if (msg != null) { + successMap.put(id, msg); + } + if (msg != null && ackMap.isEmpty()) { processingTimeoutLatch.countDown(); } } @@ -46,8 +58,10 @@ public class MsgPackCallback i @Override public void onFailure(Throwable t) { log.trace("[{}] ON FAILURE", id); - TbProtoQueueMsg message = ackMap.remove(id); - log.warn("Failed to process message: {}", message.getValue(), t); + T msg = ackMap.remove(id); + if (msg != null) { + failedMap.put(id, msg); + } if (ackMap.isEmpty()) { processingTimeoutLatch.countDown(); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingDecision.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingDecision.java new file mode 100644 index 0000000000..b116fd7443 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingDecision.java @@ -0,0 +1,16 @@ +package org.thingsboard.server.service.queue.processing; + +import lombok.Data; +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; + +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; + +@Data +public class TbRuleEngineProcessingDecision { + + private final boolean commit; + private final ConcurrentMap> reprocessMap; + +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingResult.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingResult.java new file mode 100644 index 0000000000..601a3e9a9b --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingResult.java @@ -0,0 +1,33 @@ +package org.thingsboard.server.service.queue.processing; + +import lombok.Getter; +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; + +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; + +public class TbRuleEngineProcessingResult { + + @Getter + private boolean success; + @Getter + private boolean timeout; + @Getter + private ConcurrentMap> pendingMap; + @Getter + private ConcurrentMap> successMap; + @Getter + private ConcurrentMap> failureMap; + + public TbRuleEngineProcessingResult(boolean timeout, + ConcurrentMap> pendingMap, + ConcurrentMap> successMap, + ConcurrentMap> failureMap) { + this.timeout = timeout; + this.pendingMap = pendingMap; + this.successMap = successMap; + this.failureMap = failureMap; + this.success = !timeout && pendingMap.isEmpty() && failureMap.isEmpty(); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategy.java new file mode 100644 index 0000000000..c644766892 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategy.java @@ -0,0 +1,7 @@ +package org.thingsboard.server.service.queue.processing; + +public interface TbRuleEngineProcessingStrategy { + + TbRuleEngineProcessingDecision analyze(TbRuleEngineProcessingResult result); + +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java new file mode 100644 index 0000000000..336baf5360 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java @@ -0,0 +1,118 @@ +package org.thingsboard.server.service.queue.processing; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; + +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +@Component +@Slf4j +public class TbRuleEngineProcessingStrategyFactory { + + @Value("${queue.rule_engine.strategy.type}") + private String strategyType; + @Value("${queue.rule_engine.strategy.retries:3}") + private int maxRetries; + @Value("${queue.rule_engine.strategy.failure_percentage:0}") + private double maxAllowedFailurePercentage; + @Value("${queue.rule_engine.strategy.pause_between_retries:3}") + private long pauseBetweenRetries; + + + public TbRuleEngineProcessingStrategy newInstance() { + switch (strategyType) { + case "SKIP_ALL": + return new SkipStrategy(); + case "RETRY_ALL": + return new RetryStrategy(true, true, true, maxRetries, maxAllowedFailurePercentage, pauseBetweenRetries); + case "RETRY_FAILED": + return new RetryStrategy(false, true, false, maxRetries, maxAllowedFailurePercentage, pauseBetweenRetries); + case "RETRY_TIMED_OUT": + return new RetryStrategy(false, false, true, maxRetries, maxAllowedFailurePercentage, pauseBetweenRetries); + case "RETRY_FAILED_AND_TIMED_OUT": + return new RetryStrategy(false, true, true, maxRetries, maxAllowedFailurePercentage, pauseBetweenRetries); + default: + throw new RuntimeException("TbRuleEngineProcessingStrategy with type " + strategyType + " is not supported!"); + } + } + + private static class RetryStrategy implements TbRuleEngineProcessingStrategy { + private final boolean retrySuccessful; + private final boolean retryFailed; + private final boolean retryTimeout; + private final int maxRetries; + private final double maxAllowedFailurePercentage; + private final long pauseBetweenRetries; + + private int initialTotalCount; + private int retryCount; + + public RetryStrategy(boolean retrySuccessful, boolean retryFailed, boolean retryTimeout, int maxRetries, double maxAllowedFailurePercentage, long pauseBetweenRetries) { + this.retrySuccessful = retrySuccessful; + this.retryFailed = retryFailed; + this.retryTimeout = retryTimeout; + this.maxRetries = maxRetries; + this.maxAllowedFailurePercentage = maxAllowedFailurePercentage; + this.pauseBetweenRetries = pauseBetweenRetries; + } + + @Override + public TbRuleEngineProcessingDecision analyze(TbRuleEngineProcessingResult result) { + if (result.isSuccess()) { + return new TbRuleEngineProcessingDecision(true, null); + } else { + if (retryCount == 0) { + initialTotalCount = result.getPendingMap().size() + result.getFailureMap().size() + result.getSuccessMap().size(); + } + retryCount++; + double failedCount = result.getFailureMap().size() + result.getPendingMap().size(); + if (maxRetries > 0 && retryCount > maxRetries) { + log.info("Skip reprocess of the rule engine pack due to max retries"); + return new TbRuleEngineProcessingDecision(true, null); + } else if (maxAllowedFailurePercentage > 0 && (failedCount / initialTotalCount) > maxAllowedFailurePercentage) { + log.info("Skip reprocess of the rule engine pack due to max allowed failure percentage"); + return new TbRuleEngineProcessingDecision(true, null); + } else { + ConcurrentMap> toReprocess = new ConcurrentHashMap<>(initialTotalCount); + if (retryFailed) { + result.getFailureMap().forEach(toReprocess::put); + } + if (retryTimeout) { + result.getPendingMap().forEach(toReprocess::put); + } + if (retrySuccessful) { + result.getSuccessMap().forEach(toReprocess::put); + } + log.info("Going to reprocess {} messages", toReprocess.size()); + //TODO: 2.5 Log most popular rule nodes by error count; + if (log.isTraceEnabled()) { + toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, msg.getValue())); + } + if (pauseBetweenRetries > 0) { + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(pauseBetweenRetries)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + return new TbRuleEngineProcessingDecision(false, toReprocess); + } + } + } + } + + private static class SkipStrategy implements TbRuleEngineProcessingStrategy { + + @Override + public TbRuleEngineProcessingDecision analyze(TbRuleEngineProcessingResult result) { + log.info("Skip reprocess of the rule engine pack"); + return new TbRuleEngineProcessingDecision(true, null); + } + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index 816479b1e2..84fa1a7ac2 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -32,6 +32,7 @@ import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.actors.service.ActorService; +import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; @@ -502,7 +503,12 @@ public class DefaultDeviceStateService implements DeviceStateService { TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), msgType, stateData.getDeviceId(), stateData.getMetaData().copy(), TbMsgDataType.JSON , json.writeValueAsString(state) , null, null, null); - actorService.onMsg(new SendToClusterMsg(stateData.getDeviceId(), new QueueToRuleEngineMsg(stateData.getTenantId(), tbMsg))); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, stateData.getTenantId(), stateData.getDeviceId()); + TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() + .setTenantIdMSB(stateData.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(stateData.getTenantId().getId().getLeastSignificantBits()) + .setTbMsg(TbMsg.toByteString(tbMsg)).build(); + queueProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), null); } catch (Exception e) { log.warn("[{}] Failed to push inactivity alarm: {}", stateData.getDeviceId(), state, e); } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java index be186e0ddd..adff35fa51 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java @@ -187,7 +187,7 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer } @Override - public void onTimeseriesDataUpdate(TenantId tenantId, EntityId entityId, List ts, TbMsgCallback callback) { + public void onTimeSeriesUpdate(TenantId tenantId, EntityId entityId, List ts, TbMsgCallback callback) { onLocalSubUpdate(entityId, s -> { if (TbSubscriptionType.TIMESERIES.equals(s.getType())) { diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java b/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java index d4d9fa5841..aa1824c855 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java @@ -31,7 +31,8 @@ public interface SubscriptionManagerService extends ApplicationListener ts, TbMsgCallback callback); + void onTimeSeriesUpdate(TenantId tenantId, EntityId entityId, List ts, TbMsgCallback callback); void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List attributes, TbMsgCallback callback); + } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 58ed13c1e6..ab2803345a 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -168,7 +168,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio private void onTimeSeriesUpdate(TenantId tenantId, EntityId entityId, List ts) { TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); if (currentPartitions.contains(tpi)) { - subscriptionManagerService.onTimeseriesDataUpdate(tenantId, entityId, ts, TbMsgCallback.EMPTY); + subscriptionManagerService.onTimeSeriesUpdate(tenantId, entityId, ts, TbMsgCallback.EMPTY); } else { TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toTimeseriesUpdateProto(tenantId, entityId, ts); toCoreProducer.send(tpi, new TbProtoQueueMsg<>(entityId.getId(), toCoreMsg), null); diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 8a4d735d26..c73e623d56 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -181,27 +181,27 @@ cassandra: # SQL configuration parameters sql: - # Specify batch size for persisting attribute updates - attributes: - batch_size: "${SQL_ATTRIBUTES_BATCH_SIZE:10000}" - batch_max_delay: "${SQL_ATTRIBUTES_BATCH_MAX_DELAY_MS:100}" - stats_print_interval_ms: "${SQL_ATTRIBUTES_BATCH_STATS_PRINT_MS:10000}" - ts: - batch_size: "${SQL_TS_BATCH_SIZE:10000}" - batch_max_delay: "${SQL_TS_BATCH_MAX_DELAY_MS:100}" - stats_print_interval_ms: "${SQL_TS_BATCH_STATS_PRINT_MS:10000}" - ts_latest: - batch_size: "${SQL_TS_LATEST_BATCH_SIZE:10000}" - batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:100}" - stats_print_interval_ms: "${SQL_TS_LATEST_BATCH_STATS_PRINT_MS:10000}" - # Specify whether to remove null characters from strValue of attributes and timeseries before insert - remove_null_chars: "${SQL_REMOVE_NULL_CHARS:true}" - postgres: - # Specify partitioning size for timestamp key-value storage. Example: DAYS, MONTHS, YEARS, INDEFINITE. - ts_key_value_partitioning: "${SQL_POSTGRES_TS_KV_PARTITIONING:MONTHS}" - timescale: - # Specify Interval size for new data chunks storage. - chunk_time_interval: "${SQL_TIMESCALE_CHUNK_TIME_INTERVAL:604800000}" + # Specify batch size for persisting attribute updates + attributes: + batch_size: "${SQL_ATTRIBUTES_BATCH_SIZE:10000}" + batch_max_delay: "${SQL_ATTRIBUTES_BATCH_MAX_DELAY_MS:100}" + stats_print_interval_ms: "${SQL_ATTRIBUTES_BATCH_STATS_PRINT_MS:10000}" + ts: + batch_size: "${SQL_TS_BATCH_SIZE:10000}" + batch_max_delay: "${SQL_TS_BATCH_MAX_DELAY_MS:100}" + stats_print_interval_ms: "${SQL_TS_BATCH_STATS_PRINT_MS:10000}" + ts_latest: + batch_size: "${SQL_TS_LATEST_BATCH_SIZE:10000}" + batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:100}" + stats_print_interval_ms: "${SQL_TS_LATEST_BATCH_STATS_PRINT_MS:10000}" + # Specify whether to remove null characters from strValue of attributes and timeseries before insert + remove_null_chars: "${SQL_REMOVE_NULL_CHARS:true}" + postgres: + # Specify partitioning size for timestamp key-value storage. Example: DAYS, MONTHS, YEARS, INDEFINITE. + ts_key_value_partitioning: "${SQL_POSTGRES_TS_KV_PARTITIONING:MONTHS}" + timescale: + # Specify Interval size for new data chunks storage. + chunk_time_interval: "${SQL_TIMESCALE_CHUNK_TIME_INTERVAL:604800000}" # Actor system parameters actors: @@ -330,19 +330,19 @@ updates: # spring CORS configuration spring.mvc.cors: - mappings: - # Intercept path - "[/api/**]": - #Comma-separated list of origins to allow. '*' allows all origins. When not set,CORS support is disabled. - allowed-origins: "*" - #Comma-separated list of methods to allow. '*' allows all methods. - allowed-methods: "*" - #Comma-separated list of headers to allow in a request. '*' allows all headers. - allowed-headers: "*" - #How long, in seconds, the response from a pre-flight request can be cached by clients. - max-age: "1800" - #Set whether credentials are supported. When not set, credentials are not supported. - allow-credentials: "true" + mappings: + # Intercept path + "[/api/**]": + #Comma-separated list of origins to allow. '*' allows all origins. When not set,CORS support is disabled. + allowed-origins: "*" + #Comma-separated list of methods to allow. '*' allows all methods. + allowed-methods: "*" + #Comma-separated list of headers to allow in a request. '*' allows all headers. + allowed-headers: "*" + #How long, in seconds, the response from a pre-flight request can be cached by clients. + max-age: "1800" + #Set whether credentials are supported. When not set, credentials are not supported. + allow-credentials: "true" # spring serve gzip compressed static resources spring.resources.chain: @@ -551,6 +551,12 @@ queue: poll_interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" partitions: "${TB_QUEUE_RULE_ENGINE_PARTITIONS:10}" pack_processing_timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:60000}" + strategy: + type: "${TB_QUEUE_RULE_ENGINE_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT + # For RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT + retries: "${TB_QUEUE_RULE_ENGINE_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited + failure_percentage: "${TB_QUEUE_RULE_ENGINE_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; + pause_between_retries: "${TB_QUEUE_RULE_ENGINE_STRATEGY_RETRY_PAUSE:3}"# Time in seconds to wait in consumer thread before retries; stats: enabled: "${TB_QUEUE_RULE_ENGINE_STATS_ENABLED:false}" print_interval_ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:10000}" diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index 9b43169737..062eb02749 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,6 +15,7 @@ */ package org.thingsboard.server.common.msg; +import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import lombok.AllArgsConstructor; import lombok.Builder; @@ -73,7 +74,10 @@ public final class TbMsg implements Serializable { log.warn("[{}] Created message with empty callback: {}", originator, type); this.callback = TbMsgCallback.EMPTY; } + } + public static ByteString toByteString(TbMsg msg) { + return ByteString.copyFrom(toByteArray(msg)); } public static byte[] toByteArray(TbMsg msg) { diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 1a80aa4a6e..42d3a534a9 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -244,7 +244,7 @@ public class DefaultTransportService implements TransportService { metaData.putValue("ts", tsKv.getTs() + ""); JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList()); TbMsg tbMsg = new TbMsg(UUID.randomUUID(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), - deviceId, metaData, TbMsgDataType.JSON, gson.toJson(json), null, null, null); + deviceId, metaData, TbMsgDataType.JSON, gson.toJson(json), null, null, TbMsgCallback.EMPTY); sendToRuleEngine(tenantId, tbMsg, packCallback); } } @@ -261,7 +261,7 @@ public class DefaultTransportService implements TransportService { metaData.putValue("deviceName", sessionInfo.getDeviceName()); metaData.putValue("deviceType", sessionInfo.getDeviceType()); TbMsg tbMsg = new TbMsg(UUID.randomUUID(), SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), deviceId, metaData, - TbMsgDataType.JSON, gson.toJson(json), null, null, null); + TbMsgDataType.JSON, gson.toJson(json), null, null, TbMsgCallback.EMPTY); sendToRuleEngine(tenantId, tbMsg, new TransportTbQueueCallback(callback)); } } @@ -488,7 +488,7 @@ public class DefaultTransportService implements TransportService { protected void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) { TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, tbMsg.getOriginator()); - ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder().setTbMsg(ByteString.copyFrom(TbMsg.toByteArray(tbMsg))) + ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder().setTbMsg(TbMsg.toByteString(tbMsg)) .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build(); ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback);