diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index 91e7dedbf2..af8ed2aa6d 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -412,7 +412,7 @@ public abstract class AbstractGatewaySessionHandler processPostTelemetryMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId), - t -> failedToProcessLog(deviceName, TELEMETRY, t)); + t -> processFailure(msgId, deviceName, TELEMETRY, t)); } } @@ -444,7 +444,7 @@ public abstract class AbstractGatewaySessionHandler { String deviceName = checkDeviceName(telemetryMsg.getDeviceName()); process(deviceName, deviceCtx -> processPostTelemetryMsg(deviceCtx, telemetryMsg.getMsg(), deviceName, msgId), - t -> failedToProcessLog(deviceName, TELEMETRY, t)); + t -> processFailure(msgId, deviceName, TELEMETRY, t)); }); } catch (RuntimeException | InvalidProtocolBufferException e) { throw new AdaptorException(e); @@ -483,7 +483,7 @@ public abstract class AbstractGatewaySessionHandler processClaimDeviceMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId), - t -> failedToProcessLog(deviceName, CLAIMING, t)); + t -> processFailure(msgId, deviceName, CLAIMING, t)); } } @@ -510,7 +510,7 @@ public abstract class AbstractGatewaySessionHandler { String deviceName = checkDeviceName(claimDeviceMsg.getDeviceName()); process(deviceName, deviceCtx -> processClaimDeviceMsg(deviceCtx, claimDeviceMsg.getClaimRequest(), deviceName, msgId), - t -> failedToProcessLog(deviceName, CLAIMING, t)); + t -> processFailure(msgId, deviceName, CLAIMING, t)); }); } catch (RuntimeException | InvalidProtocolBufferException e) { throw new AdaptorException(e); @@ -539,7 +539,7 @@ public abstract class AbstractGatewaySessionHandler processPostAttributesMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId), - t -> failedToProcessLog(deviceName, ATTRIBUTE, t)); + t -> processFailure(msgId, deviceName, ATTRIBUTE, t)); } } @@ -565,7 +565,7 @@ public abstract class AbstractGatewaySessionHandler { String deviceName = checkDeviceName(attributesMsg.getDeviceName()); process(deviceName, deviceCtx -> processPostAttributesMsg(deviceCtx, attributesMsg.getMsg(), deviceName, msgId), - t -> failedToProcessLog(deviceName, ATTRIBUTE, t)); + t -> processFailure(msgId, deviceName, ATTRIBUTE, t)); }); } catch (RuntimeException | InvalidProtocolBufferException e) { throw new AdaptorException(e); @@ -648,7 +648,7 @@ public abstract class AbstractGatewaySessionHandler processRpcResponseMsg(deviceCtx, requestId, data, deviceName, msgId), - t -> failedToProcessLog(deviceName, RPC_RESPONSE, t)); + t -> processFailure(msgId, deviceName, RPC_RESPONSE, t)); } private void processRpcResponseMsg(MqttDeviceAwareSessionContext deviceCtx, Integer requestId, String data, String deviceName, int msgId) { @@ -661,8 +661,7 @@ public abstract class AbstractGatewaySessionHandler processGetAttributeRequestMessage(deviceCtx, requestMsg, deviceName, msgId), t -> { - failedToProcessLog(deviceName, ATTRIBUTES_REQUEST, t); - ack(mqttMsg, MqttReasonCodes.PubAck.IMPLEMENTATION_SPECIFIC_ERROR); + processFailure(msgId, deviceName, ATTRIBUTES_REQUEST, t, MqttReasonCodes.PubAck.IMPLEMENTATION_SPECIFIC_ERROR); }); } @@ -790,10 +789,19 @@ public abstract class AbstractGatewaySessionHandler