deduplication rule node: first stage without redis
This commit is contained in:
		
							parent
							
								
									5d08ef9289
								
							
						
					
					
						commit
						ea099ae70a
					
				@ -105,6 +105,7 @@ import org.thingsboard.server.service.script.RuleNodeTbelScriptEngine;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
import java.util.function.BiConsumer;
 | 
			
		||||
import java.util.function.Consumer;
 | 
			
		||||
 | 
			
		||||
@ -794,6 +795,12 @@ class DefaultTbContext implements TbContext {
 | 
			
		||||
        return metaData;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void schedule(Runnable runnable, long delay, TimeUnit timeUnit) {
 | 
			
		||||
        mainCtx.getScheduler().schedule(runnable, delay, timeUnit);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void checkTenantEntity(EntityId entityId) {
 | 
			
		||||
        if (!this.getTenantId().equals(TenantIdLoader.findTenantId(this, entityId))) {
 | 
			
		||||
 | 
			
		||||
@ -70,6 +70,7 @@ import org.thingsboard.server.dao.widget.WidgetsBundleService;
 | 
			
		||||
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
import java.util.function.BiConsumer;
 | 
			
		||||
import java.util.function.Consumer;
 | 
			
		||||
 | 
			
		||||
@ -202,6 +203,8 @@ public interface TbContext {
 | 
			
		||||
     *
 | 
			
		||||
     */
 | 
			
		||||
 | 
			
		||||
    void schedule(Runnable runnable, long delay, TimeUnit timeUnit);
 | 
			
		||||
 | 
			
		||||
    void checkTenantEntity(EntityId entityId);
 | 
			
		||||
 | 
			
		||||
    boolean isLocalEntity(EntityId entityId);
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,22 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2022 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.deduplication;
 | 
			
		||||
 | 
			
		||||
public enum DeduplicationId {
 | 
			
		||||
 | 
			
		||||
    ORIGINATOR, TENANT, CUSTOMER
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,22 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2022 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.deduplication;
 | 
			
		||||
 | 
			
		||||
public enum DeduplicationStrategy {
 | 
			
		||||
 | 
			
		||||
    FIRST, LAST, ALL
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,237 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2022 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.deduplication;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.databind.node.ArrayNode;
 | 
			
		||||
import com.fasterxml.jackson.databind.node.ObjectNode;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.data.util.Pair;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.rule.engine.api.RuleNode;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbContext;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNode;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbRelationTypes;
 | 
			
		||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.ComponentType;
 | 
			
		||||
import org.thingsboard.server.common.data.util.TbPair;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
 | 
			
		||||
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.Comparator;
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.Iterator;
 | 
			
		||||
import java.util.LinkedList;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.concurrent.ExecutionException;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
 | 
			
		||||
@RuleNode(
 | 
			
		||||
        type = ComponentType.ACTION,
 | 
			
		||||
        name = "deduplication",
 | 
			
		||||
        configClazz = TbMsgDeduplicationNodeConfiguration.class,
 | 
			
		||||
        nodeDescription = "Deduplicate messages for a configurable period based on a specified deduplication strategy.",
 | 
			
		||||
        nodeDetails = "Rule node allows you to select one of the following strategy to deduplicate messages: <br></br>" +
 | 
			
		||||
                "<b>FIRST</b> - return first message that arrived during deduplication period.<br></br>" +
 | 
			
		||||
                "<b>LAST</b> - return last message that arrived during deduplication period.<br></br>" +
 | 
			
		||||
                "<b>ALL</b> - return all messages as a single JSON array message. Where each element represents object with <b>msg</b> and <b>metadata</b> inner properties.<br></br>",
 | 
			
		||||
        icon = "content_copy",
 | 
			
		||||
        uiResources = {"static/rulenode/rulenode-core-config.js"},
 | 
			
		||||
        configDirective = "tbActionNodeMsgDeduplicationConfig"
 | 
			
		||||
)
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class TbMsgDeduplicationNode implements TbNode {
 | 
			
		||||
 | 
			
		||||
    private static final String TB_MSG_DEDUPLICATION_TIMEOUT_MSG = "TbMsgDeduplicationNodeMsg";
 | 
			
		||||
    private static final int TB_MSG_DEDUPLICATION_TIMEOUT = 5000;
 | 
			
		||||
    public static final int TB_MSG_DEDUPLICATION_RETRY_DELAY = 10;
 | 
			
		||||
 | 
			
		||||
    private TbMsgDeduplicationNodeConfiguration config;
 | 
			
		||||
 | 
			
		||||
    private final Map<EntityId, List<TbMsg>> deduplicationMap;
 | 
			
		||||
    private long deduplicationInterval;
 | 
			
		||||
    private long lastScheduledTs;
 | 
			
		||||
    private DeduplicationId deduplicationId;
 | 
			
		||||
 | 
			
		||||
    public TbMsgDeduplicationNode() {
 | 
			
		||||
        this.deduplicationMap = new HashMap<>();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
 | 
			
		||||
        this.config = TbNodeUtils.convert(configuration, TbMsgDeduplicationNodeConfiguration.class);
 | 
			
		||||
        this.deduplicationInterval = TimeUnit.SECONDS.toMillis(config.getInterval());
 | 
			
		||||
        this.deduplicationId = config.getId();
 | 
			
		||||
        scheduleTickMsg(ctx);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
 | 
			
		||||
        if (TB_MSG_DEDUPLICATION_TIMEOUT_MSG.equals(msg.getType())) {
 | 
			
		||||
            try {
 | 
			
		||||
                processDeduplication(ctx);
 | 
			
		||||
            } finally {
 | 
			
		||||
                scheduleTickMsg(ctx);
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            processOnRegularMsg(ctx, msg);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void destroy() {
 | 
			
		||||
        deduplicationMap.clear();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void processOnRegularMsg(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        EntityId id = getDeduplicationId(ctx, msg);
 | 
			
		||||
        List<TbMsg> deduplicationMsgs = deduplicationMap.computeIfAbsent(id, k -> new LinkedList<>());
 | 
			
		||||
        if (deduplicationMsgs.size() < config.getMaxPendingMsgs()) {
 | 
			
		||||
            log.trace("[{}][{}] Adding msg: [{}][{}] to the pending msgs map ...", ctx.getSelfId(), id, msg.getId(), msg.getMetaDataTs());
 | 
			
		||||
            deduplicationMsgs.add(msg);
 | 
			
		||||
            ctx.ack(msg);
 | 
			
		||||
        } else {
 | 
			
		||||
            log.trace("[{}] Max limit of pending messages reached for deduplication id: [{}]", ctx.getSelfId(), id);
 | 
			
		||||
            ctx.tellFailure(msg, new RuntimeException("[" + ctx.getSelfId() + "] Max limit of pending messages reached for deduplication id: [" + id + "]"));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private EntityId getDeduplicationId(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        switch (deduplicationId) {
 | 
			
		||||
            case ORIGINATOR:
 | 
			
		||||
                return msg.getOriginator();
 | 
			
		||||
            case TENANT:
 | 
			
		||||
                return ctx.getTenantId();
 | 
			
		||||
            case CUSTOMER:
 | 
			
		||||
                return msg.getCustomerId();
 | 
			
		||||
            default:
 | 
			
		||||
                throw new IllegalStateException("Unsupported deduplication id: " + deduplicationId);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void processDeduplication(TbContext ctx) {
 | 
			
		||||
        if (deduplicationMap.isEmpty()) {
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
        List<TbMsg> deduplicationResults = new ArrayList<>();
 | 
			
		||||
        long deduplicationTimeoutMs = System.currentTimeMillis();
 | 
			
		||||
        deduplicationMap.forEach((entityId, tbMsgs) -> {
 | 
			
		||||
            if (tbMsgs.isEmpty()) {
 | 
			
		||||
                return;
 | 
			
		||||
            }
 | 
			
		||||
            Optional<TbPair<Long, Long>> packBoundsOpt = findValidPack(tbMsgs, deduplicationTimeoutMs);
 | 
			
		||||
            while (packBoundsOpt.isPresent()) {
 | 
			
		||||
                TbPair<Long, Long> packBounds = packBoundsOpt.get();
 | 
			
		||||
                if (DeduplicationStrategy.ALL.equals(config.getStrategy())) {
 | 
			
		||||
                    List<TbMsg> pack = new ArrayList<>();
 | 
			
		||||
                    for (Iterator<TbMsg> iterator = tbMsgs.iterator(); iterator.hasNext(); ) {
 | 
			
		||||
                        TbMsg msg = iterator.next();
 | 
			
		||||
                        long msgTs = msg.getMetaDataTs();
 | 
			
		||||
                        if (msgTs >= packBounds.getFirst() && msgTs < packBounds.getSecond()) {
 | 
			
		||||
                            pack.add(msg);
 | 
			
		||||
                            iterator.remove();
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                    deduplicationResults.add(TbMsg.newMsg(
 | 
			
		||||
                            config.getQueueName(),
 | 
			
		||||
                            config.getOutMsgType(),
 | 
			
		||||
                            entityId,
 | 
			
		||||
                            getMetadata(),
 | 
			
		||||
                            getMergedData(pack)));
 | 
			
		||||
                } else {
 | 
			
		||||
                    TbMsg resultMsg = null;
 | 
			
		||||
                    boolean searchMin = DeduplicationStrategy.FIRST.equals(config.getStrategy());
 | 
			
		||||
                    for (Iterator<TbMsg> iterator = tbMsgs.iterator(); iterator.hasNext(); ) {
 | 
			
		||||
                        TbMsg msg = iterator.next();
 | 
			
		||||
                        long msgTs = msg.getMetaDataTs();
 | 
			
		||||
                        if (msgTs >= packBounds.getFirst() && msgTs < packBounds.getSecond()) {
 | 
			
		||||
                            iterator.remove();
 | 
			
		||||
                            if (resultMsg == null
 | 
			
		||||
                                    || (searchMin && msg.getMetaDataTs() < resultMsg.getMetaDataTs())
 | 
			
		||||
                                    || (!searchMin && msg.getMetaDataTs() > resultMsg.getMetaDataTs())) {
 | 
			
		||||
                                resultMsg = msg;
 | 
			
		||||
                            }
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                    deduplicationResults.add(resultMsg);
 | 
			
		||||
                }
 | 
			
		||||
                packBoundsOpt = findValidPack(tbMsgs, deduplicationTimeoutMs);
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
        deduplicationResults.forEach(outMsg -> enqueueForTellNextWithRetry(ctx, outMsg, 0));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private Optional<TbPair<Long, Long>> findValidPack(List<TbMsg> msgs, long deduplicationTimeoutMs) {
 | 
			
		||||
        Optional<TbMsg> min = msgs.stream().min(Comparator.comparing(TbMsg::getMetaDataTs));
 | 
			
		||||
        return min.map(minTsMsg -> {
 | 
			
		||||
            long packStartTs = minTsMsg.getMetaDataTs();
 | 
			
		||||
            long packEndTs = packStartTs + deduplicationInterval;
 | 
			
		||||
            if (packEndTs <= deduplicationTimeoutMs) {
 | 
			
		||||
                return new TbPair<>(packStartTs, packEndTs);
 | 
			
		||||
            }
 | 
			
		||||
            return null;
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void enqueueForTellNextWithRetry(TbContext ctx, TbMsg msg, int retryAttempt) {
 | 
			
		||||
        if (config.getMaxRetries() > retryAttempt) {
 | 
			
		||||
            ctx.enqueueForTellNext(msg, TbRelationTypes.SUCCESS,
 | 
			
		||||
                    () -> {
 | 
			
		||||
                        log.trace("[{}][{}][{}] Successfully enqueue deduplication result message!", ctx.getSelfId(), msg.getOriginator(), retryAttempt);
 | 
			
		||||
                    },
 | 
			
		||||
                    throwable -> {
 | 
			
		||||
                        log.trace("[{}][{}][{}] Failed to enqueue deduplication output message due to: ", ctx.getSelfId(), msg.getOriginator(), retryAttempt, throwable);
 | 
			
		||||
                        ctx.schedule(() -> {
 | 
			
		||||
                            enqueueForTellNextWithRetry(ctx, msg, retryAttempt + 1);
 | 
			
		||||
                        }, TB_MSG_DEDUPLICATION_RETRY_DELAY, TimeUnit.SECONDS);
 | 
			
		||||
                    });
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void scheduleTickMsg(TbContext ctx) {
 | 
			
		||||
        long curTs = System.currentTimeMillis();
 | 
			
		||||
        if (lastScheduledTs == 0L) {
 | 
			
		||||
            lastScheduledTs = curTs;
 | 
			
		||||
        }
 | 
			
		||||
        lastScheduledTs += TB_MSG_DEDUPLICATION_TIMEOUT;
 | 
			
		||||
        long curDelay = Math.max(0L, (lastScheduledTs - curTs));
 | 
			
		||||
        TbMsg tickMsg = ctx.newMsg(null, TB_MSG_DEDUPLICATION_TIMEOUT_MSG, ctx.getSelfId(), new TbMsgMetaData(), "");
 | 
			
		||||
        ctx.tellSelf(tickMsg, curDelay);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private String getMergedData(List<TbMsg> msgs) {
 | 
			
		||||
        ArrayNode mergedData = JacksonUtil.OBJECT_MAPPER.createArrayNode();
 | 
			
		||||
        msgs.forEach(msg -> {
 | 
			
		||||
            ObjectNode msgNode = JacksonUtil.newObjectNode();
 | 
			
		||||
            msgNode.set("msg", JacksonUtil.toJsonNode(msg.getData()));
 | 
			
		||||
            msgNode.set("metadata", JacksonUtil.valueToTree(msg.getMetaData().getData()));
 | 
			
		||||
            mergedData.add(msgNode);
 | 
			
		||||
        });
 | 
			
		||||
        return JacksonUtil.toString(mergedData);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private TbMsgMetaData getMetadata() {
 | 
			
		||||
        TbMsgMetaData metaData = new TbMsgMetaData();
 | 
			
		||||
        metaData.putValue("ts", String.valueOf(System.currentTimeMillis()));
 | 
			
		||||
        return metaData;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,46 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2022 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.deduplication;
 | 
			
		||||
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import org.thingsboard.rule.engine.api.NodeConfiguration;
 | 
			
		||||
 | 
			
		||||
@Data
 | 
			
		||||
public class TbMsgDeduplicationNodeConfiguration implements NodeConfiguration<TbMsgDeduplicationNodeConfiguration> {
 | 
			
		||||
 | 
			
		||||
    private int interval;
 | 
			
		||||
    private DeduplicationId id;
 | 
			
		||||
    private DeduplicationStrategy strategy;
 | 
			
		||||
 | 
			
		||||
    // Advanced settings:
 | 
			
		||||
    private int maxPendingMsgs;
 | 
			
		||||
    private int maxRetries;
 | 
			
		||||
 | 
			
		||||
    // only for DeduplicationStrategy.ALL:
 | 
			
		||||
    private String outMsgType;
 | 
			
		||||
    private String queueName;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbMsgDeduplicationNodeConfiguration defaultConfiguration() {
 | 
			
		||||
        TbMsgDeduplicationNodeConfiguration configuration = new TbMsgDeduplicationNodeConfiguration();
 | 
			
		||||
        configuration.setInterval(60);
 | 
			
		||||
        configuration.setId(DeduplicationId.ORIGINATOR);
 | 
			
		||||
        configuration.setStrategy(DeduplicationStrategy.FIRST);
 | 
			
		||||
        configuration.setMaxPendingMsgs(100);
 | 
			
		||||
        configuration.setMaxRetries(3);
 | 
			
		||||
        return configuration;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -48,7 +48,6 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
 | 
			
		||||
        uiResources = {"static/rulenode/rulenode-core-config.js"},
 | 
			
		||||
        configDirective = "tbActionNodeMsgDelayConfig"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
public class TbMsgDelayNode implements TbNode {
 | 
			
		||||
 | 
			
		||||
    private static final String TB_MSG_DELAY_NODE_MSG = "TbMsgDelayNodeMsg";
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,402 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2022 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.action;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.databind.node.ArrayNode;
 | 
			
		||||
import com.fasterxml.jackson.databind.node.ObjectNode;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.junit.jupiter.api.AfterEach;
 | 
			
		||||
import org.junit.jupiter.api.Assertions;
 | 
			
		||||
import org.junit.jupiter.api.BeforeEach;
 | 
			
		||||
import org.junit.jupiter.api.Test;
 | 
			
		||||
import org.mockito.ArgumentCaptor;
 | 
			
		||||
import org.mockito.ArgumentMatchers;
 | 
			
		||||
import org.mockito.stubbing.Answer;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbContext;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbRelationTypes;
 | 
			
		||||
import org.thingsboard.rule.engine.deduplication.DeduplicationStrategy;
 | 
			
		||||
import org.thingsboard.rule.engine.deduplication.TbMsgDeduplicationNode;
 | 
			
		||||
import org.thingsboard.rule.engine.deduplication.TbMsgDeduplicationNodeConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.data.id.DeviceId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.RuleNodeId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
 | 
			
		||||
import org.thingsboard.server.common.msg.session.SessionMsgType;
 | 
			
		||||
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Random;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.CountDownLatch;
 | 
			
		||||
import java.util.concurrent.ExecutionException;
 | 
			
		||||
import java.util.concurrent.Executors;
 | 
			
		||||
import java.util.concurrent.ScheduledExecutorService;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
import java.util.concurrent.atomic.AtomicInteger;
 | 
			
		||||
import java.util.concurrent.atomic.AtomicLong;
 | 
			
		||||
import java.util.function.Consumer;
 | 
			
		||||
 | 
			
		||||
import static org.mockito.ArgumentMatchers.any;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.eq;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.isNull;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.nullable;
 | 
			
		||||
import static org.mockito.Mockito.doAnswer;
 | 
			
		||||
import static org.mockito.Mockito.mock;
 | 
			
		||||
import static org.mockito.Mockito.spy;
 | 
			
		||||
import static org.mockito.Mockito.times;
 | 
			
		||||
import static org.mockito.Mockito.verify;
 | 
			
		||||
import static org.mockito.Mockito.when;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class TbMsgDeduplicationNodeTest {
 | 
			
		||||
 | 
			
		||||
    private static final String MAIN_QUEUE_NAME = "Main";
 | 
			
		||||
    private static final String HIGH_PRIORITY_QUEUE_NAME = "HighPriority";
 | 
			
		||||
    private static final String TB_MSG_DEDUPLICATION_TIMEOUT_MSG = "TbMsgDeduplicationNodeMsg";
 | 
			
		||||
 | 
			
		||||
    private TbContext ctx;
 | 
			
		||||
 | 
			
		||||
    private final ThingsBoardThreadFactory factory = ThingsBoardThreadFactory.forName("de-duplication-node-test");
 | 
			
		||||
    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(factory);
 | 
			
		||||
    private final int deduplicationInterval = 1;
 | 
			
		||||
 | 
			
		||||
    private TenantId tenantId;
 | 
			
		||||
 | 
			
		||||
    private TbMsgDeduplicationNode node;
 | 
			
		||||
    private TbMsgDeduplicationNodeConfiguration config;
 | 
			
		||||
    private TbNodeConfiguration nodeConfiguration;
 | 
			
		||||
 | 
			
		||||
    private CountDownLatch awaitTellSelfLatch;
 | 
			
		||||
 | 
			
		||||
    @BeforeEach
 | 
			
		||||
    public void init() throws TbNodeException {
 | 
			
		||||
        ctx = mock(TbContext.class);
 | 
			
		||||
 | 
			
		||||
        tenantId = TenantId.fromUUID(UUID.randomUUID());
 | 
			
		||||
        RuleNodeId ruleNodeId = new RuleNodeId(UUID.randomUUID());
 | 
			
		||||
 | 
			
		||||
        when(ctx.getSelfId()).thenReturn(ruleNodeId);
 | 
			
		||||
        when(ctx.getTenantId()).thenReturn(tenantId);
 | 
			
		||||
 | 
			
		||||
        doAnswer((Answer<TbMsg>) invocationOnMock -> {
 | 
			
		||||
            String type = (String) (invocationOnMock.getArguments())[1];
 | 
			
		||||
            EntityId originator = (EntityId) (invocationOnMock.getArguments())[2];
 | 
			
		||||
            TbMsgMetaData metaData = (TbMsgMetaData) (invocationOnMock.getArguments())[3];
 | 
			
		||||
            String data = (String) (invocationOnMock.getArguments())[4];
 | 
			
		||||
            return TbMsg.newMsg(type, originator, metaData.copy(), data);
 | 
			
		||||
        }).when(ctx).newMsg(isNull(), eq(TB_MSG_DEDUPLICATION_TIMEOUT_MSG), nullable(EntityId.class), any(TbMsgMetaData.class), any(String.class));
 | 
			
		||||
        node = spy(new TbMsgDeduplicationNode());
 | 
			
		||||
        config = new TbMsgDeduplicationNodeConfiguration().defaultConfiguration();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void invokeTellSelf(int maxNumberOfInvocation) {
 | 
			
		||||
        invokeTellSelf(maxNumberOfInvocation, false, 0);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void invokeTellSelf(int maxNumberOfInvocation, boolean delayScheduleTimeout, int delayMultiplier) {
 | 
			
		||||
        AtomicLong scheduleTimeout = new AtomicLong(deduplicationInterval);
 | 
			
		||||
        AtomicInteger scheduleCount = new AtomicInteger(0);
 | 
			
		||||
        doAnswer((Answer<Void>) invocationOnMock -> {
 | 
			
		||||
            scheduleCount.getAndIncrement();
 | 
			
		||||
            if (scheduleCount.get() <= maxNumberOfInvocation) {
 | 
			
		||||
                TbMsg msg = (TbMsg) (invocationOnMock.getArguments())[0];
 | 
			
		||||
                executorService.schedule(() -> {
 | 
			
		||||
                    try {
 | 
			
		||||
                        node.onMsg(ctx, msg);
 | 
			
		||||
                        awaitTellSelfLatch.countDown();
 | 
			
		||||
                    } catch (ExecutionException | InterruptedException | TbNodeException e) {
 | 
			
		||||
                        log.error("Failed to execute tellSelf method call due to: ", e);
 | 
			
		||||
                    }
 | 
			
		||||
                }, scheduleTimeout.get(), TimeUnit.SECONDS);
 | 
			
		||||
                if (delayScheduleTimeout) {
 | 
			
		||||
                    scheduleTimeout.set(scheduleTimeout.get() * delayMultiplier);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            return null;
 | 
			
		||||
        }).when(ctx).tellSelf(ArgumentMatchers.any(TbMsg.class), ArgumentMatchers.anyLong());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @AfterEach
 | 
			
		||||
    public void destroy() {
 | 
			
		||||
        executorService.shutdown();
 | 
			
		||||
        node.destroy();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void given_100_messages_strategy_first_then_verifyOutput() throws TbNodeException, ExecutionException, InterruptedException {
 | 
			
		||||
        int wantedNumberOfTellSelfInvocation = 2;
 | 
			
		||||
        int msgCount = 100;
 | 
			
		||||
        awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation);
 | 
			
		||||
        invokeTellSelf(wantedNumberOfTellSelfInvocation);
 | 
			
		||||
 | 
			
		||||
        config.setInterval(deduplicationInterval);
 | 
			
		||||
        config.setMaxPendingMsgs(msgCount);
 | 
			
		||||
        nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
 | 
			
		||||
        node.init(ctx, nodeConfiguration);
 | 
			
		||||
 | 
			
		||||
        DeviceId deviceId = new DeviceId(UUID.randomUUID());
 | 
			
		||||
        long currentTimeMillis = System.currentTimeMillis();
 | 
			
		||||
 | 
			
		||||
        List<TbMsg> inputMsgs = getTbMsgs(deviceId, msgCount, currentTimeMillis, 500);
 | 
			
		||||
        for (TbMsg msg : inputMsgs) {
 | 
			
		||||
            node.onMsg(ctx, msg);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        TbMsg msgToReject = createMsg(deviceId, inputMsgs.get(inputMsgs.size() - 1).getMetaDataTs() + 2);
 | 
			
		||||
        node.onMsg(ctx, msgToReject);
 | 
			
		||||
 | 
			
		||||
        awaitTellSelfLatch.await();
 | 
			
		||||
 | 
			
		||||
        ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
 | 
			
		||||
        ArgumentCaptor<Runnable> successCaptor = ArgumentCaptor.forClass(Runnable.class);
 | 
			
		||||
        ArgumentCaptor<Consumer<Throwable>> failureCaptor = ArgumentCaptor.forClass(Consumer.class);
 | 
			
		||||
 | 
			
		||||
        verify(ctx, times(msgCount)).ack(any());
 | 
			
		||||
        verify(ctx, times(1)).tellFailure(eq(msgToReject), any());
 | 
			
		||||
        verify(node, times(msgCount + wantedNumberOfTellSelfInvocation + 1)).onMsg(eq(ctx), any());
 | 
			
		||||
        verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture());
 | 
			
		||||
        Assertions.assertEquals(inputMsgs.get(0), newMsgCaptor.getValue());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void given_100_messages_strategy_last_then_verifyOutput() throws TbNodeException, ExecutionException, InterruptedException {
 | 
			
		||||
        int wantedNumberOfTellSelfInvocation = 2;
 | 
			
		||||
        int msgCount = 100;
 | 
			
		||||
        awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation);
 | 
			
		||||
        invokeTellSelf(wantedNumberOfTellSelfInvocation);
 | 
			
		||||
 | 
			
		||||
        config.setStrategy(DeduplicationStrategy.LAST);
 | 
			
		||||
        config.setInterval(deduplicationInterval);
 | 
			
		||||
        config.setMaxPendingMsgs(msgCount);
 | 
			
		||||
        nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
 | 
			
		||||
        node.init(ctx, nodeConfiguration);
 | 
			
		||||
 | 
			
		||||
        DeviceId deviceId = new DeviceId(UUID.randomUUID());
 | 
			
		||||
        long currentTimeMillis = System.currentTimeMillis();
 | 
			
		||||
 | 
			
		||||
        List<TbMsg> inputMsgs = getTbMsgs(deviceId, msgCount, currentTimeMillis, 500);
 | 
			
		||||
        TbMsg msgWithLatestTs = getMsgWithLatestTs(inputMsgs);
 | 
			
		||||
 | 
			
		||||
        for (TbMsg msg : inputMsgs) {
 | 
			
		||||
            node.onMsg(ctx, msg);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        TbMsg msgToReject = createMsg(deviceId, inputMsgs.get(inputMsgs.size() - 1).getMetaDataTs() + 2);
 | 
			
		||||
        node.onMsg(ctx, msgToReject);
 | 
			
		||||
 | 
			
		||||
        awaitTellSelfLatch.await();
 | 
			
		||||
 | 
			
		||||
        ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
 | 
			
		||||
        ArgumentCaptor<Runnable> successCaptor = ArgumentCaptor.forClass(Runnable.class);
 | 
			
		||||
        ArgumentCaptor<Consumer<Throwable>> failureCaptor = ArgumentCaptor.forClass(Consumer.class);
 | 
			
		||||
 | 
			
		||||
        verify(ctx, times(msgCount)).ack(any());
 | 
			
		||||
        verify(ctx, times(1)).tellFailure(eq(msgToReject), any());
 | 
			
		||||
        verify(node, times(msgCount + wantedNumberOfTellSelfInvocation + 1)).onMsg(eq(ctx), any());
 | 
			
		||||
        verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture());
 | 
			
		||||
        Assertions.assertEquals(msgWithLatestTs, newMsgCaptor.getValue());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void given_100_messages_strategy_all_then_verifyOutput() throws TbNodeException, ExecutionException, InterruptedException {
 | 
			
		||||
        int wantedNumberOfTellSelfInvocation = 2;
 | 
			
		||||
        int msgCount = 100;
 | 
			
		||||
        awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation);
 | 
			
		||||
        invokeTellSelf(wantedNumberOfTellSelfInvocation);
 | 
			
		||||
 | 
			
		||||
        config.setInterval(deduplicationInterval);
 | 
			
		||||
        config.setStrategy(DeduplicationStrategy.ALL);
 | 
			
		||||
        config.setOutMsgType(SessionMsgType.POST_ATTRIBUTES_REQUEST.name());
 | 
			
		||||
        config.setQueueName(HIGH_PRIORITY_QUEUE_NAME);
 | 
			
		||||
        nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
 | 
			
		||||
        node.init(ctx, nodeConfiguration);
 | 
			
		||||
 | 
			
		||||
        DeviceId deviceId = new DeviceId(UUID.randomUUID());
 | 
			
		||||
        long currentTimeMillis = System.currentTimeMillis();
 | 
			
		||||
 | 
			
		||||
        List<TbMsg> inputMsgs = getTbMsgs(deviceId, msgCount, currentTimeMillis, 500);
 | 
			
		||||
        for (TbMsg msg : inputMsgs) {
 | 
			
		||||
            node.onMsg(ctx, msg);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        awaitTellSelfLatch.await();
 | 
			
		||||
 | 
			
		||||
        ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
 | 
			
		||||
        ArgumentCaptor<Runnable> successCaptor = ArgumentCaptor.forClass(Runnable.class);
 | 
			
		||||
        ArgumentCaptor<Consumer<Throwable>> failureCaptor = ArgumentCaptor.forClass(Consumer.class);
 | 
			
		||||
 | 
			
		||||
        verify(ctx, times(msgCount)).ack(any());
 | 
			
		||||
        verify(node, times(msgCount + wantedNumberOfTellSelfInvocation)).onMsg(eq(ctx), any());
 | 
			
		||||
        verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture());
 | 
			
		||||
 | 
			
		||||
        Assertions.assertEquals(1, newMsgCaptor.getAllValues().size());
 | 
			
		||||
        TbMsg outMessage = newMsgCaptor.getAllValues().get(0);
 | 
			
		||||
        Assertions.assertEquals(getMergedData(inputMsgs), outMessage.getData());
 | 
			
		||||
        Assertions.assertEquals(deviceId, outMessage.getOriginator());
 | 
			
		||||
        Assertions.assertEquals(config.getOutMsgType(), outMessage.getType());
 | 
			
		||||
        Assertions.assertEquals(config.getQueueName(), outMessage.getQueueName());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void given_100_messages_strategy_all_then_verifyOutput_2_packs() throws TbNodeException, ExecutionException, InterruptedException {
 | 
			
		||||
        int wantedNumberOfTellSelfInvocation = 2;
 | 
			
		||||
        int msgCount = 100;
 | 
			
		||||
        awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation);
 | 
			
		||||
        invokeTellSelf(wantedNumberOfTellSelfInvocation, true, 3);
 | 
			
		||||
 | 
			
		||||
        config.setInterval(deduplicationInterval);
 | 
			
		||||
        config.setStrategy(DeduplicationStrategy.ALL);
 | 
			
		||||
        config.setOutMsgType(SessionMsgType.POST_ATTRIBUTES_REQUEST.name());
 | 
			
		||||
        config.setQueueName(HIGH_PRIORITY_QUEUE_NAME);
 | 
			
		||||
        nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
 | 
			
		||||
        node.init(ctx, nodeConfiguration);
 | 
			
		||||
 | 
			
		||||
        DeviceId deviceId = new DeviceId(UUID.randomUUID());
 | 
			
		||||
        long currentTimeMillis = System.currentTimeMillis();
 | 
			
		||||
 | 
			
		||||
        List<TbMsg> firstMsgPack = getTbMsgs(deviceId, msgCount / 2, currentTimeMillis, 500);
 | 
			
		||||
        for (TbMsg msg : firstMsgPack) {
 | 
			
		||||
            node.onMsg(ctx, msg);
 | 
			
		||||
        }
 | 
			
		||||
        long firstPackDeduplicationPackEndTs =  firstMsgPack.get(0).getMetaDataTs() + TimeUnit.SECONDS.toMillis(deduplicationInterval);
 | 
			
		||||
 | 
			
		||||
        List<TbMsg> secondMsgPack = getTbMsgs(deviceId, msgCount / 2, firstPackDeduplicationPackEndTs, 500);
 | 
			
		||||
        for (TbMsg msg : secondMsgPack) {
 | 
			
		||||
            node.onMsg(ctx, msg);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        awaitTellSelfLatch.await();
 | 
			
		||||
 | 
			
		||||
        ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
 | 
			
		||||
        ArgumentCaptor<Runnable> successCaptor = ArgumentCaptor.forClass(Runnable.class);
 | 
			
		||||
        ArgumentCaptor<Consumer<Throwable>> failureCaptor = ArgumentCaptor.forClass(Consumer.class);
 | 
			
		||||
 | 
			
		||||
        verify(ctx, times(msgCount)).ack(any());
 | 
			
		||||
        verify(node, times(msgCount + wantedNumberOfTellSelfInvocation)).onMsg(eq(ctx), any());
 | 
			
		||||
        verify(ctx, times(2)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture());
 | 
			
		||||
 | 
			
		||||
        List<TbMsg> resultMsgs = newMsgCaptor.getAllValues();
 | 
			
		||||
        Assertions.assertEquals(2, resultMsgs.size());
 | 
			
		||||
 | 
			
		||||
        TbMsg firstMsg = resultMsgs.get(0);
 | 
			
		||||
        Assertions.assertEquals(getMergedData(firstMsgPack), firstMsg.getData());
 | 
			
		||||
        Assertions.assertEquals(deviceId, firstMsg.getOriginator());
 | 
			
		||||
        Assertions.assertEquals(config.getOutMsgType(), firstMsg.getType());
 | 
			
		||||
        Assertions.assertEquals(config.getQueueName(), firstMsg.getQueueName());
 | 
			
		||||
 | 
			
		||||
        TbMsg secondMsg = resultMsgs.get(1);
 | 
			
		||||
        Assertions.assertEquals(getMergedData(secondMsgPack), secondMsg.getData());
 | 
			
		||||
        Assertions.assertEquals(deviceId, secondMsg.getOriginator());
 | 
			
		||||
        Assertions.assertEquals(config.getOutMsgType(), secondMsg.getType());
 | 
			
		||||
        Assertions.assertEquals(config.getQueueName(), secondMsg.getQueueName());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void given_100_messages_strategy_last_then_verifyOutput_2_packs() throws TbNodeException, ExecutionException, InterruptedException {
 | 
			
		||||
        int wantedNumberOfTellSelfInvocation = 2;
 | 
			
		||||
        int msgCount = 100;
 | 
			
		||||
        awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation);
 | 
			
		||||
        invokeTellSelf(wantedNumberOfTellSelfInvocation, true, 3);
 | 
			
		||||
 | 
			
		||||
        config.setInterval(deduplicationInterval);
 | 
			
		||||
        config.setStrategy(DeduplicationStrategy.LAST);
 | 
			
		||||
        nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
 | 
			
		||||
        node.init(ctx, nodeConfiguration);
 | 
			
		||||
 | 
			
		||||
        DeviceId deviceId = new DeviceId(UUID.randomUUID());
 | 
			
		||||
        long currentTimeMillis = System.currentTimeMillis();
 | 
			
		||||
 | 
			
		||||
        List<TbMsg> firstMsgPack = getTbMsgs(deviceId, msgCount / 2, currentTimeMillis, 500);
 | 
			
		||||
        for (TbMsg msg : firstMsgPack) {
 | 
			
		||||
            node.onMsg(ctx, msg);
 | 
			
		||||
        }
 | 
			
		||||
        long firstPackDeduplicationPackEndTs = firstMsgPack.get(0).getMetaDataTs() + TimeUnit.SECONDS.toMillis(deduplicationInterval);
 | 
			
		||||
        TbMsg msgWithLatestTsInFirstPack = getMsgWithLatestTs(firstMsgPack);
 | 
			
		||||
 | 
			
		||||
        List<TbMsg> secondMsgPack = getTbMsgs(deviceId, msgCount / 2, firstPackDeduplicationPackEndTs, 500);
 | 
			
		||||
        for (TbMsg msg : secondMsgPack) {
 | 
			
		||||
            node.onMsg(ctx, msg);
 | 
			
		||||
        }
 | 
			
		||||
        TbMsg msgWithLatestTsInSecondPack = getMsgWithLatestTs(secondMsgPack);
 | 
			
		||||
 | 
			
		||||
        awaitTellSelfLatch.await();
 | 
			
		||||
 | 
			
		||||
        ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
 | 
			
		||||
        ArgumentCaptor<Runnable> successCaptor = ArgumentCaptor.forClass(Runnable.class);
 | 
			
		||||
        ArgumentCaptor<Consumer<Throwable>> failureCaptor = ArgumentCaptor.forClass(Consumer.class);
 | 
			
		||||
 | 
			
		||||
        verify(ctx, times(msgCount)).ack(any());
 | 
			
		||||
        verify(node, times(msgCount + wantedNumberOfTellSelfInvocation)).onMsg(eq(ctx), any());
 | 
			
		||||
        verify(ctx, times(2)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture());
 | 
			
		||||
 | 
			
		||||
        List<TbMsg> resultMsgs = newMsgCaptor.getAllValues();
 | 
			
		||||
        Assertions.assertEquals(2, resultMsgs.size());
 | 
			
		||||
        Assertions.assertTrue(resultMsgs.contains(msgWithLatestTsInFirstPack));
 | 
			
		||||
        Assertions.assertTrue(resultMsgs.contains(msgWithLatestTsInSecondPack));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private TbMsg getMsgWithLatestTs(List<TbMsg> firstMsgPack) {
 | 
			
		||||
        int indexOfLastMsgInArray = firstMsgPack.size() - 1;
 | 
			
		||||
        int indexToSetMaxTs = new Random().nextInt(indexOfLastMsgInArray) + 1;
 | 
			
		||||
        TbMsg currentMaxTsMsg = firstMsgPack.get(indexOfLastMsgInArray);
 | 
			
		||||
        TbMsg newLastMsgOfArray = firstMsgPack.get(indexToSetMaxTs);
 | 
			
		||||
        firstMsgPack.set(indexOfLastMsgInArray, newLastMsgOfArray);
 | 
			
		||||
        firstMsgPack.set(indexToSetMaxTs, currentMaxTsMsg);
 | 
			
		||||
        return currentMaxTsMsg;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private List<TbMsg> getTbMsgs(DeviceId deviceId, int msgCount, long currentTimeMillis, int initTsStep) {
 | 
			
		||||
        List<TbMsg> inputMsgs = new ArrayList<>();
 | 
			
		||||
        var ts = currentTimeMillis + initTsStep;
 | 
			
		||||
        for (int i = 0; i < msgCount; i++) {
 | 
			
		||||
            inputMsgs.add(createMsg(deviceId, ts));
 | 
			
		||||
            ts += 2;
 | 
			
		||||
        }
 | 
			
		||||
        return inputMsgs;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private TbMsg createMsg(DeviceId deviceId, long ts) {
 | 
			
		||||
        ObjectNode dataNode = JacksonUtil.newObjectNode();
 | 
			
		||||
        dataNode.put("deviceId", deviceId.getId().toString());
 | 
			
		||||
        TbMsgMetaData metaData = new TbMsgMetaData();
 | 
			
		||||
        metaData.putValue("ts", String.valueOf(ts));
 | 
			
		||||
        return TbMsg.newMsg(
 | 
			
		||||
                MAIN_QUEUE_NAME,
 | 
			
		||||
                SessionMsgType.POST_TELEMETRY_REQUEST.name(),
 | 
			
		||||
                deviceId,
 | 
			
		||||
                metaData,
 | 
			
		||||
                JacksonUtil.toString(dataNode));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private String getMergedData(List<TbMsg> msgs) {
 | 
			
		||||
        ArrayNode mergedData = JacksonUtil.OBJECT_MAPPER.createArrayNode();
 | 
			
		||||
        msgs.forEach(msg -> {
 | 
			
		||||
            ObjectNode msgNode = JacksonUtil.newObjectNode();
 | 
			
		||||
            msgNode.set("msg", JacksonUtil.toJsonNode(msg.getData()));
 | 
			
		||||
            msgNode.set("metadata", JacksonUtil.valueToTree(msg.getMetaData().getData()));
 | 
			
		||||
            mergedData.add(msgNode);
 | 
			
		||||
        });
 | 
			
		||||
        return JacksonUtil.toString(mergedData);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user