diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index b2338bd070..db83eec2e8 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -105,6 +105,7 @@ import org.thingsboard.server.service.script.RuleNodeTbelScriptEngine; import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -794,6 +795,12 @@ class DefaultTbContext implements TbContext { return metaData; } + + @Override + public void schedule(Runnable runnable, long delay, TimeUnit timeUnit) { + mainCtx.getScheduler().schedule(runnable, delay, timeUnit); + } + @Override public void checkTenantEntity(EntityId entityId) { if (!this.getTenantId().equals(TenantIdLoader.findTenantId(this, entityId))) { diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 332887d081..7133f91329 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -70,6 +70,7 @@ import org.thingsboard.server.dao.widget.WidgetsBundleService; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -202,6 +203,8 @@ public interface TbContext { * */ + void schedule(Runnable runnable, long delay, TimeUnit timeUnit); + void checkTenantEntity(EntityId entityId); boolean isLocalEntity(EntityId entityId); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/DeduplicationId.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/DeduplicationId.java new file mode 100644 index 0000000000..0c3e7cbea2 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/DeduplicationId.java @@ -0,0 +1,22 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.deduplication; + +public enum DeduplicationId { + + ORIGINATOR, TENANT, CUSTOMER + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/DeduplicationStrategy.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/DeduplicationStrategy.java new file mode 100644 index 0000000000..d761a97b6d --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/DeduplicationStrategy.java @@ -0,0 +1,22 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.deduplication; + +public enum DeduplicationStrategy { + + FIRST, LAST, ALL + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java new file mode 100644 index 0000000000..de04ce1d3b --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java @@ -0,0 +1,237 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.deduplication; + +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.util.Pair; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.api.RuleNode; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.TbRelationTypes; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.data.util.TbPair; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +@RuleNode( + type = ComponentType.ACTION, + name = "deduplication", + configClazz = TbMsgDeduplicationNodeConfiguration.class, + nodeDescription = "Deduplicate messages for a configurable period based on a specified deduplication strategy.", + nodeDetails = "Rule node allows you to select one of the following strategy to deduplicate messages:

" + + "FIRST - return first message that arrived during deduplication period.

" + + "LAST - return last message that arrived during deduplication period.

" + + "ALL - return all messages as a single JSON array message. Where each element represents object with msg and metadata inner properties.

", + icon = "content_copy", + uiResources = {"static/rulenode/rulenode-core-config.js"}, + configDirective = "tbActionNodeMsgDeduplicationConfig" +) +@Slf4j +public class TbMsgDeduplicationNode implements TbNode { + + private static final String TB_MSG_DEDUPLICATION_TIMEOUT_MSG = "TbMsgDeduplicationNodeMsg"; + private static final int TB_MSG_DEDUPLICATION_TIMEOUT = 5000; + public static final int TB_MSG_DEDUPLICATION_RETRY_DELAY = 10; + + private TbMsgDeduplicationNodeConfiguration config; + + private final Map> deduplicationMap; + private long deduplicationInterval; + private long lastScheduledTs; + private DeduplicationId deduplicationId; + + public TbMsgDeduplicationNode() { + this.deduplicationMap = new HashMap<>(); + } + + @Override + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + this.config = TbNodeUtils.convert(configuration, TbMsgDeduplicationNodeConfiguration.class); + this.deduplicationInterval = TimeUnit.SECONDS.toMillis(config.getInterval()); + this.deduplicationId = config.getId(); + scheduleTickMsg(ctx); + } + + @Override + public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { + if (TB_MSG_DEDUPLICATION_TIMEOUT_MSG.equals(msg.getType())) { + try { + processDeduplication(ctx); + } finally { + scheduleTickMsg(ctx); + } + } else { + processOnRegularMsg(ctx, msg); + } + } + + @Override + public void destroy() { + deduplicationMap.clear(); + } + + private void processOnRegularMsg(TbContext ctx, TbMsg msg) { + EntityId id = getDeduplicationId(ctx, msg); + List deduplicationMsgs = deduplicationMap.computeIfAbsent(id, k -> new LinkedList<>()); + if (deduplicationMsgs.size() < config.getMaxPendingMsgs()) { + log.trace("[{}][{}] Adding msg: [{}][{}] to the pending msgs map ...", ctx.getSelfId(), id, msg.getId(), msg.getMetaDataTs()); + deduplicationMsgs.add(msg); + ctx.ack(msg); + } else { + log.trace("[{}] Max limit of pending messages reached for deduplication id: [{}]", ctx.getSelfId(), id); + ctx.tellFailure(msg, new RuntimeException("[" + ctx.getSelfId() + "] Max limit of pending messages reached for deduplication id: [" + id + "]")); + } + } + + private EntityId getDeduplicationId(TbContext ctx, TbMsg msg) { + switch (deduplicationId) { + case ORIGINATOR: + return msg.getOriginator(); + case TENANT: + return ctx.getTenantId(); + case CUSTOMER: + return msg.getCustomerId(); + default: + throw new IllegalStateException("Unsupported deduplication id: " + deduplicationId); + } + } + + private void processDeduplication(TbContext ctx) { + if (deduplicationMap.isEmpty()) { + return; + } + List deduplicationResults = new ArrayList<>(); + long deduplicationTimeoutMs = System.currentTimeMillis(); + deduplicationMap.forEach((entityId, tbMsgs) -> { + if (tbMsgs.isEmpty()) { + return; + } + Optional> packBoundsOpt = findValidPack(tbMsgs, deduplicationTimeoutMs); + while (packBoundsOpt.isPresent()) { + TbPair packBounds = packBoundsOpt.get(); + if (DeduplicationStrategy.ALL.equals(config.getStrategy())) { + List pack = new ArrayList<>(); + for (Iterator iterator = tbMsgs.iterator(); iterator.hasNext(); ) { + TbMsg msg = iterator.next(); + long msgTs = msg.getMetaDataTs(); + if (msgTs >= packBounds.getFirst() && msgTs < packBounds.getSecond()) { + pack.add(msg); + iterator.remove(); + } + } + deduplicationResults.add(TbMsg.newMsg( + config.getQueueName(), + config.getOutMsgType(), + entityId, + getMetadata(), + getMergedData(pack))); + } else { + TbMsg resultMsg = null; + boolean searchMin = DeduplicationStrategy.FIRST.equals(config.getStrategy()); + for (Iterator iterator = tbMsgs.iterator(); iterator.hasNext(); ) { + TbMsg msg = iterator.next(); + long msgTs = msg.getMetaDataTs(); + if (msgTs >= packBounds.getFirst() && msgTs < packBounds.getSecond()) { + iterator.remove(); + if (resultMsg == null + || (searchMin && msg.getMetaDataTs() < resultMsg.getMetaDataTs()) + || (!searchMin && msg.getMetaDataTs() > resultMsg.getMetaDataTs())) { + resultMsg = msg; + } + } + } + deduplicationResults.add(resultMsg); + } + packBoundsOpt = findValidPack(tbMsgs, deduplicationTimeoutMs); + } + }); + deduplicationResults.forEach(outMsg -> enqueueForTellNextWithRetry(ctx, outMsg, 0)); + } + + private Optional> findValidPack(List msgs, long deduplicationTimeoutMs) { + Optional min = msgs.stream().min(Comparator.comparing(TbMsg::getMetaDataTs)); + return min.map(minTsMsg -> { + long packStartTs = minTsMsg.getMetaDataTs(); + long packEndTs = packStartTs + deduplicationInterval; + if (packEndTs <= deduplicationTimeoutMs) { + return new TbPair<>(packStartTs, packEndTs); + } + return null; + }); + } + + private void enqueueForTellNextWithRetry(TbContext ctx, TbMsg msg, int retryAttempt) { + if (config.getMaxRetries() > retryAttempt) { + ctx.enqueueForTellNext(msg, TbRelationTypes.SUCCESS, + () -> { + log.trace("[{}][{}][{}] Successfully enqueue deduplication result message!", ctx.getSelfId(), msg.getOriginator(), retryAttempt); + }, + throwable -> { + log.trace("[{}][{}][{}] Failed to enqueue deduplication output message due to: ", ctx.getSelfId(), msg.getOriginator(), retryAttempt, throwable); + ctx.schedule(() -> { + enqueueForTellNextWithRetry(ctx, msg, retryAttempt + 1); + }, TB_MSG_DEDUPLICATION_RETRY_DELAY, TimeUnit.SECONDS); + }); + } + } + + private void scheduleTickMsg(TbContext ctx) { + long curTs = System.currentTimeMillis(); + if (lastScheduledTs == 0L) { + lastScheduledTs = curTs; + } + lastScheduledTs += TB_MSG_DEDUPLICATION_TIMEOUT; + long curDelay = Math.max(0L, (lastScheduledTs - curTs)); + TbMsg tickMsg = ctx.newMsg(null, TB_MSG_DEDUPLICATION_TIMEOUT_MSG, ctx.getSelfId(), new TbMsgMetaData(), ""); + ctx.tellSelf(tickMsg, curDelay); + } + + private String getMergedData(List msgs) { + ArrayNode mergedData = JacksonUtil.OBJECT_MAPPER.createArrayNode(); + msgs.forEach(msg -> { + ObjectNode msgNode = JacksonUtil.newObjectNode(); + msgNode.set("msg", JacksonUtil.toJsonNode(msg.getData())); + msgNode.set("metadata", JacksonUtil.valueToTree(msg.getMetaData().getData())); + mergedData.add(msgNode); + }); + return JacksonUtil.toString(mergedData); + } + + private TbMsgMetaData getMetadata() { + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("ts", String.valueOf(System.currentTimeMillis())); + return metaData; + } + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNodeConfiguration.java new file mode 100644 index 0000000000..afb9cc62b6 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNodeConfiguration.java @@ -0,0 +1,46 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.deduplication; + +import lombok.Data; +import org.thingsboard.rule.engine.api.NodeConfiguration; + +@Data +public class TbMsgDeduplicationNodeConfiguration implements NodeConfiguration { + + private int interval; + private DeduplicationId id; + private DeduplicationStrategy strategy; + + // Advanced settings: + private int maxPendingMsgs; + private int maxRetries; + + // only for DeduplicationStrategy.ALL: + private String outMsgType; + private String queueName; + + @Override + public TbMsgDeduplicationNodeConfiguration defaultConfiguration() { + TbMsgDeduplicationNodeConfiguration configuration = new TbMsgDeduplicationNodeConfiguration(); + configuration.setInterval(60); + configuration.setId(DeduplicationId.ORIGINATOR); + configuration.setStrategy(DeduplicationStrategy.FIRST); + configuration.setMaxPendingMsgs(100); + configuration.setMaxRetries(3); + return configuration; + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java index 5a313da469..5c2efb7e5b 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java @@ -48,7 +48,6 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbActionNodeMsgDelayConfig" ) - public class TbMsgDelayNode implements TbNode { private static final String TB_MSG_DELAY_NODE_MSG = "TbMsgDelayNodeMsg"; diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbMsgDeduplicationNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbMsgDeduplicationNodeTest.java new file mode 100644 index 0000000000..923d5667f4 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbMsgDeduplicationNodeTest.java @@ -0,0 +1,402 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.action; + +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; +import org.mockito.stubbing.Answer; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.TbRelationTypes; +import org.thingsboard.rule.engine.deduplication.DeduplicationStrategy; +import org.thingsboard.rule.engine.deduplication.TbMsgDeduplicationNode; +import org.thingsboard.rule.engine.deduplication.TbMsgDeduplicationNodeConfiguration; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.RuleNodeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.session.SessionMsgType; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@Slf4j +public class TbMsgDeduplicationNodeTest { + + private static final String MAIN_QUEUE_NAME = "Main"; + private static final String HIGH_PRIORITY_QUEUE_NAME = "HighPriority"; + private static final String TB_MSG_DEDUPLICATION_TIMEOUT_MSG = "TbMsgDeduplicationNodeMsg"; + + private TbContext ctx; + + private final ThingsBoardThreadFactory factory = ThingsBoardThreadFactory.forName("de-duplication-node-test"); + private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(factory); + private final int deduplicationInterval = 1; + + private TenantId tenantId; + + private TbMsgDeduplicationNode node; + private TbMsgDeduplicationNodeConfiguration config; + private TbNodeConfiguration nodeConfiguration; + + private CountDownLatch awaitTellSelfLatch; + + @BeforeEach + public void init() throws TbNodeException { + ctx = mock(TbContext.class); + + tenantId = TenantId.fromUUID(UUID.randomUUID()); + RuleNodeId ruleNodeId = new RuleNodeId(UUID.randomUUID()); + + when(ctx.getSelfId()).thenReturn(ruleNodeId); + when(ctx.getTenantId()).thenReturn(tenantId); + + doAnswer((Answer) invocationOnMock -> { + String type = (String) (invocationOnMock.getArguments())[1]; + EntityId originator = (EntityId) (invocationOnMock.getArguments())[2]; + TbMsgMetaData metaData = (TbMsgMetaData) (invocationOnMock.getArguments())[3]; + String data = (String) (invocationOnMock.getArguments())[4]; + return TbMsg.newMsg(type, originator, metaData.copy(), data); + }).when(ctx).newMsg(isNull(), eq(TB_MSG_DEDUPLICATION_TIMEOUT_MSG), nullable(EntityId.class), any(TbMsgMetaData.class), any(String.class)); + node = spy(new TbMsgDeduplicationNode()); + config = new TbMsgDeduplicationNodeConfiguration().defaultConfiguration(); + } + + private void invokeTellSelf(int maxNumberOfInvocation) { + invokeTellSelf(maxNumberOfInvocation, false, 0); + } + + private void invokeTellSelf(int maxNumberOfInvocation, boolean delayScheduleTimeout, int delayMultiplier) { + AtomicLong scheduleTimeout = new AtomicLong(deduplicationInterval); + AtomicInteger scheduleCount = new AtomicInteger(0); + doAnswer((Answer) invocationOnMock -> { + scheduleCount.getAndIncrement(); + if (scheduleCount.get() <= maxNumberOfInvocation) { + TbMsg msg = (TbMsg) (invocationOnMock.getArguments())[0]; + executorService.schedule(() -> { + try { + node.onMsg(ctx, msg); + awaitTellSelfLatch.countDown(); + } catch (ExecutionException | InterruptedException | TbNodeException e) { + log.error("Failed to execute tellSelf method call due to: ", e); + } + }, scheduleTimeout.get(), TimeUnit.SECONDS); + if (delayScheduleTimeout) { + scheduleTimeout.set(scheduleTimeout.get() * delayMultiplier); + } + } + + return null; + }).when(ctx).tellSelf(ArgumentMatchers.any(TbMsg.class), ArgumentMatchers.anyLong()); + } + + @AfterEach + public void destroy() { + executorService.shutdown(); + node.destroy(); + } + + @Test + public void given_100_messages_strategy_first_then_verifyOutput() throws TbNodeException, ExecutionException, InterruptedException { + int wantedNumberOfTellSelfInvocation = 2; + int msgCount = 100; + awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); + invokeTellSelf(wantedNumberOfTellSelfInvocation); + + config.setInterval(deduplicationInterval); + config.setMaxPendingMsgs(msgCount); + nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + node.init(ctx, nodeConfiguration); + + DeviceId deviceId = new DeviceId(UUID.randomUUID()); + long currentTimeMillis = System.currentTimeMillis(); + + List inputMsgs = getTbMsgs(deviceId, msgCount, currentTimeMillis, 500); + for (TbMsg msg : inputMsgs) { + node.onMsg(ctx, msg); + } + + TbMsg msgToReject = createMsg(deviceId, inputMsgs.get(inputMsgs.size() - 1).getMetaDataTs() + 2); + node.onMsg(ctx, msgToReject); + + awaitTellSelfLatch.await(); + + ArgumentCaptor newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); + ArgumentCaptor successCaptor = ArgumentCaptor.forClass(Runnable.class); + ArgumentCaptor> failureCaptor = ArgumentCaptor.forClass(Consumer.class); + + verify(ctx, times(msgCount)).ack(any()); + verify(ctx, times(1)).tellFailure(eq(msgToReject), any()); + verify(node, times(msgCount + wantedNumberOfTellSelfInvocation + 1)).onMsg(eq(ctx), any()); + verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture()); + Assertions.assertEquals(inputMsgs.get(0), newMsgCaptor.getValue()); + } + + @Test + public void given_100_messages_strategy_last_then_verifyOutput() throws TbNodeException, ExecutionException, InterruptedException { + int wantedNumberOfTellSelfInvocation = 2; + int msgCount = 100; + awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); + invokeTellSelf(wantedNumberOfTellSelfInvocation); + + config.setStrategy(DeduplicationStrategy.LAST); + config.setInterval(deduplicationInterval); + config.setMaxPendingMsgs(msgCount); + nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + node.init(ctx, nodeConfiguration); + + DeviceId deviceId = new DeviceId(UUID.randomUUID()); + long currentTimeMillis = System.currentTimeMillis(); + + List inputMsgs = getTbMsgs(deviceId, msgCount, currentTimeMillis, 500); + TbMsg msgWithLatestTs = getMsgWithLatestTs(inputMsgs); + + for (TbMsg msg : inputMsgs) { + node.onMsg(ctx, msg); + } + + TbMsg msgToReject = createMsg(deviceId, inputMsgs.get(inputMsgs.size() - 1).getMetaDataTs() + 2); + node.onMsg(ctx, msgToReject); + + awaitTellSelfLatch.await(); + + ArgumentCaptor newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); + ArgumentCaptor successCaptor = ArgumentCaptor.forClass(Runnable.class); + ArgumentCaptor> failureCaptor = ArgumentCaptor.forClass(Consumer.class); + + verify(ctx, times(msgCount)).ack(any()); + verify(ctx, times(1)).tellFailure(eq(msgToReject), any()); + verify(node, times(msgCount + wantedNumberOfTellSelfInvocation + 1)).onMsg(eq(ctx), any()); + verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture()); + Assertions.assertEquals(msgWithLatestTs, newMsgCaptor.getValue()); + } + + @Test + public void given_100_messages_strategy_all_then_verifyOutput() throws TbNodeException, ExecutionException, InterruptedException { + int wantedNumberOfTellSelfInvocation = 2; + int msgCount = 100; + awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); + invokeTellSelf(wantedNumberOfTellSelfInvocation); + + config.setInterval(deduplicationInterval); + config.setStrategy(DeduplicationStrategy.ALL); + config.setOutMsgType(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()); + config.setQueueName(HIGH_PRIORITY_QUEUE_NAME); + nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + node.init(ctx, nodeConfiguration); + + DeviceId deviceId = new DeviceId(UUID.randomUUID()); + long currentTimeMillis = System.currentTimeMillis(); + + List inputMsgs = getTbMsgs(deviceId, msgCount, currentTimeMillis, 500); + for (TbMsg msg : inputMsgs) { + node.onMsg(ctx, msg); + } + + awaitTellSelfLatch.await(); + + ArgumentCaptor newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); + ArgumentCaptor successCaptor = ArgumentCaptor.forClass(Runnable.class); + ArgumentCaptor> failureCaptor = ArgumentCaptor.forClass(Consumer.class); + + verify(ctx, times(msgCount)).ack(any()); + verify(node, times(msgCount + wantedNumberOfTellSelfInvocation)).onMsg(eq(ctx), any()); + verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture()); + + Assertions.assertEquals(1, newMsgCaptor.getAllValues().size()); + TbMsg outMessage = newMsgCaptor.getAllValues().get(0); + Assertions.assertEquals(getMergedData(inputMsgs), outMessage.getData()); + Assertions.assertEquals(deviceId, outMessage.getOriginator()); + Assertions.assertEquals(config.getOutMsgType(), outMessage.getType()); + Assertions.assertEquals(config.getQueueName(), outMessage.getQueueName()); + } + + @Test + public void given_100_messages_strategy_all_then_verifyOutput_2_packs() throws TbNodeException, ExecutionException, InterruptedException { + int wantedNumberOfTellSelfInvocation = 2; + int msgCount = 100; + awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); + invokeTellSelf(wantedNumberOfTellSelfInvocation, true, 3); + + config.setInterval(deduplicationInterval); + config.setStrategy(DeduplicationStrategy.ALL); + config.setOutMsgType(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()); + config.setQueueName(HIGH_PRIORITY_QUEUE_NAME); + nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + node.init(ctx, nodeConfiguration); + + DeviceId deviceId = new DeviceId(UUID.randomUUID()); + long currentTimeMillis = System.currentTimeMillis(); + + List firstMsgPack = getTbMsgs(deviceId, msgCount / 2, currentTimeMillis, 500); + for (TbMsg msg : firstMsgPack) { + node.onMsg(ctx, msg); + } + long firstPackDeduplicationPackEndTs = firstMsgPack.get(0).getMetaDataTs() + TimeUnit.SECONDS.toMillis(deduplicationInterval); + + List secondMsgPack = getTbMsgs(deviceId, msgCount / 2, firstPackDeduplicationPackEndTs, 500); + for (TbMsg msg : secondMsgPack) { + node.onMsg(ctx, msg); + } + + awaitTellSelfLatch.await(); + + ArgumentCaptor newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); + ArgumentCaptor successCaptor = ArgumentCaptor.forClass(Runnable.class); + ArgumentCaptor> failureCaptor = ArgumentCaptor.forClass(Consumer.class); + + verify(ctx, times(msgCount)).ack(any()); + verify(node, times(msgCount + wantedNumberOfTellSelfInvocation)).onMsg(eq(ctx), any()); + verify(ctx, times(2)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture()); + + List resultMsgs = newMsgCaptor.getAllValues(); + Assertions.assertEquals(2, resultMsgs.size()); + + TbMsg firstMsg = resultMsgs.get(0); + Assertions.assertEquals(getMergedData(firstMsgPack), firstMsg.getData()); + Assertions.assertEquals(deviceId, firstMsg.getOriginator()); + Assertions.assertEquals(config.getOutMsgType(), firstMsg.getType()); + Assertions.assertEquals(config.getQueueName(), firstMsg.getQueueName()); + + TbMsg secondMsg = resultMsgs.get(1); + Assertions.assertEquals(getMergedData(secondMsgPack), secondMsg.getData()); + Assertions.assertEquals(deviceId, secondMsg.getOriginator()); + Assertions.assertEquals(config.getOutMsgType(), secondMsg.getType()); + Assertions.assertEquals(config.getQueueName(), secondMsg.getQueueName()); + } + + @Test + public void given_100_messages_strategy_last_then_verifyOutput_2_packs() throws TbNodeException, ExecutionException, InterruptedException { + int wantedNumberOfTellSelfInvocation = 2; + int msgCount = 100; + awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); + invokeTellSelf(wantedNumberOfTellSelfInvocation, true, 3); + + config.setInterval(deduplicationInterval); + config.setStrategy(DeduplicationStrategy.LAST); + nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + node.init(ctx, nodeConfiguration); + + DeviceId deviceId = new DeviceId(UUID.randomUUID()); + long currentTimeMillis = System.currentTimeMillis(); + + List firstMsgPack = getTbMsgs(deviceId, msgCount / 2, currentTimeMillis, 500); + for (TbMsg msg : firstMsgPack) { + node.onMsg(ctx, msg); + } + long firstPackDeduplicationPackEndTs = firstMsgPack.get(0).getMetaDataTs() + TimeUnit.SECONDS.toMillis(deduplicationInterval); + TbMsg msgWithLatestTsInFirstPack = getMsgWithLatestTs(firstMsgPack); + + List secondMsgPack = getTbMsgs(deviceId, msgCount / 2, firstPackDeduplicationPackEndTs, 500); + for (TbMsg msg : secondMsgPack) { + node.onMsg(ctx, msg); + } + TbMsg msgWithLatestTsInSecondPack = getMsgWithLatestTs(secondMsgPack); + + awaitTellSelfLatch.await(); + + ArgumentCaptor newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); + ArgumentCaptor successCaptor = ArgumentCaptor.forClass(Runnable.class); + ArgumentCaptor> failureCaptor = ArgumentCaptor.forClass(Consumer.class); + + verify(ctx, times(msgCount)).ack(any()); + verify(node, times(msgCount + wantedNumberOfTellSelfInvocation)).onMsg(eq(ctx), any()); + verify(ctx, times(2)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture()); + + List resultMsgs = newMsgCaptor.getAllValues(); + Assertions.assertEquals(2, resultMsgs.size()); + Assertions.assertTrue(resultMsgs.contains(msgWithLatestTsInFirstPack)); + Assertions.assertTrue(resultMsgs.contains(msgWithLatestTsInSecondPack)); + } + + private TbMsg getMsgWithLatestTs(List firstMsgPack) { + int indexOfLastMsgInArray = firstMsgPack.size() - 1; + int indexToSetMaxTs = new Random().nextInt(indexOfLastMsgInArray) + 1; + TbMsg currentMaxTsMsg = firstMsgPack.get(indexOfLastMsgInArray); + TbMsg newLastMsgOfArray = firstMsgPack.get(indexToSetMaxTs); + firstMsgPack.set(indexOfLastMsgInArray, newLastMsgOfArray); + firstMsgPack.set(indexToSetMaxTs, currentMaxTsMsg); + return currentMaxTsMsg; + } + + private List getTbMsgs(DeviceId deviceId, int msgCount, long currentTimeMillis, int initTsStep) { + List inputMsgs = new ArrayList<>(); + var ts = currentTimeMillis + initTsStep; + for (int i = 0; i < msgCount; i++) { + inputMsgs.add(createMsg(deviceId, ts)); + ts += 2; + } + return inputMsgs; + } + + private TbMsg createMsg(DeviceId deviceId, long ts) { + ObjectNode dataNode = JacksonUtil.newObjectNode(); + dataNode.put("deviceId", deviceId.getId().toString()); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("ts", String.valueOf(ts)); + return TbMsg.newMsg( + MAIN_QUEUE_NAME, + SessionMsgType.POST_TELEMETRY_REQUEST.name(), + deviceId, + metaData, + JacksonUtil.toString(dataNode)); + } + + private String getMergedData(List msgs) { + ArrayNode mergedData = JacksonUtil.OBJECT_MAPPER.createArrayNode(); + msgs.forEach(msg -> { + ObjectNode msgNode = JacksonUtil.newObjectNode(); + msgNode.set("msg", JacksonUtil.toJsonNode(msg.getData())); + msgNode.set("metadata", JacksonUtil.valueToTree(msg.getMetaData().getData())); + mergedData.add(msgNode); + }); + return JacksonUtil.toString(mergedData); + } + +}