From bf9c4af2aa738655dcbfc5dabde934184b38cbc2 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Thu, 1 Feb 2024 20:23:12 +0100 Subject: [PATCH 1/2] fixed infinit tell failure --- .../AbstractTbRuleEngineSubmitStrategy.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractTbRuleEngineSubmitStrategy.java index 3801e43b7f..6f9a4c49c6 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractTbRuleEngineSubmitStrategy.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractTbRuleEngineSubmitStrategy.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.queue.processing; +import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -51,7 +52,18 @@ public abstract class AbstractTbRuleEngineSubmitStrategy implements TbRuleEngine List> newOrderedMsgList = new ArrayList<>(reprocessMap.size()); for (IdMsgPair pair : orderedMsgList) { if (reprocessMap.containsKey(pair.uuid)) { - newOrderedMsgList.add(pair); + var oldValue = pair.getMsg().getValue(); + if (StringUtils.isNotEmpty(oldValue.getFailureMessage())) { + var newValue = TransportProtos.ToRuleEngineMsg.newBuilder() + .setTenantIdMSB(oldValue.getTenantIdMSB()) + .setTenantIdLSB(oldValue.getTenantIdLSB()) + .setTbMsg(oldValue.getTbMsg()) + .build(); + var newMsg = new TbProtoQueueMsg<>(pair.getMsg().getKey(), newValue, pair.getMsg().getHeaders()); + newOrderedMsgList.add(new IdMsgPair<>(pair.getUuid(), newMsg)); + } else { + newOrderedMsgList.add(pair); + } } } orderedMsgList = newOrderedMsgList; From 100cdd5cf18f12859940db9284c38f17829a0473 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Fri, 2 Feb 2024 14:00:30 +0100 Subject: [PATCH 2/2] refactored due to comments --- .../AbstractTbRuleEngineSubmitStrategy.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractTbRuleEngineSubmitStrategy.java index 6f9a4c49c6..bfec9f747f 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractTbRuleEngineSubmitStrategy.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractTbRuleEngineSubmitStrategy.java @@ -52,14 +52,12 @@ public abstract class AbstractTbRuleEngineSubmitStrategy implements TbRuleEngine List> newOrderedMsgList = new ArrayList<>(reprocessMap.size()); for (IdMsgPair pair : orderedMsgList) { if (reprocessMap.containsKey(pair.uuid)) { - var oldValue = pair.getMsg().getValue(); - if (StringUtils.isNotEmpty(oldValue.getFailureMessage())) { - var newValue = TransportProtos.ToRuleEngineMsg.newBuilder() - .setTenantIdMSB(oldValue.getTenantIdMSB()) - .setTenantIdLSB(oldValue.getTenantIdLSB()) - .setTbMsg(oldValue.getTbMsg()) + if (StringUtils.isNotEmpty(pair.getMsg().getValue().getFailureMessage())) { + var toRuleEngineMsg = TransportProtos.ToRuleEngineMsg.newBuilder(pair.getMsg().getValue()) + .clearFailureMessage() + .clearRelationTypes() .build(); - var newMsg = new TbProtoQueueMsg<>(pair.getMsg().getKey(), newValue, pair.getMsg().getHeaders()); + var newMsg = new TbProtoQueueMsg<>(pair.getMsg().getKey(), toRuleEngineMsg, pair.getMsg().getHeaders()); newOrderedMsgList.add(new IdMsgPair<>(pair.getUuid(), newMsg)); } else { newOrderedMsgList.add(pair);