Merge pull request #14070 from dashevchenko/mqttMissedAckFix

[Gateway] Fixed PUBACK not sent if publishing failed or duplicated if message contains multiple devices
This commit is contained in:
Viacheslav Klimov 2025-10-09 14:47:26 +03:00 committed by GitHub
commit f0052ed010
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 272 additions and 76 deletions

View File

@ -25,10 +25,10 @@ import org.springframework.boot.test.mock.mockito.SpyBean;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.msg.gateway.metrics.GatewayMetadata;
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties; import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService; import org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService;
import org.thingsboard.server.common.msg.gateway.metrics.GatewayMetadata;
import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsState; import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsState;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient;
@ -43,6 +43,7 @@ import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.any; import static org.mockito.Mockito.any;
@ -112,6 +113,42 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
processGatewayTelemetryTest(GATEWAY_TELEMETRY_TOPIC, expectedKeys, payload.getBytes(), deviceName1, deviceName2); processGatewayTelemetryTest(GATEWAY_TELEMETRY_TOPIC, expectedKeys, payload.getBytes(), deviceName1, deviceName2);
} }
@Test
public void testAckIsReceivedOnFailedPublishMessage() throws Exception {
String devicePayload = "[{\"ts\": 10000, \"values\": " + PAYLOAD_VALUES_STR + "}]";
String payloadA = "{\"Device A\": " + devicePayload + "}";
String deviceBPayload = "[{\"ts\": 10000, \"values\": " + PAYLOAD_VALUES_STR + "}]";
String payloadB = "{\"Device B\": " + deviceBPayload + "}";
testAckIsReceivedOnFailedPublishMessage("Device A", payloadA.getBytes(), "Device B", payloadB.getBytes());
}
protected void testAckIsReceivedOnFailedPublishMessage(String deviceName1, byte[] payload1, String deviceName2, byte[] payload2) throws Exception {
updateDefaultTenantProfileConfig(profileConfiguration -> {
profileConfiguration.setMaxDevices(3);
});
MqttTestClient client = new MqttTestClient();
client.connectAndWait(gatewayAccessToken);
client.publishAndWait(GATEWAY_TELEMETRY_TOPIC, payload1);
// check device is created
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
assertNotNull(doGet("/api/tenant/devices?deviceName=" + deviceName1, Device.class));
});
client.publishAndWait(GATEWAY_TELEMETRY_TOPIC, payload2);
client.disconnectAndWait();
// check device was not created due to limit
doGet("/api/tenant/devices?deviceName=" + deviceName2).andExpect(status().isNotFound());
updateDefaultTenantProfileConfig(profileConfiguration -> {
profileConfiguration.setMaxDevices(0);
});
}
@Test @Test
public void testGatewayConnect() throws Exception { public void testGatewayConnect() throws Exception {
String payload = "{\"device\":\"Device A\"}"; String payload = "{\"device\":\"Device A\"}";

View File

@ -186,4 +186,15 @@ public abstract class AbstractMqttTimeseriesJsonIntegrationTest extends Abstract
assertFalse(callback.isPubAckReceived()); assertFalse(callback.isPubAckReceived());
} }
@Override
public void testAckIsReceivedOnFailedPublishMessage() throws Exception {
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
.deviceName("Test Post Telemetry device json payload")
.gatewayName("Test Post Telemetry gateway json payload")
.transportPayloadType(TransportPayloadType.JSON)
.telemetryTopicFilter(POST_DATA_TELEMETRY_TOPIC)
.build();
processBeforeTest(configProperties);
super.testAckIsReceivedOnFailedPublishMessage();
}
} }

View File

