diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbDeviceStateNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbDeviceStateNode.java index 7003f78a00..8adafe8d62 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbDeviceStateNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbDeviceStateNode.java @@ -59,18 +59,6 @@ import java.util.Map; ) public class TbDeviceStateNode implements TbNode { - private static final TbQueueCallback EMPTY_CALLBACK = new TbQueueCallback() { - - @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - } - - @Override - public void onFailure(Throwable t) { - } - - }; - private final Map SUPPORTED_EVENTS = Map.of( TbMsgType.CONNECT_EVENT, this::sendDeviceConnectMsg, TbMsgType.ACTIVITY_EVENT, this::sendDeviceActivityMsg, @@ -96,7 +84,6 @@ public class TbDeviceStateNode implements TbNode { throw new TbNodeException("Unsupported event: " + event, true); } this.event = event; - } @Override @@ -126,7 +113,9 @@ public class TbDeviceStateNode implements TbNode { var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder() .setDeviceConnectMsg(deviceConnectMsg) .build(); - ctx.getClusterService().pushMsgToCore(ctx.getTenantId(), msg.getOriginator(), toCoreMsg, EMPTY_CALLBACK); + ctx.getClusterService().pushMsgToCore( + ctx.getTenantId(), msg.getOriginator(), toCoreMsg, getMsgProcessedCallback(ctx, msg) + ); } private void sendDeviceActivityMsg(TbContext ctx, TbMsg msg) { @@ -142,7 +131,9 @@ public class TbDeviceStateNode implements TbNode { var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder() .setDeviceActivityMsg(deviceActivityMsg) .build(); - ctx.getClusterService().pushMsgToCore(ctx.getTenantId(), msg.getOriginator(), toCoreMsg, EMPTY_CALLBACK); + ctx.getClusterService().pushMsgToCore( + ctx.getTenantId(), msg.getOriginator(), toCoreMsg, getMsgProcessedCallback(ctx, msg) + ); } private void sendDeviceDisconnectMsg(TbContext ctx, TbMsg msg) { @@ -157,7 +148,9 @@ public class TbDeviceStateNode implements TbNode { var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder() .setDeviceDisconnectMsg(deviceDisconnectMsg) .build(); - ctx.getClusterService().pushMsgToCore(ctx.getTenantId(), msg.getOriginator(), toCoreMsg, EMPTY_CALLBACK); + ctx.getClusterService().pushMsgToCore( + ctx.getTenantId(), msg.getOriginator(), toCoreMsg, getMsgProcessedCallback(ctx, msg) + ); } private void sendDeviceInactivityMsg(TbContext ctx, TbMsg msg) { @@ -172,7 +165,23 @@ public class TbDeviceStateNode implements TbNode { var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder() .setDeviceInactivityMsg(deviceInactivityMsg) .build(); - ctx.getClusterService().pushMsgToCore(ctx.getTenantId(), msg.getOriginator(), toCoreMsg, EMPTY_CALLBACK); + ctx.getClusterService().pushMsgToCore( + ctx.getTenantId(), msg.getOriginator(), toCoreMsg, getMsgProcessedCallback(ctx, msg) + ); + } + + private TbQueueCallback getMsgProcessedCallback(TbContext ctx, TbMsg msg) { + return new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + ctx.tellSuccess(msg); + } + + @Override + public void onFailure(Throwable t) { + ctx.tellFailure(msg, t); + } + }; } }