fixed pubAck publications
This commit is contained in:
parent
5e9ae421be
commit
00700afd83
@ -80,6 +80,8 @@ import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.Consumer;
|
||||
@ -405,18 +407,34 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
||||
protected void onDeviceTelemetryJson(int msgId, ByteBuf payload) throws AdaptorException {
|
||||
JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, payload);
|
||||
validateJsonObject(json);
|
||||
for (Map.Entry<String, JsonElement> deviceEntry : json.getAsJsonObject().entrySet()) {
|
||||
if (!deviceEntry.getValue().isJsonArray()) {
|
||||
log.warn("{}[{}]", CAN_T_PARSE_VALUE, json);
|
||||
continue;
|
||||
}
|
||||
|
||||
List<Map.Entry<String, JsonElement>> deviceEntries = json.getAsJsonObject().entrySet().stream()
|
||||
.filter(entry -> {
|
||||
final boolean isArray = entry.getValue().isJsonArray();
|
||||
if (!isArray) {
|
||||
log.warn("{} device='{}' value={}", CAN_T_PARSE_VALUE, entry.getKey(), entry.getValue());
|
||||
}
|
||||
return isArray;
|
||||
})
|
||||
.toList();
|
||||
|
||||
if (deviceEntries.isEmpty()) {
|
||||
log.debug("[{}][{}][{}] Devices telemetry message is empty", gateway.getTenantId(), gateway.getDeviceId(), sessionId);
|
||||
throw new IllegalArgumentException("[" + sessionId + "] Devices telemetry message is empty for [" + gateway.getDeviceId() + "]");
|
||||
}
|
||||
|
||||
AtomicInteger remaining = new AtomicInteger(deviceEntries.size());
|
||||
AtomicBoolean ackSent = new AtomicBoolean(false);
|
||||
|
||||
for (Map.Entry<String, JsonElement> deviceEntry : deviceEntries) {
|
||||
String deviceName = deviceEntry.getKey();
|
||||
process(deviceName, deviceCtx -> processPostTelemetryMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId),
|
||||
t -> processFailure(msgId, deviceName, TELEMETRY, t));
|
||||
process(deviceName, deviceCtx -> processPostTelemetryMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId,
|
||||
remaining, ackSent),
|
||||
t -> processFailure(msgId, deviceName, TELEMETRY, ackSent, t));
|
||||
}
|
||||
}
|
||||
|
||||
private void processPostTelemetryMsg(T deviceCtx, JsonElement msg, String deviceName, int msgId) {
|
||||
private void processPostTelemetryMsg(T deviceCtx, JsonElement msg, String deviceName, int msgId, AtomicInteger remaining, AtomicBoolean ackSent) {
|
||||
try {
|
||||
long systemTs = System.currentTimeMillis();
|
||||
TbPair<TransportProtos.PostTelemetryMsg, List<GatewayMetadata>> gatewayPayloadPair = JsonConverter.convertToGatewayTelemetry(msg.getAsJsonArray(), systemTs);
|
||||
@ -425,10 +443,10 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
||||
if (!CollectionUtils.isEmpty(metadata)) {
|
||||
gatewayMetricsService.process(deviceSessionCtx.getSessionInfo(), gateway.getDeviceId(), metadata, systemTs);
|
||||
}
|
||||
transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg));
|
||||
transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getAggregatePubAckCallback(channel, msgId, deviceName, postTelemetryMsg, remaining, ackSent));
|
||||
} catch (Throwable e) {
|
||||
log.warn("[{}][{}][{}] Failed to convert telemetry: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e);
|
||||
ackOrClose(msgId);
|
||||
ackOrClose(msgId, ackSent);
|
||||
}
|
||||
}
|
||||
|
||||
@ -441,23 +459,28 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
||||
throw new IllegalArgumentException("[" + sessionId + "] Devices telemetry messages is empty for [" + gateway.getDeviceId() + "]");
|
||||
}
|
||||
|
||||
AtomicInteger remaining = new AtomicInteger(deviceMsgList.size());
|
||||
AtomicBoolean ackSent = new AtomicBoolean(false);
|
||||
|
||||
deviceMsgList.forEach(telemetryMsg -> {
|
||||
String deviceName = checkDeviceName(telemetryMsg.getDeviceName());
|
||||
process(deviceName, deviceCtx -> processPostTelemetryMsg(deviceCtx, telemetryMsg.getMsg(), deviceName, msgId),
|
||||
t -> processFailure(msgId, deviceName, TELEMETRY, t));
|
||||
process(deviceName, deviceCtx -> processPostTelemetryMsg(deviceCtx, telemetryMsg.getMsg(), deviceName, msgId,
|
||||
remaining, ackSent),
|
||||
t -> processFailure(msgId, deviceName, TELEMETRY, ackSent, t));
|
||||
});
|
||||
} catch (RuntimeException | InvalidProtocolBufferException e) {
|
||||
throw new AdaptorException(e);
|
||||
}
|
||||
}
|
||||
|
||||
protected void processPostTelemetryMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostTelemetryMsg msg, String deviceName, int msgId) {
|
||||
protected void processPostTelemetryMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostTelemetryMsg msg, String deviceName, int msgId,
|
||||
AtomicInteger remaining, AtomicBoolean ackSent) {
|
||||
try {
|
||||
TransportProtos.PostTelemetryMsg postTelemetryMsg = ProtoConverter.validatePostTelemetryMsg(msg.toByteArray());
|
||||
transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg));
|
||||
transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getAggregatePubAckCallback(channel, msgId, deviceName, postTelemetryMsg, remaining, ackSent));
|
||||
} catch (Throwable e) {
|
||||
log.warn("[{}][{}][{}] Failed to convert telemetry: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e);
|
||||
ackOrClose(msgId);
|
||||
ackOrClose(msgId, ackSent);
|
||||
}
|
||||
}
|
||||
|
||||
@ -475,26 +498,42 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
||||
private void onDeviceClaimJson(int msgId, ByteBuf payload) throws AdaptorException {
|
||||
JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, payload);
|
||||
validateJsonObject(json);
|
||||
for (Map.Entry<String, JsonElement> deviceEntry : json.getAsJsonObject().entrySet()) {
|
||||
if (!deviceEntry.getValue().isJsonObject()) {
|
||||
log.warn("{}[{}]", CAN_T_PARSE_VALUE, json);
|
||||
continue;
|
||||
}
|
||||
|
||||
List<Map.Entry<String, JsonElement>> deviceEntries = json.getAsJsonObject().entrySet().stream()
|
||||
.filter(entry -> {
|
||||
boolean isJsonObject = entry.getValue().isJsonObject();
|
||||
if (!isJsonObject) {
|
||||
log.warn("{} device='{}' value={}", CAN_T_PARSE_VALUE, entry.getKey(), entry.getValue());
|
||||
}
|
||||
return isJsonObject;
|
||||
})
|
||||
.toList();
|
||||
|
||||
if (deviceEntries.isEmpty()) {
|
||||
log.debug("[{}][{}][{}] Devices claim message is empty", gateway.getTenantId(), gateway.getDeviceId(), sessionId);
|
||||
throw new IllegalArgumentException("[" + sessionId + "] Devices claim message is empty for [" + gateway.getDeviceId() + "]");
|
||||
}
|
||||
|
||||
AtomicInteger remaining = new AtomicInteger(deviceEntries.size());
|
||||
AtomicBoolean ackSent = new AtomicBoolean(false);
|
||||
|
||||
for (Map.Entry<String, JsonElement> deviceEntry : deviceEntries) {
|
||||
String deviceName = deviceEntry.getKey();
|
||||
process(deviceName, deviceCtx -> processClaimDeviceMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId),
|
||||
t -> processFailure(msgId, deviceName, CLAIMING, t));
|
||||
process(deviceName, deviceCtx -> processClaimDeviceMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId,
|
||||
remaining, ackSent),
|
||||
t -> processFailure(msgId, deviceName, CLAIMING, ackSent, t));
|
||||
}
|
||||
}
|
||||
|
||||
private void processClaimDeviceMsg(MqttDeviceAwareSessionContext deviceCtx, JsonElement claimRequest, String deviceName, int msgId) {
|
||||
private void processClaimDeviceMsg(MqttDeviceAwareSessionContext deviceCtx, JsonElement claimRequest, String deviceName, int msgId,
|
||||
AtomicInteger remaining, AtomicBoolean ackSent) {
|
||||
try {
|
||||
DeviceId deviceId = deviceCtx.getDeviceId();
|
||||
TransportProtos.ClaimDeviceMsg claimDeviceMsg = JsonConverter.convertToClaimDeviceProto(deviceId, claimRequest);
|
||||
transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(channel, deviceName, msgId, claimDeviceMsg));
|
||||
transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getAggregatePubAckCallback(channel, msgId, deviceName, claimDeviceMsg, remaining, ackSent));
|
||||
} catch (Throwable e) {
|
||||
log.warn("[{}][{}][{}] Failed to convert claim message: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, claimRequest, e);
|
||||
ackOrClose(msgId);
|
||||
ackOrClose(msgId, ackSent);
|
||||
}
|
||||
}
|
||||
|
||||
@ -507,49 +546,70 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
||||
throw new IllegalArgumentException("[" + sessionId + "] Devices claim messages is empty for [" + gateway.getDeviceId() + "]");
|
||||
}
|
||||
|
||||
AtomicInteger remaining = new AtomicInteger(claimMsgList.size());
|
||||
AtomicBoolean ackSent = new AtomicBoolean(false);
|
||||
|
||||
claimMsgList.forEach(claimDeviceMsg -> {
|
||||
String deviceName = checkDeviceName(claimDeviceMsg.getDeviceName());
|
||||
process(deviceName, deviceCtx -> processClaimDeviceMsg(deviceCtx, claimDeviceMsg.getClaimRequest(), deviceName, msgId),
|
||||
t -> processFailure(msgId, deviceName, CLAIMING, t));
|
||||
process(deviceName, deviceCtx -> processClaimDeviceMsg(deviceCtx, claimDeviceMsg.getClaimRequest(), deviceName, msgId,
|
||||
remaining, ackSent),
|
||||
t -> processFailure(msgId, deviceName, CLAIMING, ackSent, t));
|
||||
});
|
||||
} catch (RuntimeException | InvalidProtocolBufferException e) {
|
||||
throw new AdaptorException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void processClaimDeviceMsg(MqttDeviceAwareSessionContext deviceCtx, TransportApiProtos.ClaimDevice claimRequest, String deviceName, int msgId) {
|
||||
private void processClaimDeviceMsg(MqttDeviceAwareSessionContext deviceCtx, TransportApiProtos.ClaimDevice claimRequest, String deviceName, int msgId,
|
||||
AtomicInteger remaining, AtomicBoolean ackSent) {
|
||||
try {
|
||||
DeviceId deviceId = deviceCtx.getDeviceId();
|
||||
TransportProtos.ClaimDeviceMsg claimDeviceMsg = ProtoConverter.convertToClaimDeviceProto(deviceId, claimRequest.toByteArray());
|
||||
transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(channel, deviceName, msgId, claimDeviceMsg));
|
||||
transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getAggregatePubAckCallback(channel, msgId, deviceName, claimDeviceMsg, remaining, ackSent));
|
||||
} catch (Throwable e) {
|
||||
log.warn("[{}][{}][{}] Failed to convert claim message: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, claimRequest, e);
|
||||
ackOrClose(msgId);
|
||||
ackOrClose(msgId, ackSent);
|
||||
}
|
||||
}
|
||||
|
||||
private void onDeviceAttributesJson(int msgId, ByteBuf payload) throws AdaptorException {
|
||||
JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, payload);
|
||||
validateJsonObject(json);
|
||||
for (Map.Entry<String, JsonElement> deviceEntry : json.getAsJsonObject().entrySet()) {
|
||||
if (!deviceEntry.getValue().isJsonObject()) {
|
||||
log.warn("{}[{}]", CAN_T_PARSE_VALUE, json);
|
||||
continue;
|
||||
}
|
||||
|
||||
List<Map.Entry<String, JsonElement>> deviceEntries = json.getAsJsonObject().entrySet().stream()
|
||||
.filter(entry -> {
|
||||
boolean isJsonObject = entry.getValue().isJsonObject();
|
||||
if (!isJsonObject) {
|
||||
log.warn("{} device='{}' value={}", CAN_T_PARSE_VALUE, entry.getKey(), entry.getValue());
|
||||
}
|
||||
return isJsonObject;
|
||||
})
|
||||
.toList();
|
||||
|
||||
if (deviceEntries.isEmpty()) {
|
||||
log.debug("[{}][{}][{}] Devices attribute message is empty", gateway.getTenantId(), gateway.getDeviceId(), sessionId);
|
||||
throw new IllegalArgumentException("[" + sessionId + "] Devices attribute message is empty for [" + gateway.getDeviceId() + "]");
|
||||
}
|
||||
|
||||
AtomicInteger remaining = new AtomicInteger(deviceEntries.size());
|
||||
AtomicBoolean ackSent = new AtomicBoolean(false);
|
||||
|
||||
for (Map.Entry<String, JsonElement> deviceEntry : deviceEntries) {
|
||||
String deviceName = deviceEntry.getKey();
|
||||
process(deviceName, deviceCtx -> processPostAttributesMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId),
|
||||
t -> processFailure(msgId, deviceName, ATTRIBUTE, t));
|
||||
process(deviceName, deviceCtx -> processPostAttributesMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId,
|
||||
remaining, ackSent),
|
||||
t -> processFailure(msgId, deviceName, ATTRIBUTE, ackSent, t));
|
||||
}
|
||||
}
|
||||
|
||||
private void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, JsonElement msg, String deviceName, int msgId) {
|
||||
private void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, JsonElement msg, String deviceName, int msgId,
|
||||
AtomicInteger remaining, AtomicBoolean ackSent) {
|
||||
try {
|
||||
TransportProtos.PostAttributeMsg postAttributeMsg = JsonConverter.convertToAttributesProto(msg.getAsJsonObject());
|
||||
transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg));
|
||||
transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getAggregatePubAckCallback(channel, msgId, deviceName, postAttributeMsg, remaining, ackSent));
|
||||
} catch (Throwable e) {
|
||||
log.warn("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e);
|
||||
ackOrClose(msgId);
|
||||
ackOrClose(msgId, ackSent);
|
||||
}
|
||||
}
|
||||
|
||||
@ -562,23 +622,28 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
||||
throw new IllegalArgumentException("[" + sessionId + "] Devices attributes keys list is empty for [" + gateway.getDeviceId() + "]");
|
||||
}
|
||||
|
||||
AtomicInteger remaining = new AtomicInteger(attributesMsgList.size());
|
||||
AtomicBoolean ackSent = new AtomicBoolean(false);
|
||||
|
||||
attributesMsgList.forEach(attributesMsg -> {
|
||||
String deviceName = checkDeviceName(attributesMsg.getDeviceName());
|
||||
process(deviceName, deviceCtx -> processPostAttributesMsg(deviceCtx, attributesMsg.getMsg(), deviceName, msgId),
|
||||
t -> processFailure(msgId, deviceName, ATTRIBUTE, t));
|
||||
process(deviceName, deviceCtx -> processPostAttributesMsg(deviceCtx, attributesMsg.getMsg(), deviceName, msgId,
|
||||
remaining, ackSent),
|
||||
t -> processFailure(msgId, deviceName, ATTRIBUTE, ackSent, t));
|
||||
});
|
||||
} catch (RuntimeException | InvalidProtocolBufferException e) {
|
||||
throw new AdaptorException(e);
|
||||
}
|
||||
}
|
||||
|
||||
protected void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostAttributeMsg kvListProto, String deviceName, int msgId) {
|
||||
protected void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostAttributeMsg kvListProto, String deviceName, int msgId,
|
||||
AtomicInteger remaining, AtomicBoolean ackSent) {
|
||||
try {
|
||||
TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto);
|
||||
transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg));
|
||||
transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getAggregatePubAckCallback(channel, msgId, deviceName, postAttributeMsg, remaining, ackSent));
|
||||
} catch (Throwable e) {
|
||||
log.warn("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, kvListProto, e);
|
||||
ackOrClose(msgId);
|
||||
ackOrClose(msgId, ackSent);
|
||||
}
|
||||
}
|
||||
|
||||
@ -647,26 +712,34 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
||||
}
|
||||
|
||||
private void onDeviceRpcResponse(Integer requestId, String data, String deviceName, int msgId) {
|
||||
process(deviceName, deviceCtx -> processRpcResponseMsg(deviceCtx, requestId, data, deviceName, msgId),
|
||||
t -> processFailure(msgId, deviceName, RPC_RESPONSE, t));
|
||||
AtomicInteger remaining = new AtomicInteger(1);
|
||||
AtomicBoolean ackSent = new AtomicBoolean(false);
|
||||
process(deviceName, deviceCtx -> processRpcResponseMsg(deviceCtx, requestId, data, deviceName, msgId, remaining, ackSent),
|
||||
t -> processFailure(msgId, deviceName, RPC_RESPONSE, ackSent, t));
|
||||
}
|
||||
|
||||
private void processRpcResponseMsg(MqttDeviceAwareSessionContext deviceCtx, Integer requestId, String data, String deviceName, int msgId) {
|
||||
private void processRpcResponseMsg(MqttDeviceAwareSessionContext deviceCtx, Integer requestId, String data, String deviceName,
|
||||
int msgId, AtomicInteger remaining, AtomicBoolean ackSent) {
|
||||
TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
|
||||
.setRequestId(requestId).setPayload(data).build();
|
||||
transportService.process(deviceCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(channel, deviceName, msgId, rpcResponseMsg));
|
||||
transportService.process(deviceCtx.getSessionInfo(), rpcResponseMsg,
|
||||
getAggregatePubAckCallback(channel, msgId, deviceName, rpcResponseMsg, remaining, ackSent));
|
||||
}
|
||||
|
||||
private void processGetAttributeRequestMessage(MqttPublishMessage mqttMsg, String deviceName, TransportProtos.GetAttributeRequestMsg requestMsg) {
|
||||
int msgId = getMsgId(mqttMsg);
|
||||
process(deviceName, deviceCtx -> processGetAttributeRequestMessage(deviceCtx, requestMsg, deviceName, msgId),
|
||||
t -> {
|
||||
processFailure(msgId, deviceName, ATTRIBUTES_REQUEST, t, MqttReasonCodes.PubAck.IMPLEMENTATION_SPECIFIC_ERROR);
|
||||
});
|
||||
AtomicInteger remaining = new AtomicInteger(1);
|
||||
AtomicBoolean ackSent = new AtomicBoolean(false);
|
||||
process(deviceName, deviceCtx -> {
|
||||
processGetAttributeRequestMessage(deviceCtx, requestMsg, deviceName, msgId, remaining, ackSent);
|
||||
},
|
||||
t -> processFailure(msgId, deviceName, ATTRIBUTES_REQUEST, ackSent, MqttReasonCodes.PubAck.IMPLEMENTATION_SPECIFIC_ERROR, t));
|
||||
}
|
||||
|
||||
private void processGetAttributeRequestMessage(T deviceCtx, TransportProtos.GetAttributeRequestMsg requestMsg, String deviceName, int msgId) {
|
||||
transportService.process(deviceCtx.getSessionInfo(), requestMsg, getPubAckCallback(channel, deviceName, msgId, requestMsg));
|
||||
private void processGetAttributeRequestMessage(T deviceCtx, TransportProtos.GetAttributeRequestMsg requestMsg,
|
||||
String deviceName, int msgId, AtomicInteger remaining, AtomicBoolean ackSent) {
|
||||
transportService.process(deviceCtx.getSessionInfo(), requestMsg,
|
||||
getAggregatePubAckCallback(channel, msgId, deviceName, requestMsg, remaining, ackSent));
|
||||
}
|
||||
|
||||
private TransportProtos.GetAttributeRequestMsg toGetAttributeRequestMsg(int requestId, boolean clientScope, Set<String> keys) {
|
||||
@ -717,9 +790,11 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
||||
}
|
||||
}
|
||||
|
||||
protected void ackOrClose(int msgId) {
|
||||
protected void ackOrClose(int msgId, AtomicBoolean ackSent) {
|
||||
if (MqttVersion.MQTT_5.equals(deviceSessionCtx.getMqttVersion())) {
|
||||
ack(msgId, MqttReasonCodes.PubAck.PAYLOAD_FORMAT_INVALID);
|
||||
if (ackSent.compareAndSet(false, true)) {
|
||||
ack(msgId, MqttReasonCodes.PubAck.PAYLOAD_FORMAT_INVALID);
|
||||
}
|
||||
} else {
|
||||
channel.close();
|
||||
}
|
||||
@ -741,36 +816,56 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
||||
keyValueProtoBuilder.setType(TransportProtos.KeyValueType.STRING_V);
|
||||
keyValueProtoBuilder.setStringV(connectionState.name());
|
||||
TransportProtos.PostTelemetryMsg postTelemetryMsg = postTelemetryMsgCreated(keyValueProtoBuilder.build(), ts);
|
||||
transportService.process(sessionInfo, postTelemetryMsg, getPubAckCallback(channel, deviceName, -1, postTelemetryMsg));
|
||||
TransportServiceCallback<Void> pubAckCallback = getAggregatePubAckCallback(channel, -1, deviceName, postTelemetryMsg,
|
||||
new AtomicInteger(1), new AtomicBoolean(false));
|
||||
transportService.process(sessionInfo, postTelemetryMsg, pubAckCallback);
|
||||
}
|
||||
|
||||
public ConcurrentMap<String, T> getDevices () {
|
||||
public ConcurrentMap<String, T> getDevices() {
|
||||
return this.devices;
|
||||
}
|
||||
|
||||
private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final String deviceName, final int msgId, final T msg) {
|
||||
protected <T>TransportServiceCallback<Void> getAggregatePubAckCallback(
|
||||
final ChannelHandlerContext ctx,
|
||||
final int msgId,
|
||||
final String deviceName,
|
||||
final T msg,
|
||||
final AtomicInteger remaining,
|
||||
final AtomicBoolean ackSent) {
|
||||
|
||||
return new TransportServiceCallback<Void>() {
|
||||
@Override
|
||||
public void onSuccess(Void dummy) {
|
||||
log.trace("[{}][{}][{}][{}] Published msg: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, msg);
|
||||
if (msgId > 0) {
|
||||
ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(deviceSessionCtx, msgId, MqttReasonCodes.PubAck.SUCCESS.byteValue()));
|
||||
} else {
|
||||
log.trace("[{}][{}][{}] Wrong msg id: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msg);
|
||||
ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(deviceSessionCtx, msgId, MqttReasonCodes.PubAck.UNSPECIFIED_ERROR.byteValue()));
|
||||
closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.MALFORMED_PACKET);
|
||||
if (remaining.decrementAndGet() == 0 && ackSent.compareAndSet(false, true)) {
|
||||
if (msgId > 0) {
|
||||
ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(
|
||||
deviceSessionCtx, msgId, MqttReasonCodes.PubAck.SUCCESS.byteValue()));
|
||||
} else {
|
||||
log.trace("[{}][{}][{}] Wrong msg id: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msgId);
|
||||
ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(
|
||||
deviceSessionCtx, msgId, MqttReasonCodes.PubAck.UNSPECIFIED_ERROR.byteValue()));
|
||||
closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.MALFORMED_PACKET);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable e) {
|
||||
log.trace("[{}][{}][{}] Failed to publish msg: [{}] for device: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msg, deviceName, e);
|
||||
if (e instanceof TbRateLimitsException) {
|
||||
closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.MESSAGE_RATE_TOO_HIGH);
|
||||
} else {
|
||||
closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.UNSPECIFIED_ERROR);
|
||||
if (ackSent.compareAndSet(false, true)) {
|
||||
if (e instanceof TbRateLimitsException) {
|
||||
ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(
|
||||
deviceSessionCtx, msgId, MqttReasonCodes.PubAck.QUOTA_EXCEEDED.byteValue()));
|
||||
closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.MESSAGE_RATE_TOO_HIGH);
|
||||
} else {
|
||||
ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(
|
||||
deviceSessionCtx, msgId, MqttReasonCodes.PubAck.UNSPECIFIED_ERROR.byteValue()));
|
||||
closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.UNSPECIFIED_ERROR);
|
||||
}
|
||||
ctx.close();
|
||||
}
|
||||
ctx.close();
|
||||
remaining.set(0);
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -789,18 +884,19 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
||||
}
|
||||
}
|
||||
|
||||
protected void processFailure(int msgId, String deviceName, String msgType, Throwable t) {
|
||||
log.debug("[{}][{}][{}] Failed to process device {} command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msgType, deviceName, t);
|
||||
protected void processFailure(int msgId, String deviceName, String msgType, AtomicBoolean ackSent, Throwable t) {
|
||||
if (DataConstants.MAXIMUM_NUMBER_OF_DEVICES_REACHED.equals(t.getMessage())) {
|
||||
processFailure(msgId, deviceName, msgType, t, MqttReasonCodes.PubAck.QUOTA_EXCEEDED);
|
||||
processFailure(msgId, deviceName, msgType, ackSent, MqttReasonCodes.PubAck.QUOTA_EXCEEDED, t);
|
||||
} else {
|
||||
processFailure(msgId, deviceName, msgType, t, MqttReasonCodes.PubAck.UNSPECIFIED_ERROR);
|
||||
processFailure(msgId, deviceName, msgType, ackSent, MqttReasonCodes.PubAck.UNSPECIFIED_ERROR, t);
|
||||
}
|
||||
}
|
||||
|
||||
protected void processFailure(int msgId, String deviceName, String msgType, Throwable t, MqttReasonCodes.PubAck pubAck) {
|
||||
protected void processFailure(int msgId, String deviceName, String msgType, AtomicBoolean ackSent, MqttReasonCodes.PubAck pubAck, Throwable t) {
|
||||
log.debug("[{}][{}][{}] Failed to process device {} command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msgType, deviceName, t);
|
||||
ack(msgId, pubAck);
|
||||
if (ackSent.compareAndSet(false, true)) {
|
||||
ack(msgId, pubAck);
|
||||
}
|
||||
}
|
||||
|
||||
private void closeDeviceSession(String deviceName, MqttReasonCodes.Disconnect returnCode) {
|
||||
|
||||
@ -21,6 +21,7 @@ import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.gson.JsonSyntaxException;
|
||||
import io.netty.handler.codec.mqtt.MqttMessage;
|
||||
import io.netty.handler.codec.mqtt.MqttPublishMessage;
|
||||
import io.netty.handler.codec.mqtt.MqttReasonCodes;
|
||||
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -48,6 +49,8 @@ import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.ONLINE;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DBIRTH;
|
||||
@ -144,37 +147,49 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
|
||||
|
||||
public void onDeviceTelemetryProto(ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture,
|
||||
int msgId, List<TransportProtos.PostTelemetryMsg> postTelemetryMsgList, String deviceName) {
|
||||
if (CollectionUtils.isEmpty(postTelemetryMsgList)) {
|
||||
log.debug("[{}] Device telemetry list is empty for: [{}]", sessionId, gateway.getDeviceId());
|
||||
}
|
||||
|
||||
AtomicInteger remaining = new AtomicInteger(postTelemetryMsgList.size());
|
||||
AtomicBoolean ackSent = new AtomicBoolean(false);
|
||||
|
||||
process(contextListenableFuture, deviceCtx -> {
|
||||
for (TransportProtos.PostTelemetryMsg telemetryMsg : postTelemetryMsgList) {
|
||||
try {
|
||||
processPostTelemetryMsg(deviceCtx, telemetryMsg, deviceName, msgId);
|
||||
processPostTelemetryMsg(deviceCtx, telemetryMsg, deviceName, msgId, remaining, ackSent);
|
||||
} catch (Throwable e) {
|
||||
log.warn("[{}][{}] Failed to convert telemetry: {}", gateway.getDeviceId(), deviceName, telemetryMsg, e);
|
||||
ackOrClose(msgId);
|
||||
ackOrClose(msgId, ackSent);
|
||||
}
|
||||
}
|
||||
},
|
||||
t -> log.debug("[{}] Failed to process device telemetry command: {}", sessionId, deviceName, t));
|
||||
t -> processFailure(msgId, deviceName, "Failed to process device telemetry command", ackSent, t));
|
||||
}
|
||||
|
||||
private void onDeviceAttributesProto(ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture, int msgId,
|
||||
List<TransportApiProtos.AttributesMsg> attributesMsgList, String deviceName) throws AdaptorException {
|
||||
try {
|
||||
if (CollectionUtils.isEmpty(attributesMsgList)) {
|
||||
log.debug("[{}] Devices attributes keys list is empty for: [{}]", sessionId, gateway.getDeviceId());
|
||||
log.debug("[{}] Device attribute list is empty for: [{}]", sessionId, gateway.getDeviceId());
|
||||
}
|
||||
|
||||
AtomicInteger remaining = new AtomicInteger(attributesMsgList.size());
|
||||
AtomicBoolean ackSent = new AtomicBoolean(false);
|
||||
|
||||
process(contextListenableFuture, deviceCtx -> {
|
||||
for (TransportApiProtos.AttributesMsg attributesMsg : attributesMsgList) {
|
||||
TransportProtos.PostAttributeMsg kvListProto = attributesMsg.getMsg();
|
||||
try {
|
||||
TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto);
|
||||
processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId);
|
||||
processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId, remaining, ackSent);
|
||||
} catch (Throwable e) {
|
||||
log.warn("[{}][{}] Failed to process device attributes command: {}", gateway.getDeviceId(), deviceName, kvListProto, e);
|
||||
ackOrClose(msgId, ackSent);
|
||||
}
|
||||
}
|
||||
},
|
||||
t -> log.debug("[{}] Failed to process device attributes command: {}", sessionId, deviceName, t));
|
||||
t -> processFailure(msgId, deviceName, "Failed to process device attributes command", ackSent, t));
|
||||
} catch (RuntimeException e) {
|
||||
throw new AdaptorException(e);
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user