@ -459,6 +459,31 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
assertFalse(callback.isPubAckReceived()); assertFalse(callback.isPubAckReceived());
} }
@Override
public void testAckIsReceivedOnFailedPublishMessage() throws Exception {
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
.deviceName("Test Post Telemetry device proto payload")
.gatewayName("Test Post Telemetry gateway proto payload")
.transportPayloadType(TransportPayloadType.PROTOBUF)
.telemetryTopicFilter(POST_DATA_TELEMETRY_TOPIC)
.build();
processBeforeTest(configProperties);
TransportApiProtos.GatewayTelemetryMsg.Builder gatewayTelemetryMsgProtoBuilder = TransportApiProtos.GatewayTelemetryMsg.newBuilder();
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
String deviceName1 = "Device A";
String deviceName2 = "Device B";
TransportApiProtos.TelemetryMsg deviceATelemetryMsgProto = getDeviceTelemetryMsgProto(deviceName1, expectedKeys, 10000, 20000);
gatewayTelemetryMsgProtoBuilder.addAllMsg(List.of(deviceATelemetryMsgProto));
TransportApiProtos.GatewayTelemetryMsg payload1 = gatewayTelemetryMsgProtoBuilder.build();
TransportApiProtos.TelemetryMsg deviceBTelemetryMsgProto = getDeviceTelemetryMsgProto(deviceName2, expectedKeys, 10000, 20000);
TransportApiProtos.GatewayTelemetryMsg payload2 = TransportApiProtos.GatewayTelemetryMsg.newBuilder()
.addAllMsg(List.of(deviceBTelemetryMsgProto))
.build();
super.testAckIsReceivedOnFailedPublishMessage(deviceName1, payload1.toByteArray(), deviceName2, payload2.toByteArray());
}
private DynamicSchema getDynamicSchema() { private DynamicSchema getDynamicSchema() {
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration(); DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration); assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);

View File

