diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index af8ed2aa6d..168fef6c2e 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -80,6 +80,8 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; @@ -405,18 +407,34 @@ public abstract class AbstractGatewaySessionHandler deviceEntry : json.getAsJsonObject().entrySet()) { - if (!deviceEntry.getValue().isJsonArray()) { - log.warn("{}[{}]", CAN_T_PARSE_VALUE, json); - continue; - } + + List> deviceEntries = json.getAsJsonObject().entrySet().stream() + .filter(entry -> { + final boolean isArray = entry.getValue().isJsonArray(); + if (!isArray) { + log.warn("{} device='{}' value={}", CAN_T_PARSE_VALUE, entry.getKey(), entry.getValue()); + } + return isArray; + }) + .toList(); + + if (deviceEntries.isEmpty()) { + log.debug("[{}][{}][{}] Devices telemetry message is empty", gateway.getTenantId(), gateway.getDeviceId(), sessionId); + throw new IllegalArgumentException("[" + sessionId + "] Devices telemetry message is empty for [" + gateway.getDeviceId() + "]"); + } + + AtomicInteger remaining = new AtomicInteger(deviceEntries.size()); + AtomicBoolean ackSent = new AtomicBoolean(false); + + for (Map.Entry deviceEntry : deviceEntries) { String deviceName = deviceEntry.getKey(); - process(deviceName, deviceCtx -> processPostTelemetryMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId), - t -> processFailure(msgId, deviceName, TELEMETRY, t)); + process(deviceName, deviceCtx -> processPostTelemetryMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId, + remaining, ackSent), + t -> processFailure(msgId, deviceName, TELEMETRY, ackSent, t)); } } - private void processPostTelemetryMsg(T deviceCtx, JsonElement msg, String deviceName, int msgId) { + private void processPostTelemetryMsg(T deviceCtx, JsonElement msg, String deviceName, int msgId, AtomicInteger remaining, AtomicBoolean ackSent) { try { long systemTs = System.currentTimeMillis(); TbPair> gatewayPayloadPair = JsonConverter.convertToGatewayTelemetry(msg.getAsJsonArray(), systemTs); @@ -425,10 +443,10 @@ public abstract class AbstractGatewaySessionHandler { String deviceName = checkDeviceName(telemetryMsg.getDeviceName()); - process(deviceName, deviceCtx -> processPostTelemetryMsg(deviceCtx, telemetryMsg.getMsg(), deviceName, msgId), - t -> processFailure(msgId, deviceName, TELEMETRY, t)); + process(deviceName, deviceCtx -> processPostTelemetryMsg(deviceCtx, telemetryMsg.getMsg(), deviceName, msgId, + remaining, ackSent), + t -> processFailure(msgId, deviceName, TELEMETRY, ackSent, t)); }); } catch (RuntimeException | InvalidProtocolBufferException e) { throw new AdaptorException(e); } } - protected void processPostTelemetryMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostTelemetryMsg msg, String deviceName, int msgId) { + protected void processPostTelemetryMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostTelemetryMsg msg, String deviceName, int msgId, + AtomicInteger remaining, AtomicBoolean ackSent) { try { TransportProtos.PostTelemetryMsg postTelemetryMsg = ProtoConverter.validatePostTelemetryMsg(msg.toByteArray()); - transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg)); + transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getAggregatePubAckCallback(channel, msgId, deviceName, postTelemetryMsg, remaining, ackSent)); } catch (Throwable e) { log.warn("[{}][{}][{}] Failed to convert telemetry: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e); - ackOrClose(msgId); + ackOrClose(msgId, ackSent); } } @@ -475,26 +498,42 @@ public abstract class AbstractGatewaySessionHandler deviceEntry : json.getAsJsonObject().entrySet()) { - if (!deviceEntry.getValue().isJsonObject()) { - log.warn("{}[{}]", CAN_T_PARSE_VALUE, json); - continue; - } + List> deviceEntries = json.getAsJsonObject().entrySet().stream() + .filter(entry -> { + boolean isJsonObject = entry.getValue().isJsonObject(); + if (!isJsonObject) { + log.warn("{} device='{}' value={}", CAN_T_PARSE_VALUE, entry.getKey(), entry.getValue()); + } + return isJsonObject; + }) + .toList(); + + if (deviceEntries.isEmpty()) { + log.debug("[{}][{}][{}] Devices claim message is empty", gateway.getTenantId(), gateway.getDeviceId(), sessionId); + throw new IllegalArgumentException("[" + sessionId + "] Devices claim message is empty for [" + gateway.getDeviceId() + "]"); + } + + AtomicInteger remaining = new AtomicInteger(deviceEntries.size()); + AtomicBoolean ackSent = new AtomicBoolean(false); + + for (Map.Entry deviceEntry : deviceEntries) { String deviceName = deviceEntry.getKey(); - process(deviceName, deviceCtx -> processClaimDeviceMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId), - t -> processFailure(msgId, deviceName, CLAIMING, t)); + process(deviceName, deviceCtx -> processClaimDeviceMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId, + remaining, ackSent), + t -> processFailure(msgId, deviceName, CLAIMING, ackSent, t)); } } - private void processClaimDeviceMsg(MqttDeviceAwareSessionContext deviceCtx, JsonElement claimRequest, String deviceName, int msgId) { + private void processClaimDeviceMsg(MqttDeviceAwareSessionContext deviceCtx, JsonElement claimRequest, String deviceName, int msgId, + AtomicInteger remaining, AtomicBoolean ackSent) { try { DeviceId deviceId = deviceCtx.getDeviceId(); TransportProtos.ClaimDeviceMsg claimDeviceMsg = JsonConverter.convertToClaimDeviceProto(deviceId, claimRequest); - transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(channel, deviceName, msgId, claimDeviceMsg)); + transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getAggregatePubAckCallback(channel, msgId, deviceName, claimDeviceMsg, remaining, ackSent)); } catch (Throwable e) { log.warn("[{}][{}][{}] Failed to convert claim message: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, claimRequest, e); - ackOrClose(msgId); + ackOrClose(msgId, ackSent); } } @@ -507,49 +546,70 @@ public abstract class AbstractGatewaySessionHandler { String deviceName = checkDeviceName(claimDeviceMsg.getDeviceName()); - process(deviceName, deviceCtx -> processClaimDeviceMsg(deviceCtx, claimDeviceMsg.getClaimRequest(), deviceName, msgId), - t -> processFailure(msgId, deviceName, CLAIMING, t)); + process(deviceName, deviceCtx -> processClaimDeviceMsg(deviceCtx, claimDeviceMsg.getClaimRequest(), deviceName, msgId, + remaining, ackSent), + t -> processFailure(msgId, deviceName, CLAIMING, ackSent, t)); }); } catch (RuntimeException | InvalidProtocolBufferException e) { throw new AdaptorException(e); } } - private void processClaimDeviceMsg(MqttDeviceAwareSessionContext deviceCtx, TransportApiProtos.ClaimDevice claimRequest, String deviceName, int msgId) { + private void processClaimDeviceMsg(MqttDeviceAwareSessionContext deviceCtx, TransportApiProtos.ClaimDevice claimRequest, String deviceName, int msgId, + AtomicInteger remaining, AtomicBoolean ackSent) { try { DeviceId deviceId = deviceCtx.getDeviceId(); TransportProtos.ClaimDeviceMsg claimDeviceMsg = ProtoConverter.convertToClaimDeviceProto(deviceId, claimRequest.toByteArray()); - transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(channel, deviceName, msgId, claimDeviceMsg)); + transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getAggregatePubAckCallback(channel, msgId, deviceName, claimDeviceMsg, remaining, ackSent)); } catch (Throwable e) { log.warn("[{}][{}][{}] Failed to convert claim message: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, claimRequest, e); - ackOrClose(msgId); + ackOrClose(msgId, ackSent); } } private void onDeviceAttributesJson(int msgId, ByteBuf payload) throws AdaptorException { JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, payload); validateJsonObject(json); - for (Map.Entry deviceEntry : json.getAsJsonObject().entrySet()) { - if (!deviceEntry.getValue().isJsonObject()) { - log.warn("{}[{}]", CAN_T_PARSE_VALUE, json); - continue; - } + List> deviceEntries = json.getAsJsonObject().entrySet().stream() + .filter(entry -> { + boolean isJsonObject = entry.getValue().isJsonObject(); + if (!isJsonObject) { + log.warn("{} device='{}' value={}", CAN_T_PARSE_VALUE, entry.getKey(), entry.getValue()); + } + return isJsonObject; + }) + .toList(); + + if (deviceEntries.isEmpty()) { + log.debug("[{}][{}][{}] Devices attribute message is empty", gateway.getTenantId(), gateway.getDeviceId(), sessionId); + throw new IllegalArgumentException("[" + sessionId + "] Devices attribute message is empty for [" + gateway.getDeviceId() + "]"); + } + + AtomicInteger remaining = new AtomicInteger(deviceEntries.size()); + AtomicBoolean ackSent = new AtomicBoolean(false); + + for (Map.Entry deviceEntry : deviceEntries) { String deviceName = deviceEntry.getKey(); - process(deviceName, deviceCtx -> processPostAttributesMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId), - t -> processFailure(msgId, deviceName, ATTRIBUTE, t)); + process(deviceName, deviceCtx -> processPostAttributesMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId, + remaining, ackSent), + t -> processFailure(msgId, deviceName, ATTRIBUTE, ackSent, t)); } } - private void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, JsonElement msg, String deviceName, int msgId) { + private void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, JsonElement msg, String deviceName, int msgId, + AtomicInteger remaining, AtomicBoolean ackSent) { try { TransportProtos.PostAttributeMsg postAttributeMsg = JsonConverter.convertToAttributesProto(msg.getAsJsonObject()); - transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg)); + transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getAggregatePubAckCallback(channel, msgId, deviceName, postAttributeMsg, remaining, ackSent)); } catch (Throwable e) { log.warn("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e); - ackOrClose(msgId); + ackOrClose(msgId, ackSent); } } @@ -562,23 +622,28 @@ public abstract class AbstractGatewaySessionHandler { String deviceName = checkDeviceName(attributesMsg.getDeviceName()); - process(deviceName, deviceCtx -> processPostAttributesMsg(deviceCtx, attributesMsg.getMsg(), deviceName, msgId), - t -> processFailure(msgId, deviceName, ATTRIBUTE, t)); + process(deviceName, deviceCtx -> processPostAttributesMsg(deviceCtx, attributesMsg.getMsg(), deviceName, msgId, + remaining, ackSent), + t -> processFailure(msgId, deviceName, ATTRIBUTE, ackSent, t)); }); } catch (RuntimeException | InvalidProtocolBufferException e) { throw new AdaptorException(e); } } - protected void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostAttributeMsg kvListProto, String deviceName, int msgId) { + protected void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostAttributeMsg kvListProto, String deviceName, int msgId, + AtomicInteger remaining, AtomicBoolean ackSent) { try { TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto); - transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg)); + transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getAggregatePubAckCallback(channel, msgId, deviceName, postAttributeMsg, remaining, ackSent)); } catch (Throwable e) { log.warn("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, kvListProto, e); - ackOrClose(msgId); + ackOrClose(msgId, ackSent); } } @@ -647,26 +712,34 @@ public abstract class AbstractGatewaySessionHandler processRpcResponseMsg(deviceCtx, requestId, data, deviceName, msgId), - t -> processFailure(msgId, deviceName, RPC_RESPONSE, t)); + AtomicInteger remaining = new AtomicInteger(1); + AtomicBoolean ackSent = new AtomicBoolean(false); + process(deviceName, deviceCtx -> processRpcResponseMsg(deviceCtx, requestId, data, deviceName, msgId, remaining, ackSent), + t -> processFailure(msgId, deviceName, RPC_RESPONSE, ackSent, t)); } - private void processRpcResponseMsg(MqttDeviceAwareSessionContext deviceCtx, Integer requestId, String data, String deviceName, int msgId) { + private void processRpcResponseMsg(MqttDeviceAwareSessionContext deviceCtx, Integer requestId, String data, String deviceName, + int msgId, AtomicInteger remaining, AtomicBoolean ackSent) { TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder() .setRequestId(requestId).setPayload(data).build(); - transportService.process(deviceCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(channel, deviceName, msgId, rpcResponseMsg)); + transportService.process(deviceCtx.getSessionInfo(), rpcResponseMsg, + getAggregatePubAckCallback(channel, msgId, deviceName, rpcResponseMsg, remaining, ackSent)); } private void processGetAttributeRequestMessage(MqttPublishMessage mqttMsg, String deviceName, TransportProtos.GetAttributeRequestMsg requestMsg) { int msgId = getMsgId(mqttMsg); - process(deviceName, deviceCtx -> processGetAttributeRequestMessage(deviceCtx, requestMsg, deviceName, msgId), - t -> { - processFailure(msgId, deviceName, ATTRIBUTES_REQUEST, t, MqttReasonCodes.PubAck.IMPLEMENTATION_SPECIFIC_ERROR); - }); + AtomicInteger remaining = new AtomicInteger(1); + AtomicBoolean ackSent = new AtomicBoolean(false); + process(deviceName, deviceCtx -> { + processGetAttributeRequestMessage(deviceCtx, requestMsg, deviceName, msgId, remaining, ackSent); + }, + t -> processFailure(msgId, deviceName, ATTRIBUTES_REQUEST, ackSent, MqttReasonCodes.PubAck.IMPLEMENTATION_SPECIFIC_ERROR, t)); } - private void processGetAttributeRequestMessage(T deviceCtx, TransportProtos.GetAttributeRequestMsg requestMsg, String deviceName, int msgId) { - transportService.process(deviceCtx.getSessionInfo(), requestMsg, getPubAckCallback(channel, deviceName, msgId, requestMsg)); + private void processGetAttributeRequestMessage(T deviceCtx, TransportProtos.GetAttributeRequestMsg requestMsg, + String deviceName, int msgId, AtomicInteger remaining, AtomicBoolean ackSent) { + transportService.process(deviceCtx.getSessionInfo(), requestMsg, + getAggregatePubAckCallback(channel, msgId, deviceName, requestMsg, remaining, ackSent)); } private TransportProtos.GetAttributeRequestMsg toGetAttributeRequestMsg(int requestId, boolean clientScope, Set keys) { @@ -717,9 +790,11 @@ public abstract class AbstractGatewaySessionHandler pubAckCallback = getAggregatePubAckCallback(channel, -1, deviceName, postTelemetryMsg, + new AtomicInteger(1), new AtomicBoolean(false)); + transportService.process(sessionInfo, postTelemetryMsg, pubAckCallback); } - public ConcurrentMap getDevices () { + public ConcurrentMap getDevices() { return this.devices; } - private TransportServiceCallback getPubAckCallback(final ChannelHandlerContext ctx, final String deviceName, final int msgId, final T msg) { + protected TransportServiceCallback getAggregatePubAckCallback( + final ChannelHandlerContext ctx, + final int msgId, + final String deviceName, + final T msg, + final AtomicInteger remaining, + final AtomicBoolean ackSent) { + return new TransportServiceCallback() { @Override public void onSuccess(Void dummy) { log.trace("[{}][{}][{}][{}] Published msg: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, msg); - if (msgId > 0) { - ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(deviceSessionCtx, msgId, MqttReasonCodes.PubAck.SUCCESS.byteValue())); - } else { - log.trace("[{}][{}][{}] Wrong msg id: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msg); - ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(deviceSessionCtx, msgId, MqttReasonCodes.PubAck.UNSPECIFIED_ERROR.byteValue())); - closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.MALFORMED_PACKET); + if (remaining.decrementAndGet() == 0 && ackSent.compareAndSet(false, true)) { + if (msgId > 0) { + ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg( + deviceSessionCtx, msgId, MqttReasonCodes.PubAck.SUCCESS.byteValue())); + } else { + log.trace("[{}][{}][{}] Wrong msg id: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msgId); + ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg( + deviceSessionCtx, msgId, MqttReasonCodes.PubAck.UNSPECIFIED_ERROR.byteValue())); + closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.MALFORMED_PACKET); + } } } @Override public void onError(Throwable e) { log.trace("[{}][{}][{}] Failed to publish msg: [{}] for device: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msg, deviceName, e); - if (e instanceof TbRateLimitsException) { - closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.MESSAGE_RATE_TOO_HIGH); - } else { - closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.UNSPECIFIED_ERROR); + if (ackSent.compareAndSet(false, true)) { + if (e instanceof TbRateLimitsException) { + ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg( + deviceSessionCtx, msgId, MqttReasonCodes.PubAck.QUOTA_EXCEEDED.byteValue())); + closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.MESSAGE_RATE_TOO_HIGH); + } else { + ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg( + deviceSessionCtx, msgId, MqttReasonCodes.PubAck.UNSPECIFIED_ERROR.byteValue())); + closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.UNSPECIFIED_ERROR); + } + ctx.close(); } - ctx.close(); + remaining.set(0); } }; } @@ -789,18 +884,19 @@ public abstract class AbstractGatewaySessionHandler contextListenableFuture, int msgId, List postTelemetryMsgList, String deviceName) { + if (CollectionUtils.isEmpty(postTelemetryMsgList)) { + log.debug("[{}] Device telemetry list is empty for: [{}]", sessionId, gateway.getDeviceId()); + } + + AtomicInteger remaining = new AtomicInteger(postTelemetryMsgList.size()); + AtomicBoolean ackSent = new AtomicBoolean(false); + process(contextListenableFuture, deviceCtx -> { for (TransportProtos.PostTelemetryMsg telemetryMsg : postTelemetryMsgList) { try { - processPostTelemetryMsg(deviceCtx, telemetryMsg, deviceName, msgId); + processPostTelemetryMsg(deviceCtx, telemetryMsg, deviceName, msgId, remaining, ackSent); } catch (Throwable e) { log.warn("[{}][{}] Failed to convert telemetry: {}", gateway.getDeviceId(), deviceName, telemetryMsg, e); - ackOrClose(msgId); + ackOrClose(msgId, ackSent); } } }, - t -> log.debug("[{}] Failed to process device telemetry command: {}", sessionId, deviceName, t)); + t -> processFailure(msgId, deviceName, "Failed to process device telemetry command", ackSent, t)); } private void onDeviceAttributesProto(ListenableFuture contextListenableFuture, int msgId, List attributesMsgList, String deviceName) throws AdaptorException { try { if (CollectionUtils.isEmpty(attributesMsgList)) { - log.debug("[{}] Devices attributes keys list is empty for: [{}]", sessionId, gateway.getDeviceId()); + log.debug("[{}] Device attribute list is empty for: [{}]", sessionId, gateway.getDeviceId()); } + + AtomicInteger remaining = new AtomicInteger(attributesMsgList.size()); + AtomicBoolean ackSent = new AtomicBoolean(false); + process(contextListenableFuture, deviceCtx -> { for (TransportApiProtos.AttributesMsg attributesMsg : attributesMsgList) { TransportProtos.PostAttributeMsg kvListProto = attributesMsg.getMsg(); try { TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto); - processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId); + processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId, remaining, ackSent); } catch (Throwable e) { log.warn("[{}][{}] Failed to process device attributes command: {}", gateway.getDeviceId(), deviceName, kvListProto, e); + ackOrClose(msgId, ackSent); } } }, - t -> log.debug("[{}] Failed to process device attributes command: {}", sessionId, deviceName, t)); + t -> processFailure(msgId, deviceName, "Failed to process device attributes command", ackSent, t)); } catch (RuntimeException e) { throw new AdaptorException(e); }