added ackOrClose instead of chanel.close and refactoring

This commit is contained in:
YevhenBondarenko 2024-05-16 18:45:50 +02:00
parent 5c6928558b
commit b96a7fcbd2

View File

@ -32,7 +32,9 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttVersion;
import jakarta.annotation.Nullable;
import lombok.Getter;
import lombok.Setter;
@ -83,6 +85,7 @@ import static org.thingsboard.server.common.transport.service.DefaultTransportSe
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_RPC_ASYNC_MSG;
import static org.thingsboard.server.transport.mqtt.util.ReturnCode.PAYLOAD_FORMAT_INVALID;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.OFFLINE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.STATE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.messageName;
@ -365,10 +368,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
protected void onDeviceTelemetryJson(int msgId, ByteBuf payload) throws AdaptorException {
JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, payload);
if (!json.isJsonObject()) {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}
validateJsonObject(json);
for (Map.Entry<String, JsonElement> deviceEntry : json.getAsJsonObject().entrySet()) {
if (!deviceEntry.getValue().isJsonArray()) {
log.warn("{}[{}]", CAN_T_PARSE_VALUE, json);
@ -402,7 +402,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg));
} catch (Throwable e) {
log.warn("[{}][{}][{}] Failed to convert telemetry: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e);
channel.close();
ackOrClose(msgId);
}
}
@ -445,7 +445,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg));
} catch (Throwable e) {
log.warn("[{}][{}][{}] Failed to convert telemetry: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e);
channel.close();
ackOrClose(msgId);
}
}
@ -462,11 +462,8 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
private void onDeviceClaimJson(int msgId, ByteBuf payload) throws AdaptorException {
JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, payload);
if (!json.isJsonObject()) {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}
JsonObject jsonObj = json.getAsJsonObject();
for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
validateJsonObject(json);
for (Map.Entry<String, JsonElement> deviceEntry : json.getAsJsonObject().entrySet()) {
if (!deviceEntry.getValue().isJsonObject()) {
log.warn("{}[{}]", CAN_T_PARSE_VALUE, json);
continue;
@ -500,6 +497,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(channel, deviceName, msgId, claimDeviceMsg));
} catch (Throwable e) {
log.warn("[{}][{}][{}] Failed to convert claim message: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, claimRequest, e);
ackOrClose(msgId);
}
}
@ -543,16 +541,14 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(channel, deviceName, msgId, claimDeviceMsg));
} catch (Throwable e) {
log.warn("[{}][{}][{}] Failed to convert claim message: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, claimRequest, e);
ackOrClose(msgId);
}
}
private void onDeviceAttributesJson(int msgId, ByteBuf payload) throws AdaptorException {
JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, payload);
if (!json.isJsonObject()) {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}
JsonObject jsonObj = json.getAsJsonObject();
for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
validateJsonObject(json);
for (Map.Entry<String, JsonElement> deviceEntry : json.getAsJsonObject().entrySet()) {
if (!deviceEntry.getValue().isJsonObject()) {
log.warn("{}[{}]", CAN_T_PARSE_VALUE, json);
continue;
@ -611,6 +607,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg));
} catch (Throwable e) {
log.warn("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e);
ackOrClose(msgId);
}
}
@ -620,6 +617,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg));
} catch (Throwable e) {
log.warn("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, kvListProto, e);
ackOrClose(msgId);
}
}
@ -664,9 +662,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
private void onDeviceRpcResponseJson(int msgId, ByteBuf payload) throws AdaptorException {
JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, payload);
if (!json.isJsonObject()) {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}
validateJsonObject(json);
JsonObject jsonObj = json.getAsJsonObject();
String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString();
Integer requestId = jsonObj.get("id").getAsInt();
@ -674,6 +670,12 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
onDeviceRpcResponse(requestId, data, deviceName, msgId);
}
private static void validateJsonObject(JsonElement json) {
if (!json.isJsonObject()) {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}
}
private void onDeviceRpcResponseProto(int msgId, ByteBuf payload) throws AdaptorException {
try {
TransportApiProtos.GatewayRpcResponseMsg gatewayRpcResponseMsg = TransportApiProtos.GatewayRpcResponseMsg.parseFrom(getBytes(payload));
@ -725,7 +727,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
@Override
public void onFailure(Throwable t) {
ack(mqttMsg, ReturnCode.IMPLEMENTATION_SPECIFIC);
ack(msgId, ReturnCode.IMPLEMENTATION_SPECIFIC);
log.debug("[{}][{}][{}] Failed to process device attributes request command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t);
}
}, context.getExecutor());
@ -785,11 +787,23 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
protected void ack(MqttPublishMessage msg, ReturnCode returnCode) {
int msgId = getMsgId(msg);
ack(msgId, returnCode);
}
protected void ack(int msgId, ReturnCode returnCode) {
if (msgId > 0) {
writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(deviceSessionCtx, msgId, returnCode));
}
}
private void ackOrClose(int msgId) {
if (MqttVersion.MQTT_5.equals(deviceSessionCtx.getMqttVersion())) {
ack(msgId, PAYLOAD_FORMAT_INVALID);
} else {
channel.close();
}
}
private void deregisterSession(String deviceName, MqttDeviceAwareSessionContext deviceSessionCtx) {
if (this.deviceSessionCtx.isSparkplug()) {
sendSparkplugStateOnTelemetry(deviceSessionCtx.getSessionInfo(),