Move telling success into queue callback

This commit is contained in:
Dmytro Skarzhynets 2023-09-12 13:40:58 +03:00
parent 0c5e7a933c
commit 51aae476d3

View File

@ -59,18 +59,6 @@ import java.util.Map;
)
public class TbDeviceStateNode implements TbNode {
private static final TbQueueCallback EMPTY_CALLBACK = new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
}
@Override
public void onFailure(Throwable t) {
}
};
private final Map<TbMsgType, ConnectivityEvent> SUPPORTED_EVENTS = Map.of(
TbMsgType.CONNECT_EVENT, this::sendDeviceConnectMsg,
TbMsgType.ACTIVITY_EVENT, this::sendDeviceActivityMsg,
@ -96,7 +84,6 @@ public class TbDeviceStateNode implements TbNode {
throw new TbNodeException("Unsupported event: " + event, true);
}
this.event = event;
}
@Override
@ -126,7 +113,9 @@ public class TbDeviceStateNode implements TbNode {
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceConnectMsg(deviceConnectMsg)
.build();
ctx.getClusterService().pushMsgToCore(ctx.getTenantId(), msg.getOriginator(), toCoreMsg, EMPTY_CALLBACK);
ctx.getClusterService().pushMsgToCore(
ctx.getTenantId(), msg.getOriginator(), toCoreMsg, getMsgProcessedCallback(ctx, msg)
);
}
private void sendDeviceActivityMsg(TbContext ctx, TbMsg msg) {
@ -142,7 +131,9 @@ public class TbDeviceStateNode implements TbNode {
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceActivityMsg(deviceActivityMsg)
.build();
ctx.getClusterService().pushMsgToCore(ctx.getTenantId(), msg.getOriginator(), toCoreMsg, EMPTY_CALLBACK);
ctx.getClusterService().pushMsgToCore(
ctx.getTenantId(), msg.getOriginator(), toCoreMsg, getMsgProcessedCallback(ctx, msg)
);
}
private void sendDeviceDisconnectMsg(TbContext ctx, TbMsg msg) {
@ -157,7 +148,9 @@ public class TbDeviceStateNode implements TbNode {
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceDisconnectMsg(deviceDisconnectMsg)
.build();
ctx.getClusterService().pushMsgToCore(ctx.getTenantId(), msg.getOriginator(), toCoreMsg, EMPTY_CALLBACK);
ctx.getClusterService().pushMsgToCore(
ctx.getTenantId(), msg.getOriginator(), toCoreMsg, getMsgProcessedCallback(ctx, msg)
);
}
private void sendDeviceInactivityMsg(TbContext ctx, TbMsg msg) {
@ -172,7 +165,23 @@ public class TbDeviceStateNode implements TbNode {
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceInactivityMsg(deviceInactivityMsg)
.build();
ctx.getClusterService().pushMsgToCore(ctx.getTenantId(), msg.getOriginator(), toCoreMsg, EMPTY_CALLBACK);
ctx.getClusterService().pushMsgToCore(
ctx.getTenantId(), msg.getOriginator(), toCoreMsg, getMsgProcessedCallback(ctx, msg)
);
}
private TbQueueCallback getMsgProcessedCallback(TbContext ctx, TbMsg msg) {
return new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
ctx.tellSuccess(msg);
}
@Override
public void onFailure(Throwable t) {
ctx.tellFailure(msg, t);
}
};
}
}