diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/SemaphoreWithTbMsgQueue.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/SemaphoreWithTbMsgQueue.java index 39b502a59d..fa00856b4b 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/SemaphoreWithTbMsgQueue.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/SemaphoreWithTbMsgQueue.java @@ -17,7 +17,6 @@ package org.thingsboard.rule.engine.util; import com.google.common.util.concurrent.ListenableFuture; import lombok.Data; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.rule.engine.api.TbContext; @@ -40,7 +39,6 @@ import java.util.function.BiFunction; */ @Data @Slf4j -@RequiredArgsConstructor public class SemaphoreWithTbMsgQueue { private final EntityId entityId; @@ -85,15 +83,15 @@ public class SemaphoreWithTbMsgQueue { semaphore.release(); continue; } - final TbMsg msg = tbMsgTbContext.getMsg(); + final TbMsg msg = tbMsgTbContext.msg(); if (!msg.getCallback().isMsgValid()) { log.trace("[{}] Skipping non-valid message [{}]", entityId, msg); semaphore.release(); continue; } //DO PROCESSING - final TbContext ctx = tbMsgTbContext.getCtx(); - final ListenableFuture resultMsgFuture = tbMsgTbContext.getBiFunction().apply(ctx, msg); + final TbContext ctx = tbMsgTbContext.ctx(); + final ListenableFuture resultMsgFuture = tbMsgTbContext.biFunction().apply(ctx, msg); DonAsynchron.withCallback(resultMsgFuture, resultMsg -> { try { ctx.tellSuccess(resultMsg); @@ -115,8 +113,8 @@ public class SemaphoreWithTbMsgQueue { log.error("[{}] Failed to process TbMsgTbContext queue", entityId, t); throw t; } - TbMsg msg = tbMsgTbContext.getMsg(); - TbContext ctx = tbMsgTbContext.getCtx(); + TbMsg msg = tbMsgTbContext.msg(); + TbContext ctx = tbMsgTbContext.ctx(); log.warn("[{}] Failed to process message: {}", entityId, msg, t); ctx.tellFailure(msg, t); // you are not allowed to throw here, because queue will remain unprocessed continue; // We are probably the last who process the queue. We have to continue poll until get successful callback or queue is empty @@ -126,15 +124,11 @@ public class SemaphoreWithTbMsgQueue { } /** - * A utility class to hold the tuple of a {@link TbMsg}, {@link TbContext}, and the message processing function. + * A utility record to hold the tuple of a {@link TbMsg}, {@link TbContext}, and the message processing function. * This facilitates passing these three elements as a single object within the queue. */ - @Data - @RequiredArgsConstructor - private static class TbMsgTbContextBiFunction { - private final TbMsg msg; - private final TbContext ctx; - private final BiFunction> biFunction; + private record TbMsgTbContextBiFunction(TbMsg msg, TbContext ctx, + BiFunction> biFunction) { } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNodeTest.java index 93e3957c03..0606030f30 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNodeTest.java @@ -82,7 +82,7 @@ import static org.mockito.Mockito.when; public class CalculateDeltaNodeTest { private final DeviceId DUMMY_DEVICE_ORIGINATOR = new DeviceId(UUID.fromString("2ba3ded4-882b-40cf-999a-89da9ccd58f9")); - private final TenantId TENANT_ID = new TenantId(UUID.fromString("3842e740-0d89-43a9-8d52-ae44023847ba")); + private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("3842e740-0d89-43a9-8d52-ae44023847ba")); private final ListeningExecutor DB_EXECUTOR = new TestDbCallbackExecutor(); private static final int RULE_DISPATCHER_POOL_SIZE = 2;