diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java index e8816ff607..beae54dce6 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java @@ -43,7 +43,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @RuleNode( - type = ComponentType.ACTION, + type = ComponentType.TRANSFORMATION, name = "deduplication", configClazz = TbMsgDeduplicationNodeConfiguration.class, nodeDescription = "Deduplicate messages for a configurable period based on a specified deduplication strategy.", @@ -153,7 +153,15 @@ public class TbMsgDeduplicationNode implements TbNode { } } } - deduplicationResults.add(resultMsg); + if (resultMsg != null) { + deduplicationResults.add(TbMsg.newMsg( + resultMsg.getQueueName(), + resultMsg.getType(), + resultMsg.getOriginator(), + resultMsg.getCustomerId(), + resultMsg.getMetaData(), + resultMsg.getData())); + } } packBoundsOpt = findValidPack(msgList, deduplicationTimeoutMs); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java index 857a068e8c..1f18aea86e 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java @@ -66,7 +66,17 @@ public class TbMsgDelayNode implements TbNode { if (msg.getType().equals(TB_MSG_DELAY_NODE_MSG)) { TbMsg pendingMsg = pendingMsgs.remove(UUID.fromString(msg.getData())); if (pendingMsg != null) { - ctx.enqueueForTellNext(pendingMsg, SUCCESS); + ctx.enqueueForTellNext( + TbMsg.newMsg( + pendingMsg.getQueueName(), + pendingMsg.getType(), + pendingMsg.getOriginator(), + pendingMsg.getCustomerId(), + pendingMsg.getMetaData(), + pendingMsg.getData() + ), + SUCCESS + ); } } else { if (pendingMsgs.size() < config.getMaxPendingMsgs()) { diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbMsgDeduplicationNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbMsgDeduplicationNodeTest.java deleted file mode 100644 index 1091a7d29c..0000000000 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbMsgDeduplicationNodeTest.java +++ /dev/null @@ -1,402 +0,0 @@ -/** - * Copyright © 2016-2023 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.rule.engine.action; - -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.ArgumentMatchers; -import org.mockito.stubbing.Answer; -import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.common.util.ThingsBoardThreadFactory; -import org.thingsboard.rule.engine.api.TbContext; -import org.thingsboard.rule.engine.api.TbNodeConfiguration; -import org.thingsboard.rule.engine.api.TbNodeException; -import org.thingsboard.rule.engine.api.TbRelationTypes; -import org.thingsboard.rule.engine.deduplication.DeduplicationStrategy; -import org.thingsboard.rule.engine.deduplication.TbMsgDeduplicationNode; -import org.thingsboard.rule.engine.deduplication.TbMsgDeduplicationNodeConfiguration; -import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.RuleNodeId; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.common.msg.TbMsgMetaData; -import org.thingsboard.server.common.msg.session.SessionMsgType; - -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.isNull; -import static org.mockito.ArgumentMatchers.nullable; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -@Slf4j -public class TbMsgDeduplicationNodeTest { - - private static final String MAIN_QUEUE_NAME = "Main"; - private static final String HIGH_PRIORITY_QUEUE_NAME = "HighPriority"; - private static final String TB_MSG_DEDUPLICATION_TIMEOUT_MSG = "TbMsgDeduplicationNodeMsg"; - - private TbContext ctx; - - private final ThingsBoardThreadFactory factory = ThingsBoardThreadFactory.forName("de-duplication-node-test"); - private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(factory); - private final int deduplicationInterval = 1; - - private TenantId tenantId; - - private TbMsgDeduplicationNode node; - private TbMsgDeduplicationNodeConfiguration config; - private TbNodeConfiguration nodeConfiguration; - - private CountDownLatch awaitTellSelfLatch; - - @BeforeEach - public void init() throws TbNodeException { - ctx = mock(TbContext.class); - - tenantId = TenantId.fromUUID(UUID.randomUUID()); - RuleNodeId ruleNodeId = new RuleNodeId(UUID.randomUUID()); - - when(ctx.getSelfId()).thenReturn(ruleNodeId); - when(ctx.getTenantId()).thenReturn(tenantId); - - doAnswer((Answer) invocationOnMock -> { - String type = (String) (invocationOnMock.getArguments())[1]; - EntityId originator = (EntityId) (invocationOnMock.getArguments())[2]; - TbMsgMetaData metaData = (TbMsgMetaData) (invocationOnMock.getArguments())[3]; - String data = (String) (invocationOnMock.getArguments())[4]; - return TbMsg.newMsg(type, originator, metaData.copy(), data); - }).when(ctx).newMsg(isNull(), eq(TB_MSG_DEDUPLICATION_TIMEOUT_MSG), nullable(EntityId.class), any(TbMsgMetaData.class), any(String.class)); - node = spy(new TbMsgDeduplicationNode()); - config = new TbMsgDeduplicationNodeConfiguration().defaultConfiguration(); - } - - private void invokeTellSelf(int maxNumberOfInvocation) { - invokeTellSelf(maxNumberOfInvocation, false, 0); - } - - private void invokeTellSelf(int maxNumberOfInvocation, boolean delayScheduleTimeout, int delayMultiplier) { - AtomicLong scheduleTimeout = new AtomicLong(deduplicationInterval); - AtomicInteger scheduleCount = new AtomicInteger(0); - doAnswer((Answer) invocationOnMock -> { - scheduleCount.getAndIncrement(); - if (scheduleCount.get() <= maxNumberOfInvocation) { - TbMsg msg = (TbMsg) (invocationOnMock.getArguments())[0]; - executorService.schedule(() -> { - try { - node.onMsg(ctx, msg); - awaitTellSelfLatch.countDown(); - } catch (ExecutionException | InterruptedException | TbNodeException e) { - log.error("Failed to execute tellSelf method call due to: ", e); - } - }, scheduleTimeout.get(), TimeUnit.SECONDS); - if (delayScheduleTimeout) { - scheduleTimeout.set(scheduleTimeout.get() * delayMultiplier); - } - } - - return null; - }).when(ctx).tellSelf(ArgumentMatchers.any(TbMsg.class), ArgumentMatchers.anyLong()); - } - - @AfterEach - public void destroy() { - executorService.shutdown(); - node.destroy(); - } - - @Test - public void given_100_messages_strategy_first_then_verifyOutput() throws TbNodeException, ExecutionException, InterruptedException { - int wantedNumberOfTellSelfInvocation = 2; - int msgCount = 100; - awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); - invokeTellSelf(wantedNumberOfTellSelfInvocation); - - config.setInterval(deduplicationInterval); - config.setMaxPendingMsgs(msgCount); - nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); - node.init(ctx, nodeConfiguration); - - DeviceId deviceId = new DeviceId(UUID.randomUUID()); - long currentTimeMillis = System.currentTimeMillis(); - - List inputMsgs = getTbMsgs(deviceId, msgCount, currentTimeMillis, 500); - for (TbMsg msg : inputMsgs) { - node.onMsg(ctx, msg); - } - - TbMsg msgToReject = createMsg(deviceId, inputMsgs.get(inputMsgs.size() - 1).getMetaDataTs() + 2); - node.onMsg(ctx, msgToReject); - - awaitTellSelfLatch.await(); - - ArgumentCaptor newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); - ArgumentCaptor successCaptor = ArgumentCaptor.forClass(Runnable.class); - ArgumentCaptor> failureCaptor = ArgumentCaptor.forClass(Consumer.class); - - verify(ctx, times(msgCount)).ack(any()); - verify(ctx, times(1)).tellFailure(eq(msgToReject), any()); - verify(node, times(msgCount + wantedNumberOfTellSelfInvocation + 1)).onMsg(eq(ctx), any()); - verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture()); - Assertions.assertEquals(inputMsgs.get(0), newMsgCaptor.getValue()); - } - - @Test - public void given_100_messages_strategy_last_then_verifyOutput() throws TbNodeException, ExecutionException, InterruptedException { - int wantedNumberOfTellSelfInvocation = 2; - int msgCount = 100; - awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); - invokeTellSelf(wantedNumberOfTellSelfInvocation); - - config.setStrategy(DeduplicationStrategy.LAST); - config.setInterval(deduplicationInterval); - config.setMaxPendingMsgs(msgCount); - nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); - node.init(ctx, nodeConfiguration); - - DeviceId deviceId = new DeviceId(UUID.randomUUID()); - long currentTimeMillis = System.currentTimeMillis(); - - List inputMsgs = getTbMsgs(deviceId, msgCount, currentTimeMillis, 500); - TbMsg msgWithLatestTs = getMsgWithLatestTs(inputMsgs); - - for (TbMsg msg : inputMsgs) { - node.onMsg(ctx, msg); - } - - TbMsg msgToReject = createMsg(deviceId, inputMsgs.get(inputMsgs.size() - 1).getMetaDataTs() + 2); - node.onMsg(ctx, msgToReject); - - awaitTellSelfLatch.await(); - - ArgumentCaptor newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); - ArgumentCaptor successCaptor = ArgumentCaptor.forClass(Runnable.class); - ArgumentCaptor> failureCaptor = ArgumentCaptor.forClass(Consumer.class); - - verify(ctx, times(msgCount)).ack(any()); - verify(ctx, times(1)).tellFailure(eq(msgToReject), any()); - verify(node, times(msgCount + wantedNumberOfTellSelfInvocation + 1)).onMsg(eq(ctx), any()); - verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture()); - Assertions.assertEquals(msgWithLatestTs, newMsgCaptor.getValue()); - } - - @Test - public void given_100_messages_strategy_all_then_verifyOutput() throws TbNodeException, ExecutionException, InterruptedException { - int wantedNumberOfTellSelfInvocation = 2; - int msgCount = 100; - awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); - invokeTellSelf(wantedNumberOfTellSelfInvocation); - - config.setInterval(deduplicationInterval); - config.setStrategy(DeduplicationStrategy.ALL); - config.setOutMsgType(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()); - config.setQueueName(HIGH_PRIORITY_QUEUE_NAME); - nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); - node.init(ctx, nodeConfiguration); - - DeviceId deviceId = new DeviceId(UUID.randomUUID()); - long currentTimeMillis = System.currentTimeMillis(); - - List inputMsgs = getTbMsgs(deviceId, msgCount, currentTimeMillis, 500); - for (TbMsg msg : inputMsgs) { - node.onMsg(ctx, msg); - } - - awaitTellSelfLatch.await(); - - ArgumentCaptor newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); - ArgumentCaptor successCaptor = ArgumentCaptor.forClass(Runnable.class); - ArgumentCaptor> failureCaptor = ArgumentCaptor.forClass(Consumer.class); - - verify(ctx, times(msgCount)).ack(any()); - verify(node, times(msgCount + wantedNumberOfTellSelfInvocation)).onMsg(eq(ctx), any()); - verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture()); - - Assertions.assertEquals(1, newMsgCaptor.getAllValues().size()); - TbMsg outMessage = newMsgCaptor.getAllValues().get(0); - Assertions.assertEquals(getMergedData(inputMsgs), outMessage.getData()); - Assertions.assertEquals(deviceId, outMessage.getOriginator()); - Assertions.assertEquals(config.getOutMsgType(), outMessage.getType()); - Assertions.assertEquals(config.getQueueName(), outMessage.getQueueName()); - } - - @Test - public void given_100_messages_strategy_all_then_verifyOutput_2_packs() throws TbNodeException, ExecutionException, InterruptedException { - int wantedNumberOfTellSelfInvocation = 2; - int msgCount = 100; - awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); - invokeTellSelf(wantedNumberOfTellSelfInvocation, true, 3); - - config.setInterval(deduplicationInterval); - config.setStrategy(DeduplicationStrategy.ALL); - config.setOutMsgType(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()); - config.setQueueName(HIGH_PRIORITY_QUEUE_NAME); - nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); - node.init(ctx, nodeConfiguration); - - DeviceId deviceId = new DeviceId(UUID.randomUUID()); - long currentTimeMillis = System.currentTimeMillis(); - - List firstMsgPack = getTbMsgs(deviceId, msgCount / 2, currentTimeMillis, 500); - for (TbMsg msg : firstMsgPack) { - node.onMsg(ctx, msg); - } - long firstPackDeduplicationPackEndTs = firstMsgPack.get(0).getMetaDataTs() + TimeUnit.SECONDS.toMillis(deduplicationInterval); - - List secondMsgPack = getTbMsgs(deviceId, msgCount / 2, firstPackDeduplicationPackEndTs, 500); - for (TbMsg msg : secondMsgPack) { - node.onMsg(ctx, msg); - } - - awaitTellSelfLatch.await(); - - ArgumentCaptor newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); - ArgumentCaptor successCaptor = ArgumentCaptor.forClass(Runnable.class); - ArgumentCaptor> failureCaptor = ArgumentCaptor.forClass(Consumer.class); - - verify(ctx, times(msgCount)).ack(any()); - verify(node, times(msgCount + wantedNumberOfTellSelfInvocation)).onMsg(eq(ctx), any()); - verify(ctx, times(2)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture()); - - List resultMsgs = newMsgCaptor.getAllValues(); - Assertions.assertEquals(2, resultMsgs.size()); - - TbMsg firstMsg = resultMsgs.get(0); - Assertions.assertEquals(getMergedData(firstMsgPack), firstMsg.getData()); - Assertions.assertEquals(deviceId, firstMsg.getOriginator()); - Assertions.assertEquals(config.getOutMsgType(), firstMsg.getType()); - Assertions.assertEquals(config.getQueueName(), firstMsg.getQueueName()); - - TbMsg secondMsg = resultMsgs.get(1); - Assertions.assertEquals(getMergedData(secondMsgPack), secondMsg.getData()); - Assertions.assertEquals(deviceId, secondMsg.getOriginator()); - Assertions.assertEquals(config.getOutMsgType(), secondMsg.getType()); - Assertions.assertEquals(config.getQueueName(), secondMsg.getQueueName()); - } - - @Test - public void given_100_messages_strategy_last_then_verifyOutput_2_packs() throws TbNodeException, ExecutionException, InterruptedException { - int wantedNumberOfTellSelfInvocation = 2; - int msgCount = 100; - awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); - invokeTellSelf(wantedNumberOfTellSelfInvocation, true, 3); - - config.setInterval(deduplicationInterval); - config.setStrategy(DeduplicationStrategy.LAST); - nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); - node.init(ctx, nodeConfiguration); - - DeviceId deviceId = new DeviceId(UUID.randomUUID()); - long currentTimeMillis = System.currentTimeMillis(); - - List firstMsgPack = getTbMsgs(deviceId, msgCount / 2, currentTimeMillis, 500); - for (TbMsg msg : firstMsgPack) { - node.onMsg(ctx, msg); - } - long firstPackDeduplicationPackEndTs = firstMsgPack.get(0).getMetaDataTs() + TimeUnit.SECONDS.toMillis(deduplicationInterval); - TbMsg msgWithLatestTsInFirstPack = getMsgWithLatestTs(firstMsgPack); - - List secondMsgPack = getTbMsgs(deviceId, msgCount / 2, firstPackDeduplicationPackEndTs, 500); - for (TbMsg msg : secondMsgPack) { - node.onMsg(ctx, msg); - } - TbMsg msgWithLatestTsInSecondPack = getMsgWithLatestTs(secondMsgPack); - - awaitTellSelfLatch.await(); - - ArgumentCaptor newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); - ArgumentCaptor successCaptor = ArgumentCaptor.forClass(Runnable.class); - ArgumentCaptor> failureCaptor = ArgumentCaptor.forClass(Consumer.class); - - verify(ctx, times(msgCount)).ack(any()); - verify(node, times(msgCount + wantedNumberOfTellSelfInvocation)).onMsg(eq(ctx), any()); - verify(ctx, times(2)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture()); - - List resultMsgs = newMsgCaptor.getAllValues(); - Assertions.assertEquals(2, resultMsgs.size()); - Assertions.assertTrue(resultMsgs.contains(msgWithLatestTsInFirstPack)); - Assertions.assertTrue(resultMsgs.contains(msgWithLatestTsInSecondPack)); - } - - private TbMsg getMsgWithLatestTs(List firstMsgPack) { - int indexOfLastMsgInArray = firstMsgPack.size() - 1; - int indexToSetMaxTs = new Random().nextInt(indexOfLastMsgInArray) + 1; - TbMsg currentMaxTsMsg = firstMsgPack.get(indexOfLastMsgInArray); - TbMsg newLastMsgOfArray = firstMsgPack.get(indexToSetMaxTs); - firstMsgPack.set(indexOfLastMsgInArray, newLastMsgOfArray); - firstMsgPack.set(indexToSetMaxTs, currentMaxTsMsg); - return currentMaxTsMsg; - } - - private List getTbMsgs(DeviceId deviceId, int msgCount, long currentTimeMillis, int initTsStep) { - List inputMsgs = new ArrayList<>(); - var ts = currentTimeMillis + initTsStep; - for (int i = 0; i < msgCount; i++) { - inputMsgs.add(createMsg(deviceId, ts)); - ts += 2; - } - return inputMsgs; - } - - private TbMsg createMsg(DeviceId deviceId, long ts) { - ObjectNode dataNode = JacksonUtil.newObjectNode(); - dataNode.put("deviceId", deviceId.getId().toString()); - TbMsgMetaData metaData = new TbMsgMetaData(); - metaData.putValue("ts", String.valueOf(ts)); - return TbMsg.newMsg( - MAIN_QUEUE_NAME, - SessionMsgType.POST_TELEMETRY_REQUEST.name(), - deviceId, - metaData, - JacksonUtil.toString(dataNode)); - } - - private String getMergedData(List msgs) { - ArrayNode mergedData = JacksonUtil.OBJECT_MAPPER.createArrayNode(); - msgs.forEach(msg -> { - ObjectNode msgNode = JacksonUtil.newObjectNode(); - msgNode.set("msg", JacksonUtil.toJsonNode(msg.getData())); - msgNode.set("metadata", JacksonUtil.valueToTree(msg.getMetaData().getData())); - mergedData.add(msgNode); - }); - return JacksonUtil.toString(mergedData); - } - -} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java index 475a98e2a2..1badecfa92 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java @@ -174,7 +174,16 @@ public class TbMsgDeduplicationNodeTest { verify(ctx, times(1)).tellFailure(eq(msgToReject), any()); verify(node, times(msgCount + wantedNumberOfTellSelfInvocation + 1)).onMsg(eq(ctx), any()); verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture()); - Assertions.assertEquals(inputMsgs.get(0), newMsgCaptor.getValue()); + + TbMsg firstMsg = inputMsgs.get(0); + TbMsg actualMsg = newMsgCaptor.getValue(); + // msg ids should be different because we create new msg before enqueueForTellNext + Assertions.assertNotEquals(firstMsg.getId(), actualMsg.getId()); + Assertions.assertEquals(firstMsg.getOriginator(), actualMsg.getOriginator()); + Assertions.assertEquals(firstMsg.getCustomerId(), actualMsg.getCustomerId()); + Assertions.assertEquals(firstMsg.getData(), actualMsg.getData()); + Assertions.assertEquals(firstMsg.getMetaData(), actualMsg.getMetaData()); + Assertions.assertEquals(firstMsg.getType(), actualMsg.getType()); } @Test @@ -213,7 +222,15 @@ public class TbMsgDeduplicationNodeTest { verify(ctx, times(1)).tellFailure(eq(msgToReject), any()); verify(node, times(msgCount + wantedNumberOfTellSelfInvocation + 1)).onMsg(eq(ctx), any()); verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture()); - Assertions.assertEquals(msgWithLatestTs, newMsgCaptor.getValue()); + + TbMsg actualMsg = newMsgCaptor.getValue(); + // msg ids should be different because we create new msg before enqueueForTellNext + Assertions.assertNotEquals(msgWithLatestTs.getId(), actualMsg.getId()); + Assertions.assertEquals(msgWithLatestTs.getOriginator(), actualMsg.getOriginator()); + Assertions.assertEquals(msgWithLatestTs.getCustomerId(), actualMsg.getCustomerId()); + Assertions.assertEquals(msgWithLatestTs.getData(), actualMsg.getData()); + Assertions.assertEquals(msgWithLatestTs.getMetaData(), actualMsg.getMetaData()); + Assertions.assertEquals(msgWithLatestTs.getType(), actualMsg.getType()); } @Test @@ -350,8 +367,24 @@ public class TbMsgDeduplicationNodeTest { List resultMsgs = newMsgCaptor.getAllValues(); Assertions.assertEquals(2, resultMsgs.size()); - Assertions.assertTrue(resultMsgs.contains(msgWithLatestTsInFirstPack)); - Assertions.assertTrue(resultMsgs.contains(msgWithLatestTsInSecondPack)); + + // verify that newMsg is called but content of messages is the same as in the last msg for the first pack. + TbMsg actualMsg = resultMsgs.get(0); + Assertions.assertNotEquals(msgWithLatestTsInFirstPack.getId(), actualMsg.getId()); + Assertions.assertEquals(msgWithLatestTsInFirstPack.getOriginator(), actualMsg.getOriginator()); + Assertions.assertEquals(msgWithLatestTsInFirstPack.getCustomerId(), actualMsg.getCustomerId()); + Assertions.assertEquals(msgWithLatestTsInFirstPack.getData(), actualMsg.getData()); + Assertions.assertEquals(msgWithLatestTsInFirstPack.getMetaData(), actualMsg.getMetaData()); + Assertions.assertEquals(msgWithLatestTsInFirstPack.getType(), actualMsg.getType()); + + // verify that newMsg is called but content of messages is the same as in the last msg for the second pack. + actualMsg = resultMsgs.get(1); + Assertions.assertNotEquals(msgWithLatestTsInSecondPack.getId(), actualMsg.getId()); + Assertions.assertEquals(msgWithLatestTsInSecondPack.getOriginator(), actualMsg.getOriginator()); + Assertions.assertEquals(msgWithLatestTsInSecondPack.getCustomerId(), actualMsg.getCustomerId()); + Assertions.assertEquals(msgWithLatestTsInSecondPack.getData(), actualMsg.getData()); + Assertions.assertEquals(msgWithLatestTsInSecondPack.getMetaData(), actualMsg.getMetaData()); + Assertions.assertEquals(msgWithLatestTsInSecondPack.getType(), actualMsg.getType()); } private TbMsg getMsgWithLatestTs(List firstMsgPack) {