From f9b52c3a43e33caf494972fb19f4749f36c8082e Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Mon, 29 Sep 2025 11:44:27 +0300 Subject: [PATCH 1/7] fixed missing ACK when message publishing fails --- .../AbstractGatewaySessionHandler.java | 28 ++++++++++++------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index 91e7dedbf2..af8ed2aa6d 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -412,7 +412,7 @@ public abstract class AbstractGatewaySessionHandler processPostTelemetryMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId), - t -> failedToProcessLog(deviceName, TELEMETRY, t)); + t -> processFailure(msgId, deviceName, TELEMETRY, t)); } } @@ -444,7 +444,7 @@ public abstract class AbstractGatewaySessionHandler { String deviceName = checkDeviceName(telemetryMsg.getDeviceName()); process(deviceName, deviceCtx -> processPostTelemetryMsg(deviceCtx, telemetryMsg.getMsg(), deviceName, msgId), - t -> failedToProcessLog(deviceName, TELEMETRY, t)); + t -> processFailure(msgId, deviceName, TELEMETRY, t)); }); } catch (RuntimeException | InvalidProtocolBufferException e) { throw new AdaptorException(e); @@ -483,7 +483,7 @@ public abstract class AbstractGatewaySessionHandler processClaimDeviceMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId), - t -> failedToProcessLog(deviceName, CLAIMING, t)); + t -> processFailure(msgId, deviceName, CLAIMING, t)); } } @@ -510,7 +510,7 @@ public abstract class AbstractGatewaySessionHandler { String deviceName = checkDeviceName(claimDeviceMsg.getDeviceName()); process(deviceName, deviceCtx -> processClaimDeviceMsg(deviceCtx, claimDeviceMsg.getClaimRequest(), deviceName, msgId), - t -> failedToProcessLog(deviceName, CLAIMING, t)); + t -> processFailure(msgId, deviceName, CLAIMING, t)); }); } catch (RuntimeException | InvalidProtocolBufferException e) { throw new AdaptorException(e); @@ -539,7 +539,7 @@ public abstract class AbstractGatewaySessionHandler processPostAttributesMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId), - t -> failedToProcessLog(deviceName, ATTRIBUTE, t)); + t -> processFailure(msgId, deviceName, ATTRIBUTE, t)); } } @@ -565,7 +565,7 @@ public abstract class AbstractGatewaySessionHandler { String deviceName = checkDeviceName(attributesMsg.getDeviceName()); process(deviceName, deviceCtx -> processPostAttributesMsg(deviceCtx, attributesMsg.getMsg(), deviceName, msgId), - t -> failedToProcessLog(deviceName, ATTRIBUTE, t)); + t -> processFailure(msgId, deviceName, ATTRIBUTE, t)); }); } catch (RuntimeException | InvalidProtocolBufferException e) { throw new AdaptorException(e); @@ -648,7 +648,7 @@ public abstract class AbstractGatewaySessionHandler processRpcResponseMsg(deviceCtx, requestId, data, deviceName, msgId), - t -> failedToProcessLog(deviceName, RPC_RESPONSE, t)); + t -> processFailure(msgId, deviceName, RPC_RESPONSE, t)); } private void processRpcResponseMsg(MqttDeviceAwareSessionContext deviceCtx, Integer requestId, String data, String deviceName, int msgId) { @@ -661,8 +661,7 @@ public abstract class AbstractGatewaySessionHandler processGetAttributeRequestMessage(deviceCtx, requestMsg, deviceName, msgId), t -> { - failedToProcessLog(deviceName, ATTRIBUTES_REQUEST, t); - ack(mqttMsg, MqttReasonCodes.PubAck.IMPLEMENTATION_SPECIFIC_ERROR); + processFailure(msgId, deviceName, ATTRIBUTES_REQUEST, t, MqttReasonCodes.PubAck.IMPLEMENTATION_SPECIFIC_ERROR); }); } @@ -790,10 +789,19 @@ public abstract class AbstractGatewaySessionHandler Date: Mon, 29 Sep 2025 16:16:15 +0300 Subject: [PATCH 2/7] added tests --- ...AbstractMqttTimeseriesIntegrationTest.java | 39 ++++++++++++++++++- ...ractMqttTimeseriesJsonIntegrationTest.java | 11 ++++++ ...actMqttTimeseriesProtoIntegrationTest.java | 25 ++++++++++++ 3 files changed, 74 insertions(+), 1 deletion(-) diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java index 6543955ed5..9856aa18cb 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java @@ -25,10 +25,10 @@ import org.springframework.boot.test.mock.mockito.SpyBean; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.msg.gateway.metrics.GatewayMetadata; import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties; import org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService; -import org.thingsboard.server.common.msg.gateway.metrics.GatewayMetadata; import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsState; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient; @@ -43,6 +43,7 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.TimeUnit; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.mockito.Mockito.any; @@ -112,6 +113,42 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt processGatewayTelemetryTest(GATEWAY_TELEMETRY_TOPIC, expectedKeys, payload.getBytes(), deviceName1, deviceName2); } + @Test + public void testAckIsReceivedOnFailedPublishMessage() throws Exception { + String devicePayload = "[{\"ts\": 10000, \"values\": " + PAYLOAD_VALUES_STR + "}]"; + String payloadA = "{\"Device A\": " + devicePayload + "}"; + + String deviceBPayload = "[{\"ts\": 10000, \"values\": " + PAYLOAD_VALUES_STR + "}]"; + String payloadB = "{\"Device B\": " + deviceBPayload + "}"; + + testAckIsReceivedOnFailedPublishMessage("Device A", payloadA.getBytes(), "Device B", payloadB.getBytes()); + } + + protected void testAckIsReceivedOnFailedPublishMessage(String deviceName1, byte[] payload1, String deviceName2, byte[] payload2) throws Exception { + updateDefaultTenantProfileConfig(profileConfiguration -> { + profileConfiguration.setMaxDevices(3); + }); + + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(gatewayAccessToken); + client.publishAndWait(GATEWAY_TELEMETRY_TOPIC, payload1); + + // check device is created + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + assertNotNull(doGet("/api/tenant/devices?deviceName=" + deviceName1, Device.class)); + }); + + client.publishAndWait(GATEWAY_TELEMETRY_TOPIC, payload2); + client.disconnectAndWait(); + + // check device was not created due to limit + doGet("/api/tenant/devices?deviceName=" + deviceName2).andExpect(status().isNotFound()); + + updateDefaultTenantProfileConfig(profileConfiguration -> { + profileConfiguration.setMaxDevices(0); + }); + } + @Test public void testGatewayConnect() throws Exception { String payload = "{\"device\":\"Device A\"}"; diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesJsonIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesJsonIntegrationTest.java index a23506a160..92b35d896c 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesJsonIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesJsonIntegrationTest.java @@ -186,4 +186,15 @@ public abstract class AbstractMqttTimeseriesJsonIntegrationTest extends Abstract assertFalse(callback.isPubAckReceived()); } + @Override + public void testAckIsReceivedOnFailedPublishMessage() throws Exception { + MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() + .deviceName("Test Post Telemetry device json payload") + .gatewayName("Test Post Telemetry gateway json payload") + .transportPayloadType(TransportPayloadType.JSON) + .telemetryTopicFilter(POST_DATA_TELEMETRY_TOPIC) + .build(); + processBeforeTest(configProperties); + super.testAckIsReceivedOnFailedPublishMessage(); + } } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesProtoIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesProtoIntegrationTest.java index 1b6c247d96..60e5857c04 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesProtoIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesProtoIntegrationTest.java @@ -459,6 +459,31 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac assertFalse(callback.isPubAckReceived()); } + @Override + public void testAckIsReceivedOnFailedPublishMessage() throws Exception { + MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() + .deviceName("Test Post Telemetry device proto payload") + .gatewayName("Test Post Telemetry gateway proto payload") + .transportPayloadType(TransportPayloadType.PROTOBUF) + .telemetryTopicFilter(POST_DATA_TELEMETRY_TOPIC) + .build(); + processBeforeTest(configProperties); + + TransportApiProtos.GatewayTelemetryMsg.Builder gatewayTelemetryMsgProtoBuilder = TransportApiProtos.GatewayTelemetryMsg.newBuilder(); + List expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5"); + String deviceName1 = "Device A"; + String deviceName2 = "Device B"; + TransportApiProtos.TelemetryMsg deviceATelemetryMsgProto = getDeviceTelemetryMsgProto(deviceName1, expectedKeys, 10000, 20000); + gatewayTelemetryMsgProtoBuilder.addAllMsg(List.of(deviceATelemetryMsgProto)); + TransportApiProtos.GatewayTelemetryMsg payload1 = gatewayTelemetryMsgProtoBuilder.build(); + + TransportApiProtos.TelemetryMsg deviceBTelemetryMsgProto = getDeviceTelemetryMsgProto(deviceName2, expectedKeys, 10000, 20000); + TransportApiProtos.GatewayTelemetryMsg payload2 = TransportApiProtos.GatewayTelemetryMsg.newBuilder() + .addAllMsg(List.of(deviceBTelemetryMsgProto)) + .build(); + super.testAckIsReceivedOnFailedPublishMessage(deviceName1, payload1.toByteArray(), deviceName2, payload2.toByteArray()); + } + private DynamicSchema getDynamicSchema() { DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration(); assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration); From 00700afd83637c752f3fe340bebb723dc5c87845 Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Thu, 2 Oct 2025 15:29:38 +0300 Subject: [PATCH 3/7] fixed pubAck publications --- .../AbstractGatewaySessionHandler.java | 250 ++++++++++++------ .../session/SparkplugNodeSessionHandler.java | 27 +- 2 files changed, 194 insertions(+), 83 deletions(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index af8ed2aa6d..168fef6c2e 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -80,6 +80,8 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; @@ -405,18 +407,34 @@ public abstract class AbstractGatewaySessionHandler deviceEntry : json.getAsJsonObject().entrySet()) { - if (!deviceEntry.getValue().isJsonArray()) { - log.warn("{}[{}]", CAN_T_PARSE_VALUE, json); - continue; - } + + List> deviceEntries = json.getAsJsonObject().entrySet().stream() + .filter(entry -> { + final boolean isArray = entry.getValue().isJsonArray(); + if (!isArray) { + log.warn("{} device='{}' value={}", CAN_T_PARSE_VALUE, entry.getKey(), entry.getValue()); + } + return isArray; + }) + .toList(); + + if (deviceEntries.isEmpty()) { + log.debug("[{}][{}][{}] Devices telemetry message is empty", gateway.getTenantId(), gateway.getDeviceId(), sessionId); + throw new IllegalArgumentException("[" + sessionId + "] Devices telemetry message is empty for [" + gateway.getDeviceId() + "]"); + } + + AtomicInteger remaining = new AtomicInteger(deviceEntries.size()); + AtomicBoolean ackSent = new AtomicBoolean(false); + + for (Map.Entry deviceEntry : deviceEntries) { String deviceName = deviceEntry.getKey(); - process(deviceName, deviceCtx -> processPostTelemetryMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId), - t -> processFailure(msgId, deviceName, TELEMETRY, t)); + process(deviceName, deviceCtx -> processPostTelemetryMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId, + remaining, ackSent), + t -> processFailure(msgId, deviceName, TELEMETRY, ackSent, t)); } } - private void processPostTelemetryMsg(T deviceCtx, JsonElement msg, String deviceName, int msgId) { + private void processPostTelemetryMsg(T deviceCtx, JsonElement msg, String deviceName, int msgId, AtomicInteger remaining, AtomicBoolean ackSent) { try { long systemTs = System.currentTimeMillis(); TbPair> gatewayPayloadPair = JsonConverter.convertToGatewayTelemetry(msg.getAsJsonArray(), systemTs); @@ -425,10 +443,10 @@ public abstract class AbstractGatewaySessionHandler { String deviceName = checkDeviceName(telemetryMsg.getDeviceName()); - process(deviceName, deviceCtx -> processPostTelemetryMsg(deviceCtx, telemetryMsg.getMsg(), deviceName, msgId), - t -> processFailure(msgId, deviceName, TELEMETRY, t)); + process(deviceName, deviceCtx -> processPostTelemetryMsg(deviceCtx, telemetryMsg.getMsg(), deviceName, msgId, + remaining, ackSent), + t -> processFailure(msgId, deviceName, TELEMETRY, ackSent, t)); }); } catch (RuntimeException | InvalidProtocolBufferException e) { throw new AdaptorException(e); } } - protected void processPostTelemetryMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostTelemetryMsg msg, String deviceName, int msgId) { + protected void processPostTelemetryMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostTelemetryMsg msg, String deviceName, int msgId, + AtomicInteger remaining, AtomicBoolean ackSent) { try { TransportProtos.PostTelemetryMsg postTelemetryMsg = ProtoConverter.validatePostTelemetryMsg(msg.toByteArray()); - transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg)); + transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getAggregatePubAckCallback(channel, msgId, deviceName, postTelemetryMsg, remaining, ackSent)); } catch (Throwable e) { log.warn("[{}][{}][{}] Failed to convert telemetry: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e); - ackOrClose(msgId); + ackOrClose(msgId, ackSent); } } @@ -475,26 +498,42 @@ public abstract class AbstractGatewaySessionHandler deviceEntry : json.getAsJsonObject().entrySet()) { - if (!deviceEntry.getValue().isJsonObject()) { - log.warn("{}[{}]", CAN_T_PARSE_VALUE, json); - continue; - } + List> deviceEntries = json.getAsJsonObject().entrySet().stream() + .filter(entry -> { + boolean isJsonObject = entry.getValue().isJsonObject(); + if (!isJsonObject) { + log.warn("{} device='{}' value={}", CAN_T_PARSE_VALUE, entry.getKey(), entry.getValue()); + } + return isJsonObject; + }) + .toList(); + + if (deviceEntries.isEmpty()) { + log.debug("[{}][{}][{}] Devices claim message is empty", gateway.getTenantId(), gateway.getDeviceId(), sessionId); + throw new IllegalArgumentException("[" + sessionId + "] Devices claim message is empty for [" + gateway.getDeviceId() + "]"); + } + + AtomicInteger remaining = new AtomicInteger(deviceEntries.size()); + AtomicBoolean ackSent = new AtomicBoolean(false); + + for (Map.Entry deviceEntry : deviceEntries) { String deviceName = deviceEntry.getKey(); - process(deviceName, deviceCtx -> processClaimDeviceMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId), - t -> processFailure(msgId, deviceName, CLAIMING, t)); + process(deviceName, deviceCtx -> processClaimDeviceMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId, + remaining, ackSent), + t -> processFailure(msgId, deviceName, CLAIMING, ackSent, t)); } } - private void processClaimDeviceMsg(MqttDeviceAwareSessionContext deviceCtx, JsonElement claimRequest, String deviceName, int msgId) { + private void processClaimDeviceMsg(MqttDeviceAwareSessionContext deviceCtx, JsonElement claimRequest, String deviceName, int msgId, + AtomicInteger remaining, AtomicBoolean ackSent) { try { DeviceId deviceId = deviceCtx.getDeviceId(); TransportProtos.ClaimDeviceMsg claimDeviceMsg = JsonConverter.convertToClaimDeviceProto(deviceId, claimRequest); - transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(channel, deviceName, msgId, claimDeviceMsg)); + transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getAggregatePubAckCallback(channel, msgId, deviceName, claimDeviceMsg, remaining, ackSent)); } catch (Throwable e) { log.warn("[{}][{}][{}] Failed to convert claim message: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, claimRequest, e); - ackOrClose(msgId); + ackOrClose(msgId, ackSent); } } @@ -507,49 +546,70 @@ public abstract class AbstractGatewaySessionHandler { String deviceName = checkDeviceName(claimDeviceMsg.getDeviceName()); - process(deviceName, deviceCtx -> processClaimDeviceMsg(deviceCtx, claimDeviceMsg.getClaimRequest(), deviceName, msgId), - t -> processFailure(msgId, deviceName, CLAIMING, t)); + process(deviceName, deviceCtx -> processClaimDeviceMsg(deviceCtx, claimDeviceMsg.getClaimRequest(), deviceName, msgId, + remaining, ackSent), + t -> processFailure(msgId, deviceName, CLAIMING, ackSent, t)); }); } catch (RuntimeException | InvalidProtocolBufferException e) { throw new AdaptorException(e); } } - private void processClaimDeviceMsg(MqttDeviceAwareSessionContext deviceCtx, TransportApiProtos.ClaimDevice claimRequest, String deviceName, int msgId) { + private void processClaimDeviceMsg(MqttDeviceAwareSessionContext deviceCtx, TransportApiProtos.ClaimDevice claimRequest, String deviceName, int msgId, + AtomicInteger remaining, AtomicBoolean ackSent) { try { DeviceId deviceId = deviceCtx.getDeviceId(); TransportProtos.ClaimDeviceMsg claimDeviceMsg = ProtoConverter.convertToClaimDeviceProto(deviceId, claimRequest.toByteArray()); - transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(channel, deviceName, msgId, claimDeviceMsg)); + transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getAggregatePubAckCallback(channel, msgId, deviceName, claimDeviceMsg, remaining, ackSent)); } catch (Throwable e) { log.warn("[{}][{}][{}] Failed to convert claim message: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, claimRequest, e); - ackOrClose(msgId); + ackOrClose(msgId, ackSent); } } private void onDeviceAttributesJson(int msgId, ByteBuf payload) throws AdaptorException { JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, payload); validateJsonObject(json); - for (Map.Entry deviceEntry : json.getAsJsonObject().entrySet()) { - if (!deviceEntry.getValue().isJsonObject()) { - log.warn("{}[{}]", CAN_T_PARSE_VALUE, json); - continue; - } + List> deviceEntries = json.getAsJsonObject().entrySet().stream() + .filter(entry -> { + boolean isJsonObject = entry.getValue().isJsonObject(); + if (!isJsonObject) { + log.warn("{} device='{}' value={}", CAN_T_PARSE_VALUE, entry.getKey(), entry.getValue()); + } + return isJsonObject; + }) + .toList(); + + if (deviceEntries.isEmpty()) { + log.debug("[{}][{}][{}] Devices attribute message is empty", gateway.getTenantId(), gateway.getDeviceId(), sessionId); + throw new IllegalArgumentException("[" + sessionId + "] Devices attribute message is empty for [" + gateway.getDeviceId() + "]"); + } + + AtomicInteger remaining = new AtomicInteger(deviceEntries.size()); + AtomicBoolean ackSent = new AtomicBoolean(false); + + for (Map.Entry deviceEntry : deviceEntries) { String deviceName = deviceEntry.getKey(); - process(deviceName, deviceCtx -> processPostAttributesMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId), - t -> processFailure(msgId, deviceName, ATTRIBUTE, t)); + process(deviceName, deviceCtx -> processPostAttributesMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId, + remaining, ackSent), + t -> processFailure(msgId, deviceName, ATTRIBUTE, ackSent, t)); } } - private void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, JsonElement msg, String deviceName, int msgId) { + private void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, JsonElement msg, String deviceName, int msgId, + AtomicInteger remaining, AtomicBoolean ackSent) { try { TransportProtos.PostAttributeMsg postAttributeMsg = JsonConverter.convertToAttributesProto(msg.getAsJsonObject()); - transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg)); + transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getAggregatePubAckCallback(channel, msgId, deviceName, postAttributeMsg, remaining, ackSent)); } catch (Throwable e) { log.warn("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e); - ackOrClose(msgId); + ackOrClose(msgId, ackSent); } } @@ -562,23 +622,28 @@ public abstract class AbstractGatewaySessionHandler { String deviceName = checkDeviceName(attributesMsg.getDeviceName()); - process(deviceName, deviceCtx -> processPostAttributesMsg(deviceCtx, attributesMsg.getMsg(), deviceName, msgId), - t -> processFailure(msgId, deviceName, ATTRIBUTE, t)); + process(deviceName, deviceCtx -> processPostAttributesMsg(deviceCtx, attributesMsg.getMsg(), deviceName, msgId, + remaining, ackSent), + t -> processFailure(msgId, deviceName, ATTRIBUTE, ackSent, t)); }); } catch (RuntimeException | InvalidProtocolBufferException e) { throw new AdaptorException(e); } } - protected void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostAttributeMsg kvListProto, String deviceName, int msgId) { + protected void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostAttributeMsg kvListProto, String deviceName, int msgId, + AtomicInteger remaining, AtomicBoolean ackSent) { try { TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto); - transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg)); + transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getAggregatePubAckCallback(channel, msgId, deviceName, postAttributeMsg, remaining, ackSent)); } catch (Throwable e) { log.warn("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, kvListProto, e); - ackOrClose(msgId); + ackOrClose(msgId, ackSent); } } @@ -647,26 +712,34 @@ public abstract class AbstractGatewaySessionHandler processRpcResponseMsg(deviceCtx, requestId, data, deviceName, msgId), - t -> processFailure(msgId, deviceName, RPC_RESPONSE, t)); + AtomicInteger remaining = new AtomicInteger(1); + AtomicBoolean ackSent = new AtomicBoolean(false); + process(deviceName, deviceCtx -> processRpcResponseMsg(deviceCtx, requestId, data, deviceName, msgId, remaining, ackSent), + t -> processFailure(msgId, deviceName, RPC_RESPONSE, ackSent, t)); } - private void processRpcResponseMsg(MqttDeviceAwareSessionContext deviceCtx, Integer requestId, String data, String deviceName, int msgId) { + private void processRpcResponseMsg(MqttDeviceAwareSessionContext deviceCtx, Integer requestId, String data, String deviceName, + int msgId, AtomicInteger remaining, AtomicBoolean ackSent) { TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder() .setRequestId(requestId).setPayload(data).build(); - transportService.process(deviceCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(channel, deviceName, msgId, rpcResponseMsg)); + transportService.process(deviceCtx.getSessionInfo(), rpcResponseMsg, + getAggregatePubAckCallback(channel, msgId, deviceName, rpcResponseMsg, remaining, ackSent)); } private void processGetAttributeRequestMessage(MqttPublishMessage mqttMsg, String deviceName, TransportProtos.GetAttributeRequestMsg requestMsg) { int msgId = getMsgId(mqttMsg); - process(deviceName, deviceCtx -> processGetAttributeRequestMessage(deviceCtx, requestMsg, deviceName, msgId), - t -> { - processFailure(msgId, deviceName, ATTRIBUTES_REQUEST, t, MqttReasonCodes.PubAck.IMPLEMENTATION_SPECIFIC_ERROR); - }); + AtomicInteger remaining = new AtomicInteger(1); + AtomicBoolean ackSent = new AtomicBoolean(false); + process(deviceName, deviceCtx -> { + processGetAttributeRequestMessage(deviceCtx, requestMsg, deviceName, msgId, remaining, ackSent); + }, + t -> processFailure(msgId, deviceName, ATTRIBUTES_REQUEST, ackSent, MqttReasonCodes.PubAck.IMPLEMENTATION_SPECIFIC_ERROR, t)); } - private void processGetAttributeRequestMessage(T deviceCtx, TransportProtos.GetAttributeRequestMsg requestMsg, String deviceName, int msgId) { - transportService.process(deviceCtx.getSessionInfo(), requestMsg, getPubAckCallback(channel, deviceName, msgId, requestMsg)); + private void processGetAttributeRequestMessage(T deviceCtx, TransportProtos.GetAttributeRequestMsg requestMsg, + String deviceName, int msgId, AtomicInteger remaining, AtomicBoolean ackSent) { + transportService.process(deviceCtx.getSessionInfo(), requestMsg, + getAggregatePubAckCallback(channel, msgId, deviceName, requestMsg, remaining, ackSent)); } private TransportProtos.GetAttributeRequestMsg toGetAttributeRequestMsg(int requestId, boolean clientScope, Set keys) { @@ -717,9 +790,11 @@ public abstract class AbstractGatewaySessionHandler pubAckCallback = getAggregatePubAckCallback(channel, -1, deviceName, postTelemetryMsg, + new AtomicInteger(1), new AtomicBoolean(false)); + transportService.process(sessionInfo, postTelemetryMsg, pubAckCallback); } - public ConcurrentMap getDevices () { + public ConcurrentMap getDevices() { return this.devices; } - private TransportServiceCallback getPubAckCallback(final ChannelHandlerContext ctx, final String deviceName, final int msgId, final T msg) { + protected TransportServiceCallback getAggregatePubAckCallback( + final ChannelHandlerContext ctx, + final int msgId, + final String deviceName, + final T msg, + final AtomicInteger remaining, + final AtomicBoolean ackSent) { + return new TransportServiceCallback() { @Override public void onSuccess(Void dummy) { log.trace("[{}][{}][{}][{}] Published msg: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, msg); - if (msgId > 0) { - ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(deviceSessionCtx, msgId, MqttReasonCodes.PubAck.SUCCESS.byteValue())); - } else { - log.trace("[{}][{}][{}] Wrong msg id: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msg); - ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(deviceSessionCtx, msgId, MqttReasonCodes.PubAck.UNSPECIFIED_ERROR.byteValue())); - closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.MALFORMED_PACKET); + if (remaining.decrementAndGet() == 0 && ackSent.compareAndSet(false, true)) { + if (msgId > 0) { + ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg( + deviceSessionCtx, msgId, MqttReasonCodes.PubAck.SUCCESS.byteValue())); + } else { + log.trace("[{}][{}][{}] Wrong msg id: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msgId); + ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg( + deviceSessionCtx, msgId, MqttReasonCodes.PubAck.UNSPECIFIED_ERROR.byteValue())); + closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.MALFORMED_PACKET); + } } } @Override public void onError(Throwable e) { log.trace("[{}][{}][{}] Failed to publish msg: [{}] for device: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msg, deviceName, e); - if (e instanceof TbRateLimitsException) { - closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.MESSAGE_RATE_TOO_HIGH); - } else { - closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.UNSPECIFIED_ERROR); + if (ackSent.compareAndSet(false, true)) { + if (e instanceof TbRateLimitsException) { + ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg( + deviceSessionCtx, msgId, MqttReasonCodes.PubAck.QUOTA_EXCEEDED.byteValue())); + closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.MESSAGE_RATE_TOO_HIGH); + } else { + ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg( + deviceSessionCtx, msgId, MqttReasonCodes.PubAck.UNSPECIFIED_ERROR.byteValue())); + closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.UNSPECIFIED_ERROR); + } + ctx.close(); } - ctx.close(); + remaining.set(0); } }; } @@ -789,18 +884,19 @@ public abstract class AbstractGatewaySessionHandler contextListenableFuture, int msgId, List postTelemetryMsgList, String deviceName) { + if (CollectionUtils.isEmpty(postTelemetryMsgList)) { + log.debug("[{}] Device telemetry list is empty for: [{}]", sessionId, gateway.getDeviceId()); + } + + AtomicInteger remaining = new AtomicInteger(postTelemetryMsgList.size()); + AtomicBoolean ackSent = new AtomicBoolean(false); + process(contextListenableFuture, deviceCtx -> { for (TransportProtos.PostTelemetryMsg telemetryMsg : postTelemetryMsgList) { try { - processPostTelemetryMsg(deviceCtx, telemetryMsg, deviceName, msgId); + processPostTelemetryMsg(deviceCtx, telemetryMsg, deviceName, msgId, remaining, ackSent); } catch (Throwable e) { log.warn("[{}][{}] Failed to convert telemetry: {}", gateway.getDeviceId(), deviceName, telemetryMsg, e); - ackOrClose(msgId); + ackOrClose(msgId, ackSent); } } }, - t -> log.debug("[{}] Failed to process device telemetry command: {}", sessionId, deviceName, t)); + t -> processFailure(msgId, deviceName, "Failed to process device telemetry command", ackSent, t)); } private void onDeviceAttributesProto(ListenableFuture contextListenableFuture, int msgId, List attributesMsgList, String deviceName) throws AdaptorException { try { if (CollectionUtils.isEmpty(attributesMsgList)) { - log.debug("[{}] Devices attributes keys list is empty for: [{}]", sessionId, gateway.getDeviceId()); + log.debug("[{}] Device attribute list is empty for: [{}]", sessionId, gateway.getDeviceId()); } + + AtomicInteger remaining = new AtomicInteger(attributesMsgList.size()); + AtomicBoolean ackSent = new AtomicBoolean(false); + process(contextListenableFuture, deviceCtx -> { for (TransportApiProtos.AttributesMsg attributesMsg : attributesMsgList) { TransportProtos.PostAttributeMsg kvListProto = attributesMsg.getMsg(); try { TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto); - processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId); + processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId, remaining, ackSent); } catch (Throwable e) { log.warn("[{}][{}] Failed to process device attributes command: {}", gateway.getDeviceId(), deviceName, kvListProto, e); + ackOrClose(msgId, ackSent); } } }, - t -> log.debug("[{}] Failed to process device attributes command: {}", sessionId, deviceName, t)); + t -> processFailure(msgId, deviceName, "Failed to process device attributes command", ackSent, t)); } catch (RuntimeException e) { throw new AdaptorException(e); } From 6cfa3f1b0b6b4a699f1df94f4528322b8199d107 Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Tue, 7 Oct 2025 12:22:57 +0300 Subject: [PATCH 4/7] fixed device session closing: should close every device session if message id wrong --- .../AbstractGatewaySessionHandler.java | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index 168fef6c2e..fee2f04e24 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -845,27 +845,31 @@ public abstract class AbstractGatewaySessionHandler Date: Thu, 9 Oct 2025 14:50:03 +0300 Subject: [PATCH 5/7] Version set to 4.2.1-RC --- application/pom.xml | 2 +- common/actor/pom.xml | 2 +- common/cache/pom.xml | 2 +- common/cluster-api/pom.xml | 2 +- common/coap-server/pom.xml | 2 +- common/dao-api/pom.xml | 2 +- common/data/pom.xml | 2 +- common/discovery-api/pom.xml | 2 +- common/edge-api/pom.xml | 2 +- common/edqs/pom.xml | 2 +- common/message/pom.xml | 2 +- common/pom.xml | 2 +- common/proto/pom.xml | 2 +- common/queue/pom.xml | 2 +- common/script/pom.xml | 2 +- common/script/remote-js-client/pom.xml | 2 +- common/script/script-api/pom.xml | 2 +- common/stats/pom.xml | 2 +- common/transport/coap/pom.xml | 2 +- common/transport/http/pom.xml | 2 +- common/transport/lwm2m/pom.xml | 2 +- common/transport/mqtt/pom.xml | 2 +- common/transport/pom.xml | 2 +- common/transport/snmp/pom.xml | 2 +- common/transport/transport-api/pom.xml | 2 +- common/util/pom.xml | 2 +- common/version-control/pom.xml | 2 +- dao/pom.xml | 2 +- edqs/pom.xml | 2 +- monitoring/pom.xml | 2 +- msa/black-box-tests/pom.xml | 2 +- msa/edqs/pom.xml | 2 +- msa/js-executor/package.json | 2 +- msa/js-executor/pom.xml | 2 +- msa/monitoring/pom.xml | 2 +- msa/pom.xml | 2 +- msa/tb-node/pom.xml | 2 +- msa/tb/pom.xml | 2 +- msa/transport/coap/pom.xml | 2 +- msa/transport/http/pom.xml | 2 +- msa/transport/lwm2m/pom.xml | 2 +- msa/transport/mqtt/pom.xml | 2 +- msa/transport/pom.xml | 2 +- msa/transport/snmp/pom.xml | 2 +- msa/vc-executor-docker/pom.xml | 2 +- msa/vc-executor/pom.xml | 2 +- msa/web-ui/package.json | 2 +- msa/web-ui/pom.xml | 2 +- netty-mqtt/pom.xml | 4 ++-- pom.xml | 2 +- rest-client/pom.xml | 2 +- rule-engine/pom.xml | 2 +- rule-engine/rule-engine-api/pom.xml | 2 +- rule-engine/rule-engine-components/pom.xml | 2 +- tools/pom.xml | 2 +- transport/coap/pom.xml | 2 +- transport/http/pom.xml | 2 +- transport/lwm2m/pom.xml | 2 +- transport/mqtt/pom.xml | 2 +- transport/pom.xml | 2 +- transport/snmp/pom.xml | 2 +- ui-ngx/package.json | 2 +- ui-ngx/pom.xml | 2 +- 63 files changed, 64 insertions(+), 64 deletions(-) diff --git a/application/pom.xml b/application/pom.xml index 0413f7732c..b415ee08a6 100644 --- a/application/pom.xml +++ b/application/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC thingsboard application diff --git a/common/actor/pom.xml b/common/actor/pom.xml index a6efdda77d..4149fe2ad9 100644 --- a/common/actor/pom.xml +++ b/common/actor/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC common org.thingsboard.common diff --git a/common/cache/pom.xml b/common/cache/pom.xml index 3088dad098..6831156fde 100644 --- a/common/cache/pom.xml +++ b/common/cache/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC common org.thingsboard.common diff --git a/common/cluster-api/pom.xml b/common/cluster-api/pom.xml index 939dfb1e16..78ef7980e8 100644 --- a/common/cluster-api/pom.xml +++ b/common/cluster-api/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC common org.thingsboard.common diff --git a/common/coap-server/pom.xml b/common/coap-server/pom.xml index fbfbe78b55..b24fd98f5d 100644 --- a/common/coap-server/pom.xml +++ b/common/coap-server/pom.xml @@ -22,7 +22,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC common org.thingsboard.common diff --git a/common/dao-api/pom.xml b/common/dao-api/pom.xml index 636c7b9ef4..555b4a54bb 100644 --- a/common/dao-api/pom.xml +++ b/common/dao-api/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC common org.thingsboard.common diff --git a/common/data/pom.xml b/common/data/pom.xml index da2a0970b1..f27df3687f 100644 --- a/common/data/pom.xml +++ b/common/data/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC common org.thingsboard.common diff --git a/common/discovery-api/pom.xml b/common/discovery-api/pom.xml index 4f9ef70c5b..cb5238a664 100644 --- a/common/discovery-api/pom.xml +++ b/common/discovery-api/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC common org.thingsboard.common diff --git a/common/edge-api/pom.xml b/common/edge-api/pom.xml index 69a01be812..2b7db0cbd3 100644 --- a/common/edge-api/pom.xml +++ b/common/edge-api/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC common org.thingsboard.common diff --git a/common/edqs/pom.xml b/common/edqs/pom.xml index 140eabd270..cc5c5db14e 100644 --- a/common/edqs/pom.xml +++ b/common/edqs/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC common org.thingsboard.common diff --git a/common/message/pom.xml b/common/message/pom.xml index 834a041127..c84a4fde2d 100644 --- a/common/message/pom.xml +++ b/common/message/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC common org.thingsboard.common diff --git a/common/pom.xml b/common/pom.xml index 563f1ec3ab..f45a2bdd28 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC thingsboard common diff --git a/common/proto/pom.xml b/common/proto/pom.xml index 6357cae711..30d4b0bc68 100644 --- a/common/proto/pom.xml +++ b/common/proto/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC common org.thingsboard.common diff --git a/common/queue/pom.xml b/common/queue/pom.xml index ccc0eadf1a..69adac81b4 100644 --- a/common/queue/pom.xml +++ b/common/queue/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC common org.thingsboard.common diff --git a/common/script/pom.xml b/common/script/pom.xml index c203bfdbf2..ab5ac032c7 100644 --- a/common/script/pom.xml +++ b/common/script/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC common org.thingsboard.common diff --git a/common/script/remote-js-client/pom.xml b/common/script/remote-js-client/pom.xml index c2f5ee2da2..1a3e00a88c 100644 --- a/common/script/remote-js-client/pom.xml +++ b/common/script/remote-js-client/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard.common - 4.2.0-RC + 4.2.1-RC script org.thingsboard.common.script diff --git a/common/script/script-api/pom.xml b/common/script/script-api/pom.xml index e35dec1b1b..316f823a44 100644 --- a/common/script/script-api/pom.xml +++ b/common/script/script-api/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard.common - 4.2.0-RC + 4.2.1-RC script org.thingsboard.common.script diff --git a/common/stats/pom.xml b/common/stats/pom.xml index 090732d175..4bf24411dc 100644 --- a/common/stats/pom.xml +++ b/common/stats/pom.xml @@ -22,7 +22,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC common org.thingsboard.common diff --git a/common/transport/coap/pom.xml b/common/transport/coap/pom.xml index aaa9713e02..be3c62c057 100644 --- a/common/transport/coap/pom.xml +++ b/common/transport/coap/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard.common - 4.2.0-RC + 4.2.1-RC transport org.thingsboard.common.transport diff --git a/common/transport/http/pom.xml b/common/transport/http/pom.xml index 29d633d83e..a136079cb5 100644 --- a/common/transport/http/pom.xml +++ b/common/transport/http/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard.common - 4.2.0-RC + 4.2.1-RC transport org.thingsboard.common.transport diff --git a/common/transport/lwm2m/pom.xml b/common/transport/lwm2m/pom.xml index 4d7013a2bc..904e359bcc 100644 --- a/common/transport/lwm2m/pom.xml +++ b/common/transport/lwm2m/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard.common - 4.2.0-RC + 4.2.1-RC transport org.thingsboard.common.transport diff --git a/common/transport/mqtt/pom.xml b/common/transport/mqtt/pom.xml index 2baee9bda2..45f9b76af7 100644 --- a/common/transport/mqtt/pom.xml +++ b/common/transport/mqtt/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard.common - 4.2.0-RC + 4.2.1-RC transport org.thingsboard.common.transport diff --git a/common/transport/pom.xml b/common/transport/pom.xml index 2289b53aed..f786bcdd0e 100644 --- a/common/transport/pom.xml +++ b/common/transport/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC common org.thingsboard.common diff --git a/common/transport/snmp/pom.xml b/common/transport/snmp/pom.xml index 71bd0c17fe..d3b2c46d69 100644 --- a/common/transport/snmp/pom.xml +++ b/common/transport/snmp/pom.xml @@ -21,7 +21,7 @@ org.thingsboard.common - 4.2.0-RC + 4.2.1-RC transport diff --git a/common/transport/transport-api/pom.xml b/common/transport/transport-api/pom.xml index 29ace644a9..a3fee460e1 100644 --- a/common/transport/transport-api/pom.xml +++ b/common/transport/transport-api/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard.common - 4.2.0-RC + 4.2.1-RC transport org.thingsboard.common.transport diff --git a/common/util/pom.xml b/common/util/pom.xml index 3f07b2c05b..ebedf5dea3 100644 --- a/common/util/pom.xml +++ b/common/util/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC common org.thingsboard.common diff --git a/common/version-control/pom.xml b/common/version-control/pom.xml index 19278b8d13..b6a0ea1d2e 100644 --- a/common/version-control/pom.xml +++ b/common/version-control/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC common org.thingsboard.common diff --git a/dao/pom.xml b/dao/pom.xml index a32d0c4f6d..a1843959df 100644 --- a/dao/pom.xml +++ b/dao/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC thingsboard dao diff --git a/edqs/pom.xml b/edqs/pom.xml index c29710490b..d648d9d786 100644 --- a/edqs/pom.xml +++ b/edqs/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC thingsboard edqs diff --git a/monitoring/pom.xml b/monitoring/pom.xml index cee8fc0a6b..77874c5ac9 100644 --- a/monitoring/pom.xml +++ b/monitoring/pom.xml @@ -21,7 +21,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC thingsboard diff --git a/msa/black-box-tests/pom.xml b/msa/black-box-tests/pom.xml index 84509ba48f..fed33f0002 100644 --- a/msa/black-box-tests/pom.xml +++ b/msa/black-box-tests/pom.xml @@ -21,7 +21,7 @@ org.thingsboard - 4.2.0-RC + 4.2.1-RC msa org.thingsboard.msa diff --git a/msa/edqs/pom.xml b/msa/edqs/pom.xml index d1f8291e2e..c5be28e7bb 100644 --- a/msa/edqs/pom.xml +++ b/msa/edqs/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC msa org.thingsboard.msa diff --git a/msa/js-executor/package.json b/msa/js-executor/package.json index 6e5f590260..220561f6e7 100644 --- a/msa/js-executor/package.json +++ b/msa/js-executor/package.json @@ -1,7 +1,7 @@ { "name": "thingsboard-js-executor", "private": true, - "version": "4.2.0", + "version": "4.2.1", "description": "ThingsBoard JavaScript Executor Microservice", "main": "server.ts", "bin": "server.js", diff --git a/msa/js-executor/pom.xml b/msa/js-executor/pom.xml index 01ac469b39..651b142160 100644 --- a/msa/js-executor/pom.xml +++ b/msa/js-executor/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC msa org.thingsboard.msa diff --git a/msa/monitoring/pom.xml b/msa/monitoring/pom.xml index 61f492dca8..abd12ed9e0 100644 --- a/msa/monitoring/pom.xml +++ b/msa/monitoring/pom.xml @@ -22,7 +22,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC msa diff --git a/msa/pom.xml b/msa/pom.xml index 58482a32cf..6fcf7c3523 100644 --- a/msa/pom.xml +++ b/msa/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC thingsboard msa diff --git a/msa/tb-node/pom.xml b/msa/tb-node/pom.xml index 5f53d868c9..0103bf9352 100644 --- a/msa/tb-node/pom.xml +++ b/msa/tb-node/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC msa org.thingsboard.msa diff --git a/msa/tb/pom.xml b/msa/tb/pom.xml index 646b03bf14..28e1366a73 100644 --- a/msa/tb/pom.xml +++ b/msa/tb/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC msa org.thingsboard.msa diff --git a/msa/transport/coap/pom.xml b/msa/transport/coap/pom.xml index 4f510f937b..8aefc94122 100644 --- a/msa/transport/coap/pom.xml +++ b/msa/transport/coap/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard.msa - 4.2.0-RC + 4.2.1-RC transport org.thingsboard.msa.transport diff --git a/msa/transport/http/pom.xml b/msa/transport/http/pom.xml index d9159cd0db..a1481203de 100644 --- a/msa/transport/http/pom.xml +++ b/msa/transport/http/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard.msa - 4.2.0-RC + 4.2.1-RC transport org.thingsboard.msa.transport diff --git a/msa/transport/lwm2m/pom.xml b/msa/transport/lwm2m/pom.xml index 9972d66a18..7af2b16588 100644 --- a/msa/transport/lwm2m/pom.xml +++ b/msa/transport/lwm2m/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard.msa - 4.2.0-RC + 4.2.1-RC transport org.thingsboard.msa.transport diff --git a/msa/transport/mqtt/pom.xml b/msa/transport/mqtt/pom.xml index 185addeb3b..176dca19eb 100644 --- a/msa/transport/mqtt/pom.xml +++ b/msa/transport/mqtt/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard.msa - 4.2.0-RC + 4.2.1-RC transport org.thingsboard.msa.transport diff --git a/msa/transport/pom.xml b/msa/transport/pom.xml index 193d01ef06..afbab0c450 100644 --- a/msa/transport/pom.xml +++ b/msa/transport/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC msa org.thingsboard.msa diff --git a/msa/transport/snmp/pom.xml b/msa/transport/snmp/pom.xml index ea3d6847c0..553ed8bb37 100644 --- a/msa/transport/snmp/pom.xml +++ b/msa/transport/snmp/pom.xml @@ -21,7 +21,7 @@ org.thingsboard.msa transport - 4.2.0-RC + 4.2.1-RC org.thingsboard.msa.transport diff --git a/msa/vc-executor-docker/pom.xml b/msa/vc-executor-docker/pom.xml index a77eb1d206..d0a291e246 100644 --- a/msa/vc-executor-docker/pom.xml +++ b/msa/vc-executor-docker/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC msa org.thingsboard.msa diff --git a/msa/vc-executor/pom.xml b/msa/vc-executor/pom.xml index 0e91749cbe..cf777c4c96 100644 --- a/msa/vc-executor/pom.xml +++ b/msa/vc-executor/pom.xml @@ -21,7 +21,7 @@ org.thingsboard - 4.2.0-RC + 4.2.1-RC msa org.thingsboard.msa diff --git a/msa/web-ui/package.json b/msa/web-ui/package.json index cd5f87338b..2fcf6ab914 100644 --- a/msa/web-ui/package.json +++ b/msa/web-ui/package.json @@ -1,7 +1,7 @@ { "name": "thingsboard-web-ui", "private": true, - "version": "4.2.0", + "version": "4.2.1", "description": "ThingsBoard Web UI Microservice", "main": "server.ts", "bin": "server.js", diff --git a/msa/web-ui/pom.xml b/msa/web-ui/pom.xml index 8acd58b2b9..a56d0ca9d9 100644 --- a/msa/web-ui/pom.xml +++ b/msa/web-ui/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC msa org.thingsboard.msa diff --git a/netty-mqtt/pom.xml b/netty-mqtt/pom.xml index bbcfcfea18..08f2a55f4a 100644 --- a/netty-mqtt/pom.xml +++ b/netty-mqtt/pom.xml @@ -19,11 +19,11 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC thingsboard netty-mqtt - 4.2.0-RC + 4.2.1-RC jar Netty MQTT Client diff --git a/pom.xml b/pom.xml index fd49af548f..1283dcc01c 100755 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard thingsboard - 4.2.0-RC + 4.2.1-RC pom Thingsboard diff --git a/rest-client/pom.xml b/rest-client/pom.xml index 07499f1129..e6e82da954 100644 --- a/rest-client/pom.xml +++ b/rest-client/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC thingsboard rest-client diff --git a/rule-engine/pom.xml b/rule-engine/pom.xml index 120d27ac89..8a9ad96d95 100644 --- a/rule-engine/pom.xml +++ b/rule-engine/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC thingsboard rule-engine diff --git a/rule-engine/rule-engine-api/pom.xml b/rule-engine/rule-engine-api/pom.xml index a6ee3492e5..8f449a5e64 100644 --- a/rule-engine/rule-engine-api/pom.xml +++ b/rule-engine/rule-engine-api/pom.xml @@ -22,7 +22,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC rule-engine org.thingsboard.rule-engine diff --git a/rule-engine/rule-engine-components/pom.xml b/rule-engine/rule-engine-components/pom.xml index 3f2ee164df..4fc5a55c5e 100644 --- a/rule-engine/rule-engine-components/pom.xml +++ b/rule-engine/rule-engine-components/pom.xml @@ -22,7 +22,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC rule-engine org.thingsboard.rule-engine diff --git a/tools/pom.xml b/tools/pom.xml index 1224e73777..cce1466971 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC thingsboard tools diff --git a/transport/coap/pom.xml b/transport/coap/pom.xml index c96f4fddf7..8ae86d381f 100644 --- a/transport/coap/pom.xml +++ b/transport/coap/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC transport org.thingsboard.transport diff --git a/transport/http/pom.xml b/transport/http/pom.xml index e32915c51e..487e1de186 100644 --- a/transport/http/pom.xml +++ b/transport/http/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC transport org.thingsboard.transport diff --git a/transport/lwm2m/pom.xml b/transport/lwm2m/pom.xml index af17bf703b..30effe6e42 100644 --- a/transport/lwm2m/pom.xml +++ b/transport/lwm2m/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC transport org.thingsboard.transport diff --git a/transport/mqtt/pom.xml b/transport/mqtt/pom.xml index 87c98f9d78..a609e8d002 100644 --- a/transport/mqtt/pom.xml +++ b/transport/mqtt/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC transport org.thingsboard.transport diff --git a/transport/pom.xml b/transport/pom.xml index 8ddcf992d8..c6a82349f5 100644 --- a/transport/pom.xml +++ b/transport/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC thingsboard transport diff --git a/transport/snmp/pom.xml b/transport/snmp/pom.xml index 09901661a6..1894f09787 100644 --- a/transport/snmp/pom.xml +++ b/transport/snmp/pom.xml @@ -21,7 +21,7 @@ org.thingsboard - 4.2.0-RC + 4.2.1-RC transport diff --git a/ui-ngx/package.json b/ui-ngx/package.json index 4ba512a024..ea7d7c80fb 100644 --- a/ui-ngx/package.json +++ b/ui-ngx/package.json @@ -1,6 +1,6 @@ { "name": "thingsboard", - "version": "4.2.0", + "version": "4.2.1", "scripts": { "ng": "ng", "start": "node --max_old_space_size=8048 ./node_modules/@angular/cli/bin/ng serve --configuration development --host 0.0.0.0 --open", diff --git a/ui-ngx/pom.xml b/ui-ngx/pom.xml index 2f5130014a..905a1a8aca 100644 --- a/ui-ngx/pom.xml +++ b/ui-ngx/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.thingsboard - 4.2.0-RC + 4.2.1-RC thingsboard org.thingsboard From ca8b110dc1c22eec9ae2d8c202da777c9a5990d4 Mon Sep 17 00:00:00 2001 From: VIacheslavKlimov Date: Thu, 9 Oct 2025 15:00:13 +0300 Subject: [PATCH 6/7] Cleanup upgrade from 4.2.0 to 4.2.1 --- .../main/data/upgrade/basic/schema_update.sql | 30 ------------------- .../DefaultDatabaseSchemaSettingsService.java | 2 +- 2 files changed, 1 insertion(+), 31 deletions(-) diff --git a/application/src/main/data/upgrade/basic/schema_update.sql b/application/src/main/data/upgrade/basic/schema_update.sql index add832ea6e..016e786776 100644 --- a/application/src/main/data/upgrade/basic/schema_update.sql +++ b/application/src/main/data/upgrade/basic/schema_update.sql @@ -14,33 +14,3 @@ -- limitations under the License. -- --- UPDATE OTA PACKAGE EXTERNAL ID START - -ALTER TABLE ota_package - ADD COLUMN IF NOT EXISTS external_id uuid; - -DO -$$ - BEGIN - IF NOT EXISTS(SELECT 1 FROM pg_constraint WHERE conname = 'ota_package_external_id_unq_key') THEN - ALTER TABLE ota_package ADD CONSTRAINT ota_package_external_id_unq_key UNIQUE (tenant_id, external_id); - END IF; - END; -$$; - --- UPDATE OTA PACKAGE EXTERNAL ID END - --- DROP INDEXES THAT DUPLICATE UNIQUE CONSTRAINT START - -DROP INDEX IF EXISTS idx_device_external_id; -DROP INDEX IF EXISTS idx_device_profile_external_id; -DROP INDEX IF EXISTS idx_asset_external_id; -DROP INDEX IF EXISTS idx_entity_view_external_id; -DROP INDEX IF EXISTS idx_rule_chain_external_id; -DROP INDEX IF EXISTS idx_dashboard_external_id; -DROP INDEX IF EXISTS idx_customer_external_id; -DROP INDEX IF EXISTS idx_widgets_bundle_external_id; - --- DROP INDEXES THAT DUPLICATE UNIQUE CONSTRAINT END - -ALTER TABLE mobile_app ADD COLUMN IF NOT EXISTS title varchar(255); \ No newline at end of file diff --git a/application/src/main/java/org/thingsboard/server/service/install/DefaultDatabaseSchemaSettingsService.java b/application/src/main/java/org/thingsboard/server/service/install/DefaultDatabaseSchemaSettingsService.java index e5bd026fb7..f41a530630 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/DefaultDatabaseSchemaSettingsService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/DefaultDatabaseSchemaSettingsService.java @@ -32,7 +32,7 @@ public class DefaultDatabaseSchemaSettingsService implements DatabaseSchemaSetti // This list should include all versions which are compatible for the upgrade. // The compatibility cycle usually breaks when we have some scripts written in Java that may not work after new release. - private static final List SUPPORTED_VERSIONS_FOR_UPGRADE = List.of("4.1.0"); + private static final List SUPPORTED_VERSIONS_FOR_UPGRADE = List.of("4.2.0"); private final ProjectInfo projectInfo; private final JdbcTemplate jdbcTemplate; From ad6ab9cc536537884724954b210ed355d9520ca7 Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Thu, 9 Oct 2025 15:04:51 +0300 Subject: [PATCH 7/7] AI node: fixed resources validation --- .../server/actors/ruleChain/DefaultTbContext.java | 14 ++++++-------- .../org/thingsboard/rule/engine/api/TbContext.java | 2 +- .../org/thingsboard/rule/engine/ai/TbAiNode.java | 2 +- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 88b04c7613..03830c1c4c 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -1063,18 +1063,16 @@ public class DefaultTbContext implements TbContext { @Override public void checkTenantEntity(EntityId entityId) throws TbNodeException { TenantId actualTenantId = TenantIdLoader.findTenantId(this, entityId); - assertSameTenantId(actualTenantId, entityId); + if (!getTenantId().equals(actualTenantId)) { + throw new TbNodeException("Entity with id: '" + entityId + "' specified in the configuration doesn't belong to the current tenant.", true); + } } @Override - public & HasTenantId, I extends EntityId> void checkTenantEntity(E entity) throws TbNodeException { + public & HasTenantId, I extends EntityId> void checkTenantOrSystemEntity(E entity) throws TbNodeException { TenantId actualTenantId = entity.getTenantId(); - assertSameTenantId(actualTenantId, entity.getId()); - } - - private void assertSameTenantId(TenantId tenantId, EntityId entityId) throws TbNodeException { - if (!getTenantId().equals(tenantId)) { - throw new TbNodeException("Entity with id: '" + entityId + "' specified in the configuration doesn't belong to the current tenant.", true); + if (!getTenantId().equals(actualTenantId) && !actualTenantId.isSysTenantId()) { + throw new TbNodeException("Entity with id: '" + entity.getId() + "' specified in the configuration doesn't belong to the current or system tenant.", true); } } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 920f00ed27..e703a7b256 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -255,7 +255,7 @@ public interface TbContext { void checkTenantEntity(EntityId entityId) throws TbNodeException; - & HasTenantId, I extends EntityId> void checkTenantEntity(E entity) throws TbNodeException; + & HasTenantId, I extends EntityId> void checkTenantOrSystemEntity(E entity) throws TbNodeException; boolean isLocalEntity(EntityId entityId); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/ai/TbAiNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/ai/TbAiNode.java index 054b06fb54..654ed3f448 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/ai/TbAiNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/ai/TbAiNode.java @@ -261,7 +261,7 @@ public final class TbAiNode extends TbAbstractExternalNode implements TbNode { if (!ResourceType.GENERAL.equals(resource.getResourceType())) { throw new TbNodeException("[" + ctx.getTenantId() + "] Resource with ID: [" + tbResourceId + "] has unsupported resource type: " + resource.getResourceType(), true); } - ctx.checkTenantEntity(resource); + ctx.checkTenantOrSystemEntity(resource); } private ListenableFuture> loadResources(TbContext ctx) {