@ -80,6 +80,8 @@ import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -405,18 +407,34 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
protected void onDeviceTelemetryJson(int msgId, ByteBuf payload) throws AdaptorException { protected void onDeviceTelemetryJson(int msgId, ByteBuf payload) throws AdaptorException {
JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, payload); JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, payload);
validateJsonObject(json); validateJsonObject(json);
for (Map.Entry<String, JsonElement> deviceEntry : json.getAsJsonObject().entrySet()) {
if (!deviceEntry.getValue().isJsonArray()) { List<Map.Entry<String, JsonElement>> deviceEntries = json.getAsJsonObject().entrySet().stream()
log.warn("{}[{}]", CAN_T_PARSE_VALUE, json); .filter(entry -> {
continue; final boolean isArray = entry.getValue().isJsonArray();
} if (!isArray) {
log.warn("{} device='{}' value={}", CAN_T_PARSE_VALUE, entry.getKey(), entry.getValue());
}
return isArray;
})
.toList();
if (deviceEntries.isEmpty()) {
log.debug("[{}][{}][{}] Devices telemetry message is empty", gateway.getTenantId(), gateway.getDeviceId(), sessionId);
throw new IllegalArgumentException("[" + sessionId + "] Devices telemetry message is empty for [" + gateway.getDeviceId() + "]");
}
AtomicInteger remaining = new AtomicInteger(deviceEntries.size());
AtomicBoolean ackSent = new AtomicBoolean(false);
for (Map.Entry<String, JsonElement> deviceEntry : deviceEntries) {
String deviceName = deviceEntry.getKey(); String deviceName = deviceEntry.getKey();
process(deviceName, deviceCtx -> processPostTelemetryMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId), process(deviceName, deviceCtx -> processPostTelemetryMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId,
t -> failedToProcessLog(deviceName, TELEMETRY, t)); remaining, ackSent),
t -> processFailure(msgId, deviceName, TELEMETRY, ackSent, t));
} }
} }
private void processPostTelemetryMsg(T deviceCtx, JsonElement msg, String deviceName, int msgId) { private void processPostTelemetryMsg(T deviceCtx, JsonElement msg, String deviceName, int msgId, AtomicInteger remaining, AtomicBoolean ackSent) {
try { try {
long systemTs = System.currentTimeMillis(); long systemTs = System.currentTimeMillis();
TbPair<TransportProtos.PostTelemetryMsg, List<GatewayMetadata>> gatewayPayloadPair = JsonConverter.convertToGatewayTelemetry(msg.getAsJsonArray(), systemTs); TbPair<TransportProtos.PostTelemetryMsg, List<GatewayMetadata>> gatewayPayloadPair = JsonConverter.convertToGatewayTelemetry(msg.getAsJsonArray(), systemTs);
@ -425,10 +443,10 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
if (!CollectionUtils.isEmpty(metadata)) { if (!CollectionUtils.isEmpty(metadata)) {
gatewayMetricsService.process(deviceSessionCtx.getSessionInfo(), gateway.getDeviceId(), metadata, systemTs); gatewayMetricsService.process(deviceSessionCtx.getSessionInfo(), gateway.getDeviceId(), metadata, systemTs);
} }
transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg)); transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getAggregatePubAckCallback(channel, msgId, deviceName, postTelemetryMsg, remaining, ackSent));
} catch (Throwable e) { } catch (Throwable e) {
log.warn("[{}][{}][{}] Failed to convert telemetry: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e); log.warn("[{}][{}][{}] Failed to convert telemetry: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e);
ackOrClose(msgId); ackOrClose(msgId, ackSent);
} }
} }
@ -441,23 +459,28 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
throw new IllegalArgumentException("[" + sessionId + "] Devices telemetry messages is empty for [" + gateway.getDeviceId() + "]"); throw new IllegalArgumentException("[" + sessionId + "] Devices telemetry messages is empty for [" + gateway.getDeviceId() + "]");
} }
AtomicInteger remaining = new AtomicInteger(deviceMsgList.size());
AtomicBoolean ackSent = new AtomicBoolean(false);
deviceMsgList.forEach(telemetryMsg -> { deviceMsgList.forEach(telemetryMsg -> {
String deviceName = checkDeviceName(telemetryMsg.getDeviceName()); String deviceName = checkDeviceName(telemetryMsg.getDeviceName());
process(deviceName, deviceCtx -> processPostTelemetryMsg(deviceCtx, telemetryMsg.getMsg(), deviceName, msgId), process(deviceName, deviceCtx -> processPostTelemetryMsg(deviceCtx, telemetryMsg.getMsg(), deviceName, msgId,
t -> failedToProcessLog(deviceName, TELEMETRY, t)); remaining, ackSent),
t -> processFailure(msgId, deviceName, TELEMETRY, ackSent, t));
}); });
} catch (RuntimeException | InvalidProtocolBufferException e) { } catch (RuntimeException | InvalidProtocolBufferException e) {
throw new AdaptorException(e); throw new AdaptorException(e);
} }
} }
protected void processPostTelemetryMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostTelemetryMsg msg, String deviceName, int msgId) { protected void processPostTelemetryMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostTelemetryMsg msg, String deviceName, int msgId,
AtomicInteger remaining, AtomicBoolean ackSent) {
try { try {
TransportProtos.PostTelemetryMsg postTelemetryMsg = ProtoConverter.validatePostTelemetryMsg(msg.toByteArray()); TransportProtos.PostTelemetryMsg postTelemetryMsg = ProtoConverter.validatePostTelemetryMsg(msg.toByteArray());
transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg)); transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getAggregatePubAckCallback(channel, msgId, deviceName, postTelemetryMsg, remaining, ackSent));
} catch (Throwable e) { } catch (Throwable e) {
log.warn("[{}][{}][{}] Failed to convert telemetry: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e); log.warn("[{}][{}][{}] Failed to convert telemetry: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e);
ackOrClose(msgId); ackOrClose(msgId, ackSent);
} }
} }
@ -475,26 +498,42 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
private void onDeviceClaimJson(int msgId, ByteBuf payload) throws AdaptorException { private void onDeviceClaimJson(int msgId, ByteBuf payload) throws AdaptorException {
JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, payload); JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, payload);
validateJsonObject(json); validateJsonObject(json);
for (Map.Entry<String, JsonElement> deviceEntry : json.getAsJsonObject().entrySet()) {
if (!deviceEntry.getValue().isJsonObject()) {
log.warn("{}[{}]", CAN_T_PARSE_VALUE, json);
continue;
}
List<Map.Entry<String, JsonElement>> deviceEntries = json.getAsJsonObject().entrySet().stream()
.filter(entry -> {
boolean isJsonObject = entry.getValue().isJsonObject();
if (!isJsonObject) {
log.warn("{} device='{}' value={}", CAN_T_PARSE_VALUE, entry.getKey(), entry.getValue());
}
return isJsonObject;
})
.toList();
if (deviceEntries.isEmpty()) {
log.debug("[{}][{}][{}] Devices claim message is empty", gateway.getTenantId(), gateway.getDeviceId(), sessionId);
throw new IllegalArgumentException("[" + sessionId + "] Devices claim message is empty for [" + gateway.getDeviceId() + "]");
}
AtomicInteger remaining = new AtomicInteger(deviceEntries.size());
AtomicBoolean ackSent = new AtomicBoolean(false);
for (Map.Entry<String, JsonElement> deviceEntry : deviceEntries) {
String deviceName = deviceEntry.getKey(); String deviceName = deviceEntry.getKey();
process(deviceName, deviceCtx -> processClaimDeviceMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId), process(deviceName, deviceCtx -> processClaimDeviceMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId,
t -> failedToProcessLog(deviceName, CLAIMING, t)); remaining, ackSent),
t -> processFailure(msgId, deviceName, CLAIMING, ackSent, t));
} }
} }
private void processClaimDeviceMsg(MqttDeviceAwareSessionContext deviceCtx, JsonElement claimRequest, String deviceName, int msgId) { private void processClaimDeviceMsg(MqttDeviceAwareSessionContext deviceCtx, JsonElement claimRequest, String deviceName, int msgId,
AtomicInteger remaining, AtomicBoolean ackSent) {
try { try {
DeviceId deviceId = deviceCtx.getDeviceId(); DeviceId deviceId = deviceCtx.getDeviceId();
TransportProtos.ClaimDeviceMsg claimDeviceMsg = JsonConverter.convertToClaimDeviceProto(deviceId, claimRequest); TransportProtos.ClaimDeviceMsg claimDeviceMsg = JsonConverter.convertToClaimDeviceProto(deviceId, claimRequest);
transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(channel, deviceName, msgId, claimDeviceMsg)); transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getAggregatePubAckCallback(channel, msgId, deviceName, claimDeviceMsg, remaining, ackSent));
} catch (Throwable e) { } catch (Throwable e) {
log.warn("[{}][{}][{}] Failed to convert claim message: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, claimRequest, e); log.warn("[{}][{}][{}] Failed to convert claim message: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, claimRequest, e);
ackOrClose(msgId); ackOrClose(msgId, ackSent);
} }
} }
@ -507,49 +546,70 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
throw new IllegalArgumentException("[" + sessionId + "] Devices claim messages is empty for [" + gateway.getDeviceId() + "]"); throw new IllegalArgumentException("[" + sessionId + "] Devices claim messages is empty for [" + gateway.getDeviceId() + "]");
} }
AtomicInteger remaining = new AtomicInteger(claimMsgList.size());
AtomicBoolean ackSent = new AtomicBoolean(false);
claimMsgList.forEach(claimDeviceMsg -> { claimMsgList.forEach(claimDeviceMsg -> {
String deviceName = checkDeviceName(claimDeviceMsg.getDeviceName()); String deviceName = checkDeviceName(claimDeviceMsg.getDeviceName());
process(deviceName, deviceCtx -> processClaimDeviceMsg(deviceCtx, claimDeviceMsg.getClaimRequest(), deviceName, msgId), process(deviceName, deviceCtx -> processClaimDeviceMsg(deviceCtx, claimDeviceMsg.getClaimRequest(), deviceName, msgId,
t -> failedToProcessLog(deviceName, CLAIMING, t)); remaining, ackSent),
t -> processFailure(msgId, deviceName, CLAIMING, ackSent, t));
}); });
} catch (RuntimeException | InvalidProtocolBufferException e) { } catch (RuntimeException | InvalidProtocolBufferException e) {
throw new AdaptorException(e); throw new AdaptorException(e);
} }
} }
private void processClaimDeviceMsg(MqttDeviceAwareSessionContext deviceCtx, TransportApiProtos.ClaimDevice claimRequest, String deviceName, int msgId) { private void processClaimDeviceMsg(MqttDeviceAwareSessionContext deviceCtx, TransportApiProtos.ClaimDevice claimRequest, String deviceName, int msgId,
AtomicInteger remaining, AtomicBoolean ackSent) {
try { try {
DeviceId deviceId = deviceCtx.getDeviceId(); DeviceId deviceId = deviceCtx.getDeviceId();
TransportProtos.ClaimDeviceMsg claimDeviceMsg = ProtoConverter.convertToClaimDeviceProto(deviceId, claimRequest.toByteArray()); TransportProtos.ClaimDeviceMsg claimDeviceMsg = ProtoConverter.convertToClaimDeviceProto(deviceId, claimRequest.toByteArray());
transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(channel, deviceName, msgId, claimDeviceMsg)); transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getAggregatePubAckCallback(channel, msgId, deviceName, claimDeviceMsg, remaining, ackSent));
} catch (Throwable e) { } catch (Throwable e) {
log.warn("[{}][{}][{}] Failed to convert claim message: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, claimRequest, e); log.warn("[{}][{}][{}] Failed to convert claim message: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, claimRequest, e);
ackOrClose(msgId); ackOrClose(msgId, ackSent);
} }
} }
private void onDeviceAttributesJson(int msgId, ByteBuf payload) throws AdaptorException { private void onDeviceAttributesJson(int msgId, ByteBuf payload) throws AdaptorException {
JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, payload); JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, payload);
validateJsonObject(json); validateJsonObject(json);
for (Map.Entry<String, JsonElement> deviceEntry : json.getAsJsonObject().entrySet()) {
if (!deviceEntry.getValue().isJsonObject()) {
log.warn("{}[{}]", CAN_T_PARSE_VALUE, json);
continue;
}
List<Map.Entry<String, JsonElement>> deviceEntries = json.getAsJsonObject().entrySet().stream()
.filter(entry -> {
boolean isJsonObject = entry.getValue().isJsonObject();
if (!isJsonObject) {
log.warn("{} device='{}' value={}", CAN_T_PARSE_VALUE, entry.getKey(), entry.getValue());
}
return isJsonObject;
})
.toList();
if (deviceEntries.isEmpty()) {
log.debug("[{}][{}][{}] Devices attribute message is empty", gateway.getTenantId(), gateway.getDeviceId(), sessionId);
throw new IllegalArgumentException("[" + sessionId + "] Devices attribute message is empty for [" + gateway.getDeviceId() + "]");
}
AtomicInteger remaining = new AtomicInteger(deviceEntries.size());
AtomicBoolean ackSent = new AtomicBoolean(false);
for (Map.Entry<String, JsonElement> deviceEntry : deviceEntries) {
String deviceName = deviceEntry.getKey(); String deviceName = deviceEntry.getKey();
process(deviceName, deviceCtx -> processPostAttributesMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId), process(deviceName, deviceCtx -> processPostAttributesMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId,
t -> failedToProcessLog(deviceName, ATTRIBUTE, t)); remaining, ackSent),
t -> processFailure(msgId, deviceName, ATTRIBUTE, ackSent, t));
} }
} }
private void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, JsonElement msg, String deviceName, int msgId) { private void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, JsonElement msg, String deviceName, int msgId,
AtomicInteger remaining, AtomicBoolean ackSent) {
try { try {
TransportProtos.PostAttributeMsg postAttributeMsg = JsonConverter.convertToAttributesProto(msg.getAsJsonObject()); TransportProtos.PostAttributeMsg postAttributeMsg = JsonConverter.convertToAttributesProto(msg.getAsJsonObject());
transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg)); transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getAggregatePubAckCallback(channel, msgId, deviceName, postAttributeMsg, remaining, ackSent));
} catch (Throwable e) { } catch (Throwable e) {
log.warn("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e); log.warn("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e);
ackOrClose(msgId); ackOrClose(msgId, ackSent);
} }
} }
@ -562,23 +622,28 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
throw new IllegalArgumentException("[" + sessionId + "] Devices attributes keys list is empty for [" + gateway.getDeviceId() + "]"); throw new IllegalArgumentException("[" + sessionId + "] Devices attributes keys list is empty for [" + gateway.getDeviceId() + "]");
} }
AtomicInteger remaining = new AtomicInteger(attributesMsgList.size());
AtomicBoolean ackSent = new AtomicBoolean(false);
attributesMsgList.forEach(attributesMsg -> { attributesMsgList.forEach(attributesMsg -> {
String deviceName = checkDeviceName(attributesMsg.getDeviceName()); String deviceName = checkDeviceName(attributesMsg.getDeviceName());
process(deviceName, deviceCtx -> processPostAttributesMsg(deviceCtx, attributesMsg.getMsg(), deviceName, msgId), process(deviceName, deviceCtx -> processPostAttributesMsg(deviceCtx, attributesMsg.getMsg(), deviceName, msgId,
t -> failedToProcessLog(deviceName, ATTRIBUTE, t)); remaining, ackSent),
t -> processFailure(msgId, deviceName, ATTRIBUTE, ackSent, t));
}); });
} catch (RuntimeException | InvalidProtocolBufferException e) { } catch (RuntimeException | InvalidProtocolBufferException e) {
throw new AdaptorException(e); throw new AdaptorException(e);
} }
} }
protected void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostAttributeMsg kvListProto, String deviceName, int msgId) { protected void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostAttributeMsg kvListProto, String deviceName, int msgId,
AtomicInteger remaining, AtomicBoolean ackSent) {
try { try {
TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto); TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto);
transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg)); transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getAggregatePubAckCallback(channel, msgId, deviceName, postAttributeMsg, remaining, ackSent));
} catch (Throwable e) { } catch (Throwable e) {
log.warn("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, kvListProto, e); log.warn("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, kvListProto, e);
ackOrClose(msgId); ackOrClose(msgId, ackSent);
} }
} }
@ -647,27 +712,34 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
} }
private void onDeviceRpcResponse(Integer requestId, String data, String deviceName, int msgId) { private void onDeviceRpcResponse(Integer requestId, String data, String deviceName, int msgId) {
process(deviceName, deviceCtx -> processRpcResponseMsg(deviceCtx, requestId, data, deviceName, msgId), AtomicInteger remaining = new AtomicInteger(1);
t -> failedToProcessLog(deviceName, RPC_RESPONSE, t)); AtomicBoolean ackSent = new AtomicBoolean(false);
process(deviceName, deviceCtx -> processRpcResponseMsg(deviceCtx, requestId, data, deviceName, msgId, remaining, ackSent),
t -> processFailure(msgId, deviceName, RPC_RESPONSE, ackSent, t));
} }
private void processRpcResponseMsg(MqttDeviceAwareSessionContext deviceCtx, Integer requestId, String data, String deviceName, int msgId) { private void processRpcResponseMsg(MqttDeviceAwareSessionContext deviceCtx, Integer requestId, String data, String deviceName,
int msgId, AtomicInteger remaining, AtomicBoolean ackSent) {
TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder() TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
.setRequestId(requestId).setPayload(data).build(); .setRequestId(requestId).setPayload(data).build();
transportService.process(deviceCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(channel, deviceName, msgId, rpcResponseMsg)); transportService.process(deviceCtx.getSessionInfo(), rpcResponseMsg,
getAggregatePubAckCallback(channel, msgId, deviceName, rpcResponseMsg, remaining, ackSent));
} }
private void processGetAttributeRequestMessage(MqttPublishMessage mqttMsg, String deviceName, TransportProtos.GetAttributeRequestMsg requestMsg) { private void processGetAttributeRequestMessage(MqttPublishMessage mqttMsg, String deviceName, TransportProtos.GetAttributeRequestMsg requestMsg) {
int msgId = getMsgId(mqttMsg); int msgId = getMsgId(mqttMsg);
process(deviceName, deviceCtx -> processGetAttributeRequestMessage(deviceCtx, requestMsg, deviceName, msgId), AtomicInteger remaining = new AtomicInteger(1);
t -> { AtomicBoolean ackSent = new AtomicBoolean(false);
failedToProcessLog(deviceName, ATTRIBUTES_REQUEST, t); process(deviceName, deviceCtx -> {
ack(mqttMsg, MqttReasonCodes.PubAck.IMPLEMENTATION_SPECIFIC_ERROR); processGetAttributeRequestMessage(deviceCtx, requestMsg, deviceName, msgId, remaining, ackSent);
}); },
t -> processFailure(msgId, deviceName, ATTRIBUTES_REQUEST, ackSent, MqttReasonCodes.PubAck.IMPLEMENTATION_SPECIFIC_ERROR, t));
} }
private void processGetAttributeRequestMessage(T deviceCtx, TransportProtos.GetAttributeRequestMsg requestMsg, String deviceName, int msgId) { private void processGetAttributeRequestMessage(T deviceCtx, TransportProtos.GetAttributeRequestMsg requestMsg,
transportService.process(deviceCtx.getSessionInfo(), requestMsg, getPubAckCallback(channel, deviceName, msgId, requestMsg)); String deviceName, int msgId, AtomicInteger remaining, AtomicBoolean ackSent) {
transportService.process(deviceCtx.getSessionInfo(), requestMsg,
getAggregatePubAckCallback(channel, msgId, deviceName, requestMsg, remaining, ackSent));
} }
private TransportProtos.GetAttributeRequestMsg toGetAttributeRequestMsg(int requestId, boolean clientScope, Set<String> keys) { private TransportProtos.GetAttributeRequestMsg toGetAttributeRequestMsg(int requestId, boolean clientScope, Set<String> keys) {
@ -718,9 +790,11 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
} }
} }
protected void ackOrClose(int msgId) { protected void ackOrClose(int msgId, AtomicBoolean ackSent) {
if (MqttVersion.MQTT_5.equals(deviceSessionCtx.getMqttVersion())) { if (MqttVersion.MQTT_5.equals(deviceSessionCtx.getMqttVersion())) {
ack(msgId, MqttReasonCodes.PubAck.PAYLOAD_FORMAT_INVALID); if (ackSent.compareAndSet(false, true)) {
ack(msgId, MqttReasonCodes.PubAck.PAYLOAD_FORMAT_INVALID);
}
} else { } else {
channel.close(); channel.close();
} }
@ -742,23 +816,38 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
keyValueProtoBuilder.setType(TransportProtos.KeyValueType.STRING_V); keyValueProtoBuilder.setType(TransportProtos.KeyValueType.STRING_V);
keyValueProtoBuilder.setStringV(connectionState.name()); keyValueProtoBuilder.setStringV(connectionState.name());
TransportProtos.PostTelemetryMsg postTelemetryMsg = postTelemetryMsgCreated(keyValueProtoBuilder.build(), ts); TransportProtos.PostTelemetryMsg postTelemetryMsg = postTelemetryMsgCreated(keyValueProtoBuilder.build(), ts);
transportService.process(sessionInfo, postTelemetryMsg, getPubAckCallback(channel, deviceName, -1, postTelemetryMsg)); TransportServiceCallback<Void> pubAckCallback = getAggregatePubAckCallback(channel, -1, deviceName, postTelemetryMsg,
new AtomicInteger(1), new AtomicBoolean(false));
transportService.process(sessionInfo, postTelemetryMsg, pubAckCallback);
} }
public ConcurrentMap<String, T> getDevices () { public ConcurrentMap<String, T> getDevices() {
return this.devices; return this.devices;
} }
private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final String deviceName, final int msgId, final T msg) { protected <T>TransportServiceCallback<Void> getAggregatePubAckCallback(
final ChannelHandlerContext ctx,
final int msgId,
final String deviceName,
final T msg,
final AtomicInteger remaining,
final AtomicBoolean ackSent) {
return new TransportServiceCallback<Void>() { return new TransportServiceCallback<Void>() {
@Override @Override
public void onSuccess(Void dummy) { public void onSuccess(Void dummy) {
log.trace("[{}][{}][{}][{}] Published msg: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, msg); log.trace("[{}][{}][{}][{}] Published msg: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, msg);
if (msgId > 0) { if (remaining.decrementAndGet() == 0 && ackSent.compareAndSet(false, true)) {
ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(deviceSessionCtx, msgId, MqttReasonCodes.PubAck.SUCCESS.byteValue())); if (msgId > 0) {
} else { ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(
log.trace("[{}][{}][{}] Wrong msg id: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msg); deviceSessionCtx, msgId, MqttReasonCodes.PubAck.SUCCESS.byteValue()));
ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(deviceSessionCtx, msgId, MqttReasonCodes.PubAck.UNSPECIFIED_ERROR.byteValue())); } else {
log.trace("[{}][{}][{}] Wrong msg id: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msgId);
ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(
deviceSessionCtx, msgId, MqttReasonCodes.PubAck.UNSPECIFIED_ERROR.byteValue()));
}
}
if (msgId <= 0) {
closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.MALFORMED_PACKET); closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.MALFORMED_PACKET);
} }
} }
@ -767,11 +856,20 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
public void onError(Throwable e) { public void onError(Throwable e) {
log.trace("[{}][{}][{}] Failed to publish msg: [{}] for device: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msg, deviceName, e); log.trace("[{}][{}][{}] Failed to publish msg: [{}] for device: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msg, deviceName, e);
if (e instanceof TbRateLimitsException) { if (e instanceof TbRateLimitsException) {
if (ackSent.compareAndSet(false, true)) {
ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(
deviceSessionCtx, msgId, MqttReasonCodes.PubAck.QUOTA_EXCEEDED.byteValue()));
ctx.close();
}
closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.MESSAGE_RATE_TOO_HIGH); closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.MESSAGE_RATE_TOO_HIGH);
} else { } else {
if (ackSent.compareAndSet(false, true)) {
ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(
deviceSessionCtx, msgId, MqttReasonCodes.PubAck.UNSPECIFIED_ERROR.byteValue()));
ctx.close();
}
closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.UNSPECIFIED_ERROR); closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.UNSPECIFIED_ERROR);
} }
ctx.close();
} }
}; };
} }
@ -790,10 +888,20 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
} }
} }
protected void failedToProcessLog(String deviceName, String msgType, Throwable t) { protected void processFailure(int msgId, String deviceName, String msgType, AtomicBoolean ackSent, Throwable t) {
log.debug("[{}][{}][{}] Failed to process device {} command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msgType, deviceName, t); if (DataConstants.MAXIMUM_NUMBER_OF_DEVICES_REACHED.equals(t.getMessage())) {
processFailure(msgId, deviceName, msgType, ackSent, MqttReasonCodes.PubAck.QUOTA_EXCEEDED, t);
} else {
processFailure(msgId, deviceName, msgType, ackSent, MqttReasonCodes.PubAck.UNSPECIFIED_ERROR, t);
}
} }
protected void processFailure(int msgId, String deviceName, String msgType, AtomicBoolean ackSent, MqttReasonCodes.PubAck pubAck, Throwable t) {
log.debug("[{}][{}][{}] Failed to process device {} command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msgType, deviceName, t);
if (ackSent.compareAndSet(false, true)) {
ack(msgId, pubAck);
}
}
private void closeDeviceSession(String deviceName, MqttReasonCodes.Disconnect returnCode) { private void closeDeviceSession(String deviceName, MqttReasonCodes.Disconnect returnCode) {
try { try {

View File

@ -21,6 +21,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.JsonSyntaxException; import com.google.gson.JsonSyntaxException;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttReasonCodes;
import io.netty.handler.codec.mqtt.MqttTopicSubscription; import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -48,6 +49,8 @@ import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.ONLINE; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.ONLINE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DBIRTH; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DBIRTH;
@ -144,37 +147,49 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
public void onDeviceTelemetryProto(ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture, public void onDeviceTelemetryProto(ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture,
int msgId, List<TransportProtos.PostTelemetryMsg> postTelemetryMsgList, String deviceName) { int msgId, List<TransportProtos.PostTelemetryMsg> postTelemetryMsgList, String deviceName) {
if (CollectionUtils.isEmpty(postTelemetryMsgList)) {
log.debug("[{}] Device telemetry list is empty for: [{}]", sessionId, gateway.getDeviceId());
}
AtomicInteger remaining = new AtomicInteger(postTelemetryMsgList.size());
AtomicBoolean ackSent = new AtomicBoolean(false);
process(contextListenableFuture, deviceCtx -> { process(contextListenableFuture, deviceCtx -> {
for (TransportProtos.PostTelemetryMsg telemetryMsg : postTelemetryMsgList) { for (TransportProtos.PostTelemetryMsg telemetryMsg : postTelemetryMsgList) {
try { try {
processPostTelemetryMsg(deviceCtx, telemetryMsg, deviceName, msgId); processPostTelemetryMsg(deviceCtx, telemetryMsg, deviceName, msgId, remaining, ackSent);
} catch (Throwable e) { } catch (Throwable e) {
log.warn("[{}][{}] Failed to convert telemetry: {}", gateway.getDeviceId(), deviceName, telemetryMsg, e); log.warn("[{}][{}] Failed to convert telemetry: {}", gateway.getDeviceId(), deviceName, telemetryMsg, e);
ackOrClose(msgId); ackOrClose(msgId, ackSent);
} }
} }
}, },
t -> log.debug("[{}] Failed to process device telemetry command: {}", sessionId, deviceName, t)); t -> processFailure(msgId, deviceName, "Failed to process device telemetry command", ackSent, t));
} }
private void onDeviceAttributesProto(ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture, int msgId, private void onDeviceAttributesProto(ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture, int msgId,
List<TransportApiProtos.AttributesMsg> attributesMsgList, String deviceName) throws AdaptorException { List<TransportApiProtos.AttributesMsg> attributesMsgList, String deviceName) throws AdaptorException {
try { try {
if (CollectionUtils.isEmpty(attributesMsgList)) { if (CollectionUtils.isEmpty(attributesMsgList)) {
log.debug("[{}] Devices attributes keys list is empty for: [{}]", sessionId, gateway.getDeviceId()); log.debug("[{}] Device attribute list is empty for: [{}]", sessionId, gateway.getDeviceId());
} }
AtomicInteger remaining = new AtomicInteger(attributesMsgList.size());
AtomicBoolean ackSent = new AtomicBoolean(false);
process(contextListenableFuture, deviceCtx -> { process(contextListenableFuture, deviceCtx -> {
for (TransportApiProtos.AttributesMsg attributesMsg : attributesMsgList) { for (TransportApiProtos.AttributesMsg attributesMsg : attributesMsgList) {
TransportProtos.PostAttributeMsg kvListProto = attributesMsg.getMsg(); TransportProtos.PostAttributeMsg kvListProto = attributesMsg.getMsg();
try { try {
TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto); TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto);
processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId); processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId, remaining, ackSent);
} catch (Throwable e) { } catch (Throwable e) {
log.warn("[{}][{}] Failed to process device attributes command: {}", gateway.getDeviceId(), deviceName, kvListProto, e); log.warn("[{}][{}] Failed to process device attributes command: {}", gateway.getDeviceId(), deviceName, kvListProto, e);
ackOrClose(msgId, ackSent);
} }
} }
}, },
t -> log.debug("[{}] Failed to process device attributes command: {}", sessionId, deviceName, t)); t -> processFailure(msgId, deviceName, "Failed to process device attributes command", ackSent, t));
} catch (RuntimeException e) { } catch (RuntimeException e) {
throw new AdaptorException(e); throw new AdaptorException(e);
} }