Merge pull request #11913 from YevhenBondarenko/hotfix/input-node-latency
used enqueue for input node to avoid latency issue
This commit is contained in:
commit
2ef69c5491
@ -109,6 +109,7 @@ import org.thingsboard.server.dao.user.UserService;
|
|||||||
import org.thingsboard.server.dao.widget.WidgetTypeService;
|
import org.thingsboard.server.dao.widget.WidgetTypeService;
|
||||||
import org.thingsboard.server.dao.widget.WidgetsBundleService;
|
import org.thingsboard.server.dao.widget.WidgetsBundleService;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||||
|
import org.thingsboard.server.queue.TbQueueCallback;
|
||||||
import org.thingsboard.server.queue.common.SimpleTbQueueCallback;
|
import org.thingsboard.server.queue.common.SimpleTbQueueCallback;
|
||||||
import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider;
|
import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider;
|
||||||
import org.thingsboard.server.service.script.RuleNodeJsScriptEngine;
|
import org.thingsboard.server.service.script.RuleNodeJsScriptEngine;
|
||||||
@ -172,8 +173,13 @@ class DefaultTbContext implements TbContext {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void input(TbMsg msg, RuleChainId ruleChainId) {
|
public void input(TbMsg msg, RuleChainId ruleChainId) {
|
||||||
msg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
|
if (!msg.isValid()) {
|
||||||
nodeCtx.getChainActor().tell(new RuleChainInputMsg(ruleChainId, msg));
|
return;
|
||||||
|
}
|
||||||
|
TbMsg tbMsg = msg.copyWithRuleChainId(ruleChainId);
|
||||||
|
tbMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
|
||||||
|
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator());
|
||||||
|
doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback(md -> ack(msg), t -> tellFailure(msg, t)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -209,14 +215,10 @@ class DefaultTbContext implements TbContext {
|
|||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder()
|
|
||||||
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
|
|
||||||
.setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
|
|
||||||
.setTbMsg(TbMsg.toByteString(tbMsg)).build();
|
|
||||||
if (nodeCtx.getSelf().isDebugMode()) {
|
if (nodeCtx.getSelf().isDebugMode()) {
|
||||||
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "To Root Rule Chain");
|
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "To Root Rule Chain");
|
||||||
}
|
}
|
||||||
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, new SimpleTbQueueCallback(
|
doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback(
|
||||||
metadata -> {
|
metadata -> {
|
||||||
if (onSuccess != null) {
|
if (onSuccess != null) {
|
||||||
onSuccess.run();
|
onSuccess.run();
|
||||||
@ -231,6 +233,14 @@ class DefaultTbContext implements TbContext {
|
|||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void doEnqueue(TopicPartitionInfo tpi, TbMsg tbMsg, TbQueueCallback callback) {
|
||||||
|
TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder()
|
||||||
|
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
|
||||||
|
.setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
|
||||||
|
.setTbMsg(TbMsg.toByteString(tbMsg)).build();
|
||||||
|
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, callback);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) {
|
public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) {
|
||||||
TopicPartitionInfo tpi = resolvePartition(tbMsg);
|
TopicPartitionInfo tpi = resolvePartition(tbMsg);
|
||||||
|
|||||||
@ -17,6 +17,7 @@ package org.thingsboard.server.rules.flow;
|
|||||||
|
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.awaitility.Awaitility;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
@ -59,6 +60,7 @@ import org.thingsboard.server.dao.event.EventService;
|
|||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
@ -331,6 +333,15 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
|
|||||||
RuleChain finalRuleChain = rootRuleChain;
|
RuleChain finalRuleChain = rootRuleChain;
|
||||||
RuleNode lastRuleNode = secondaryMetaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get();
|
RuleNode lastRuleNode = secondaryMetaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get();
|
||||||
|
|
||||||
|
Awaitility.await().atMost(TIMEOUT, TimeUnit.SECONDS)
|
||||||
|
.until(() ->
|
||||||
|
getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000)
|
||||||
|
.getData()
|
||||||
|
.stream()
|
||||||
|
.filter(filterByPostTelemetryEventType())
|
||||||
|
.count() == 2
|
||||||
|
);
|
||||||
|
|
||||||
eventsPage = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
|
eventsPage = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
|
||||||
events = eventsPage.getData().stream().filter(filterByPostTelemetryEventType()).collect(Collectors.toList());
|
events = eventsPage.getData().stream().filter(filterByPostTelemetryEventType()).collect(Collectors.toList());
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user