From b6bd4247da0ba3145c9bb45f1e79c93abd4a2f55 Mon Sep 17 00:00:00 2001 From: zbeacon Date: Fri, 21 Jan 2022 13:13:08 +0200 Subject: [PATCH] Fix for PROD-1337 --- .../callback/AbstractSyncSessionCallback.java | 6 ++++++ .../transport/http/DeviceApiController.java | 7 +++++++ .../transport/mqtt/MqttTransportHandler.java | 9 +++++++++ .../mqtt/session/GatewayDeviceSessionCtx.java | 6 ++++++ .../mqtt/session/GatewaySessionHandler.java | 4 ++++ .../common/transport/SessionMsgListener.java | 4 ++-- .../server/msa/connectivity/MqttClientTest.java | 15 +++++++++++++++ .../msa/connectivity/MqttGatewayClientTest.java | 10 +++++++++- 8 files changed, 58 insertions(+), 3 deletions(-) diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/AbstractSyncSessionCallback.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/AbstractSyncSessionCallback.java index 1097d5dd1a..03de2d0ac9 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/AbstractSyncSessionCallback.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/AbstractSyncSessionCallback.java @@ -21,6 +21,7 @@ import org.eclipse.californium.core.coap.MediaTypeRegistry; import org.eclipse.californium.core.coap.Request; import org.eclipse.californium.core.coap.Response; import org.eclipse.californium.core.server.resources.CoapExchange; +import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.transport.coap.client.TbCoapClientState; @@ -52,6 +53,11 @@ public abstract class AbstractSyncSessionCallback implements SessionMsgListener } + @Override + public void onDeviceDeleted(DeviceId deviceId) { + + } + @Override public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg toDeviceRequest) { logUnsupportedCommandMessage(toDeviceRequest); diff --git a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java index 06169a040d..de9619cb9f 100644 --- a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java +++ b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java @@ -603,6 +603,13 @@ public class DeviceApiController implements TbTransportService { responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK)); } + @Override + public void onDeviceDeleted(DeviceId deviceId) { + UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); + log.trace("[{}] Received device deleted notification for device with id: {}",sessionId, deviceId); + responseWriter.setResult(new ResponseEntity<>("Device was deleted!", HttpStatus.FORBIDDEN)); + } + } private static MediaType parseMediaType(String contentType) { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 7f27c91f1c..2f52923e63 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -48,6 +48,7 @@ import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.data.TransportPayloadType; import org.thingsboard.server.common.data.device.profile.MqttTopics; +import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.OtaPackageId; import org.thingsboard.server.common.data.ota.OtaPackageType; import org.thingsboard.server.common.data.rpc.RpcStatus; @@ -928,6 +929,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement gatewaySessionHandler.onGatewayDisconnect(); } deviceSessionCtx.setDisconnected(); + context.onAuthFailure(address); + ChannelHandlerContext ctx = deviceSessionCtx.getChannel(); + ctx.close(); } deviceSessionCtx.release(); } @@ -1066,4 +1070,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt); } + @Override + public void onDeviceDeleted(DeviceId deviceId) { + doDisconnect(); + } + } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java index b01e4e120e..debdab87b5 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java @@ -19,6 +19,7 @@ import io.netty.channel.ChannelFuture; import io.netty.handler.codec.mqtt.MqttMessage; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.TransportService; @@ -136,6 +137,11 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple // This feature is not supported in the TB IoT Gateway yet. } + @Override + public void onDeviceDeleted(DeviceId deviceId) { + parent.onDeviceDeleted(this.getSessionInfo().getDeviceName()); + } + private boolean isAckExpected(MqttMessage message) { return message.fixedHeader().qosLevel().value() > 0; } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java index 7fc48d922a..d91935086b 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java @@ -178,6 +178,10 @@ public class GatewaySessionHandler { devices.forEach(this::deregisterSession); } + public void onDeviceDeleted(String deviceName) { + deregisterSession(deviceName); + } + public String getNodeId() { return context.getNodeId(); } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java index f0c8152979..27ee61c700 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java @@ -45,6 +45,8 @@ public interface SessionMsgListener { void onToServerRpcResponse(ToServerRpcResponseMsg toServerResponse); + void onDeviceDeleted(DeviceId deviceId); + default void onUplinkNotification(UplinkNotificationMsg notificationMsg){}; default void onToTransportUpdateCredentials(ToTransportUpdateCredentialsProto toTransportUpdateCredentials){} @@ -54,8 +56,6 @@ public interface SessionMsgListener { default void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional deviceProfileOpt) {} - default void onDeviceDeleted(DeviceId deviceId) {} - default void onResourceUpdate(TransportProtos.ResourceUpdateMsg resourceUpdateMsgOpt) {} default void onResourceDelete(TransportProtos.ResourceDeleteMsg resourceUpdateMsgOpt) {} diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index efbb3b1068..336d75bb45 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -339,6 +339,21 @@ public class MqttClientTest extends AbstractContainerTest { restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId()); } + @Test + public void deviceDeletedClosingSession() throws Exception { + restClient.login("tenant@thingsboard.org", "tenant"); + String deviceForDeletingTestName = "Device for deleting notification test"; + Device device = createDevice(deviceForDeletingTestName); + DeviceCredentials deviceCredentials = restClient.getDeviceCredentialsByDeviceId(device.getId()).get(); + + MqttMessageListener listener = new MqttMessageListener(); + MqttClient mqttClient = getMqttClient(deviceCredentials, listener); + + restClient.deleteDevice(device.getId()); + TimeUnit.SECONDS.sleep(3); + Assert.assertFalse(mqttClient.isConnected()); + } + private RuleChainId createRootRuleChainForRpcResponse() throws Exception { RuleChain newRuleChain = new RuleChain(); newRuleChain.setName("testRuleChain"); diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java index def2be56b2..2e106815c7 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java @@ -304,7 +304,15 @@ public class MqttGatewayClientTest extends AbstractContainerTest { Assert.assertEquals(clientResponse.toString(), serverResponse.getBody()); } - private void checkAttribute(boolean client, String expectedValue) throws Exception{ + @Test + public void deviceCreationAfterDeleted() throws Exception { + restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + this.createdDevice.getId()); + Optional deletedDevice = restClient.getDeviceById(this.createdDevice.getId()); + Assert.assertTrue(deletedDevice.isEmpty()); + this.createdDevice = createDeviceThroughGateway(mqttClient, gatewayDevice); + } + + private void checkAttribute(boolean client, String expectedValue) throws Exception { JsonObject gatewayAttributesRequest = new JsonObject(); int messageId = new Random().nextInt(100); gatewayAttributesRequest.addProperty("id", messageId);