Improve handling rate limit and send response of failure to edge

This commit is contained in:
Andrii Landiak 2024-02-20 14:37:06 +02:00
parent 64f4b586a9
commit 7d634c2ee0

View File

@ -27,7 +27,6 @@ import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.data.util.Pair;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.id.EdgeId;
@ -42,7 +41,6 @@ import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.SortOrder;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg;
@ -112,6 +110,7 @@ public final class EdgeGrpcSession implements Closeable {
private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs";
private static final String QUEUE_START_SEQ_ID_ATTR_KEY = "queueStartSeqId";
private static final String RATE_LIMIT_REACHED = "Rate limit reached";
private final UUID sessionId;
private final BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener;
@ -267,42 +266,51 @@ public final class EdgeGrpcSession implements Closeable {
}
private void onUplinkMsg(UplinkMsg uplinkMsg) {
if (!ctx.getRateLimitService().checkRateLimit(LimitedApi.EDGE_UPLINK_MESSAGES, tenantId)) {
throw new TbRateLimitsException(EntityType.TENANT);
}
if (!ctx.getRateLimitService().checkRateLimit(LimitedApi.EDGE_UPLINK_MESSAGES_PER_EDGE, tenantId, edge.getId())) {
throw new TbRateLimitsException(EntityType.EDGE);
if (isRateLimitViolated(uplinkMsg)) {
return;
}
ListenableFuture<List<Void>> future = processUplinkMsg(uplinkMsg);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<Void> result) {
UplinkResponseMsg uplinkResponseMsg = UplinkResponseMsg.newBuilder()
.setUplinkMsgId(uplinkMsg.getUplinkMsgId())
.setSuccess(true).build();
sendDownlinkMsg(ResponseMsg.newBuilder()
.setUplinkResponseMsg(uplinkResponseMsg)
.build());
sendResponseMessage(uplinkMsg.getUplinkMsgId(), true, null);
}
@Override
public void onFailure(Throwable t) {
String errorMsg = EdgeUtils.createErrorMsgFromRootCauseAndStackTrace(t);
UplinkResponseMsg uplinkResponseMsg = UplinkResponseMsg.newBuilder()
.setUplinkMsgId(uplinkMsg.getUplinkMsgId())
.setSuccess(false).setErrorMsg(errorMsg).build();
sendDownlinkMsg(ResponseMsg.newBuilder()
.setUplinkResponseMsg(uplinkResponseMsg)
.build());
sendResponseMessage(uplinkMsg.getUplinkMsgId(), false, errorMsg);
}
}, ctx.getGrpcCallbackExecutorService());
}
private boolean isRateLimitViolated(UplinkMsg uplinkMsg) {
if (!ctx.getRateLimitService().checkRateLimit(LimitedApi.EDGE_UPLINK_MESSAGES, tenantId) ||
!ctx.getRateLimitService().checkRateLimit(LimitedApi.EDGE_UPLINK_MESSAGES_PER_EDGE, tenantId, edge.getId())) {
String errorMsg = String.format("Failed to process uplink message. %s", RATE_LIMIT_REACHED);
sendResponseMessage(uplinkMsg.getUplinkMsgId(), false, errorMsg);
return true;
}
return false;
}
private void sendResponseMessage(int uplinkMsgId, boolean success, String errorMsg) {
UplinkResponseMsg.Builder responseBuilder = UplinkResponseMsg.newBuilder()
.setUplinkMsgId(uplinkMsgId)
.setSuccess(success);
if (errorMsg != null) {
responseBuilder.setErrorMsg(errorMsg);
}
sendDownlinkMsg(ResponseMsg.newBuilder()
.setUplinkResponseMsg(responseBuilder.build())
.build());
}
private void onDownlinkResponse(DownlinkResponseMsg msg) {
try {
if (msg.getSuccess()) {
sessionState.getPendingMsgsMap().remove(msg.getDownlinkMsgId());
log.debug("[{}][{}] Msg has been processed successfully!Msg Id: [{}], Msg: {}", this.tenantId, edge.getRoutingKey(), msg.getDownlinkMsgId(), msg);
log.debug("[{}][{}] Msg has been processed successfully! Msg Id: [{}], Msg: {}", this.tenantId, edge.getRoutingKey(), msg.getDownlinkMsgId(), msg);
} else {
log.error("[{}][{}] Msg processing failed! Msg Id: [{}], Error msg: {}", this.tenantId, edge.getRoutingKey(), msg.getDownlinkMsgId(), msg.getErrorMsg());
}