diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/CoapTestCallback.java b/application/src/test/java/org/thingsboard/server/transport/coap/CoapTestCallback.java index 2dda86d7a4..07eadd731d 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/CoapTestCallback.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/CoapTestCallback.java @@ -21,25 +21,14 @@ import org.eclipse.californium.core.CoapHandler; import org.eclipse.californium.core.CoapResponse; import org.eclipse.californium.core.coap.CoAP; -import java.util.concurrent.CountDownLatch; - @Slf4j @Data public class CoapTestCallback implements CoapHandler { - protected final CountDownLatch latch; protected Integer observe; protected byte[] payloadBytes; protected CoAP.ResponseCode responseCode; - public CoapTestCallback() { - this.latch = new CountDownLatch(1); - } - - public CoapTestCallback(int subscribeCount) { - this.latch = new CountDownLatch(subscribeCount); - } - public Integer getObserve() { return observe; } @@ -57,7 +46,6 @@ public class CoapTestCallback implements CoapHandler { observe = response.getOptions().getObserve(); payloadBytes = response.getPayload(); responseCode = response.getCode(); - latch.countDown(); } @Override diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/attributes/AbstractCoapAttributesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/coap/attributes/AbstractCoapAttributesIntegrationTest.java index 697779c178..03ef6f9503 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/attributes/AbstractCoapAttributesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/attributes/AbstractCoapAttributesIntegrationTest.java @@ -232,7 +232,7 @@ public abstract class AbstractCoapAttributesIntegrationTest extends AbstractCoap } client = new CoapTestClient(accessToken, FeatureType.ATTRIBUTES); - CoapTestCallback callbackCoap = new CoapTestCallback(1); + CoapTestCallback callbackCoap = new CoapTestCallback(); CoapObserveRelation observeRelation = client.getObserveRelation(callbackCoap); String awaitAlias = "await Json Test Subscribe To AttributesUpdates (client.getObserveRelation)"; @@ -279,7 +279,7 @@ public abstract class AbstractCoapAttributesIntegrationTest extends AbstractCoap } client = new CoapTestClient(accessToken, FeatureType.ATTRIBUTES); - CoapTestCallback callbackCoap = new CoapTestCallback(1); + CoapTestCallback callbackCoap = new CoapTestCallback(); String awaitAlias = "await Proto Test Subscribe To Attributes Updates (add attributes)"; CoapObserveRelation observeRelation = client.getObserveRelation(callbackCoap); diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcIntegrationTest.java index a2b44b6e58..445ac51ccd 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcIntegrationTest.java @@ -41,7 +41,6 @@ import org.thingsboard.server.transport.coap.AbstractCoapIntegrationTest; import org.thingsboard.server.transport.coap.CoapTestCallback; import org.thingsboard.server.transport.coap.CoapTestClient; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.awaitility.Awaitility.await; @@ -74,17 +73,16 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC protected void processOneWayRpcTest(boolean protobuf) throws Exception { client = new CoapTestClient(accessToken, FeatureType.RPC); - CoapTestCallback callbackCoap = new TestCoapCallbackForRPC(client, 1, true, protobuf); + CoapTestCallback callbackCoap = new TestCoapCallbackForRPC(client, true, protobuf); CoapObserveRelation observeRelation = client.getObserveRelation(callbackCoap); String awaitAlias = "await One Way Rpc (client.getObserveRelation)"; await(awaitAlias) .atMost(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS) .until(() -> CoAP.ResponseCode.VALID.equals(callbackCoap.getResponseCode()) && - callbackCoap.getObserve() != null && - 0 == callbackCoap.getObserve().intValue()); + callbackCoap.getObserve() != null && 0 == callbackCoap.getObserve()); validateCurrentStateNotification(callbackCoap); - int expectedObserveCountAfterGpioRequest = callbackCoap.getObserve().intValue() + 1; + int expectedObserveAfterRpcProcessed = callbackCoap.getObserve() + 1; String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; String deviceId = savedDevice.getId().getId().toString(); String result = doPostAsync("/api/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isOk()); @@ -92,8 +90,7 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC await(awaitAlias) .atMost(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS) .until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) && - callbackCoap.getObserve() != null && - expectedObserveCountAfterGpioRequest == callbackCoap.getObserve().intValue()); + callbackCoap.getObserve() != null && expectedObserveAfterRpcProcessed == callbackCoap.getObserve()); validateOneWayStateChangedNotification(callbackCoap, result); observeRelation.proactiveCancel(); @@ -102,7 +99,7 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC protected void processTwoWayRpcTest(String expectedResponseResult, boolean protobuf) throws Exception { client = new CoapTestClient(accessToken, FeatureType.RPC); - CoapTestCallback callbackCoap = new TestCoapCallbackForRPC(client, 1, false, protobuf); + CoapTestCallback callbackCoap = new TestCoapCallbackForRPC(client, false, protobuf); CoapObserveRelation observeRelation = client.getObserveRelation(callbackCoap); String awaitAlias = "await Two Way Rpc (client.getObserveRelation)"; @@ -110,29 +107,29 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC .atMost(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS) .until(() -> CoAP.ResponseCode.VALID.equals(callbackCoap.getResponseCode()) && callbackCoap.getObserve() != null && - 0 == callbackCoap.getObserve().intValue()); + 0 == callbackCoap.getObserve()); validateCurrentStateNotification(callbackCoap); String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"26\",\"value\": 1}}"; String deviceId = savedDevice.getId().getId().toString(); - int expectedObserveCountAfterGpioRequest1 = callbackCoap.getObserve().intValue() + 1; + int expectedObserveCountAfterGpioRequest1 = callbackCoap.getObserve() + 1; String actualResult = doPostAsync("/api/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); awaitAlias = "await Two Way Rpc (setGpio(method, params, value) first"; await(awaitAlias) .atMost(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS) .until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) && callbackCoap.getObserve() != null && - expectedObserveCountAfterGpioRequest1 == callbackCoap.getObserve().intValue()); + expectedObserveCountAfterGpioRequest1 == callbackCoap.getObserve()); validateTwoWayStateChangedNotification(callbackCoap, expectedResponseResult, actualResult); - int expectedObserveCountAfterGpioRequest2 = callbackCoap.getObserve().intValue() + 1; + int expectedObserveCountAfterGpioRequest2 = callbackCoap.getObserve() + 1; actualResult = doPostAsync("/api/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); awaitAlias = "await Two Way Rpc (setGpio(method, params, value) first"; await(awaitAlias) .atMost(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS) .until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) && callbackCoap.getObserve() != null && - expectedObserveCountAfterGpioRequest2 == callbackCoap.getObserve().intValue()); + expectedObserveCountAfterGpioRequest2 == callbackCoap.getObserve()); validateTwoWayStateChangedNotification(callbackCoap, expectedResponseResult, actualResult); @@ -140,24 +137,24 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC assertTrue(observeRelation.isCanceled()); } - protected void processOnLoadResponse(CoapResponse response, CoapTestClient client, Integer observe, CountDownLatch latch) { + protected void processOnLoadResponse(CoapResponse response, CoapTestClient client) { JsonNode responseJson = JacksonUtil.fromBytes(response.getPayload()); - client.setURI(CoapTestClient.getFeatureTokenUrl(accessToken, FeatureType.RPC, responseJson.get("id").asInt())); + int requestId = responseJson.get("id").asInt(); + client.setURI(CoapTestClient.getFeatureTokenUrl(accessToken, FeatureType.RPC, requestId)); client.postMethod(new CoapHandler() { @Override public void onLoad(CoapResponse response) { - log.warn("Command Response Ack: {}, {}", response.getCode(), response.getResponseText()); - latch.countDown(); + log.warn("RPC {} command response ack: {}", requestId, response.getCode()); } @Override public void onError() { - log.warn("Command Response Ack Error, No connect"); + log.warn("RPC {} command response ack error, no connect", requestId); } }, DEVICE_RESPONSE, MediaTypeRegistry.APPLICATION_JSON); } - protected void processOnLoadProtoResponse(CoapResponse response, CoapTestClient client, Integer observe, CountDownLatch latch) { + protected void processOnLoadProtoResponse(CoapResponse response, CoapTestClient client) { ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = getProtoTransportPayloadConfiguration(); ProtoFileElement rpcRequestProtoFileElement = DynamicProtoUtils.getProtoFileElement(protoTransportPayloadConfiguration.getDeviceRpcRequestProtoSchema()); DynamicSchema rpcRequestProtoSchema = DynamicProtoUtils.getDynamicSchema(rpcRequestProtoFileElement, ProtoTransportPayloadConfiguration.RPC_REQUEST_PROTO_SCHEMA); @@ -180,13 +177,12 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC client.postMethod(new CoapHandler() { @Override public void onLoad(CoapResponse response) { - log.warn("Command Response Ack: {}", response.getCode()); - latch.countDown(); + log.warn("RPC {} command response ack: {}", requestId, response.getCode()); } @Override public void onError() { - log.warn("Command Response Ack Error, No connect"); + log.warn("RPC {} command response ack error, no connect", requestId); } }, rpcResponseMsg.toByteArray(), MediaTypeRegistry.APPLICATION_JSON); } catch (InvalidProtocolBufferException e) { @@ -226,8 +222,7 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC private final boolean isOneWayRpc; private final boolean protobuf; - TestCoapCallbackForRPC(CoapTestClient client, int subscribeCount, boolean isOneWayRpc, boolean protobuf) { - super(subscribeCount); + TestCoapCallbackForRPC(CoapTestClient client, boolean isOneWayRpc, boolean protobuf) { this.client = client; this.isOneWayRpc = isOneWayRpc; this.protobuf = protobuf; @@ -241,12 +236,10 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC if (observe != null) { if (!isOneWayRpc && observe > 0) { if (!protobuf){ - processOnLoadResponse(response, client, observe, latch); + processOnLoadResponse(response, client); } else { - processOnLoadProtoResponse(response, client, observe, latch); + processOnLoadProtoResponse(response, client); } - } else { - latch.countDown(); } } } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/MqttTestCallback.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/MqttTestCallback.java index 3240647956..208189ac4e 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/MqttTestCallback.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/MqttTestCallback.java @@ -32,7 +32,6 @@ public class MqttTestCallback implements MqttCallback { protected final CountDownLatch deliveryLatch; protected int qoS; protected byte[] payloadBytes; - protected String awaitSubTopic; protected boolean pubAckReceived; public MqttTestCallback() { @@ -45,12 +44,6 @@ public class MqttTestCallback implements MqttCallback { this.deliveryLatch = new CountDownLatch(1); } - public MqttTestCallback(String awaitSubTopic) { - this.subscribeLatch = new CountDownLatch(1); - this.deliveryLatch = new CountDownLatch(1); - this.awaitSubTopic = awaitSubTopic; - } - @Override public void connectionLost(Throwable throwable) { log.warn("connectionLost: ", throwable); @@ -59,23 +52,10 @@ public class MqttTestCallback implements MqttCallback { @Override public void messageArrived(String requestTopic, MqttMessage mqttMessage) { - if (awaitSubTopic == null) { - log.warn("messageArrived on topic: {}", requestTopic); - qoS = mqttMessage.getQos(); - payloadBytes = mqttMessage.getPayload(); - subscribeLatch.countDown(); - } else { - messageArrivedOnAwaitSubTopic(requestTopic, mqttMessage); - } - } - - protected void messageArrivedOnAwaitSubTopic(String requestTopic, MqttMessage mqttMessage) { - log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic); - if (awaitSubTopic.equals(requestTopic)) { - qoS = mqttMessage.getQos(); - payloadBytes = mqttMessage.getPayload(); - subscribeLatch.countDown(); - } + log.warn("messageArrived on topic: {}", requestTopic); + qoS = mqttMessage.getQos(); + payloadBytes = mqttMessage.getPayload(); + subscribeLatch.countDown(); } @Override diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/MqttTestSubscribeOnTopicCallback.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/MqttTestSubscribeOnTopicCallback.java new file mode 100644 index 0000000000..0f3cc14629 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/MqttTestSubscribeOnTopicCallback.java @@ -0,0 +1,45 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.mqttv3; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +@Data +@Slf4j +@EqualsAndHashCode(callSuper = true) +public class MqttTestSubscribeOnTopicCallback extends MqttTestCallback { + + protected final String awaitSubTopic; + + public MqttTestSubscribeOnTopicCallback(String awaitSubTopic) { + super(); + this.awaitSubTopic = awaitSubTopic; + } + + @Override + public void messageArrived(String requestTopic, MqttMessage mqttMessage) { + log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic); + if (awaitSubTopic.equals(requestTopic)) { + qoS = mqttMessage.getQos(); + payloadBytes = mqttMessage.getPayload(); + subscribeLatch.countDown(); + } + } + +} diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java index 941e97e500..71d8809e38 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java @@ -45,6 +45,7 @@ import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate; import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback; +import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestSubscribeOnTopicCallback; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient; import java.util.ArrayList; @@ -358,7 +359,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt String update = getWsClient().waitForUpdate(); assertThat(update).as("ws update received").isNotBlank(); - MqttTestCallback callback = new MqttTestCallback(attrSubTopic.replace("+", "1")); + MqttTestCallback callback = new MqttTestSubscribeOnTopicCallback(attrSubTopic.replace("+", "1")); client.setCallback(callback); String payloadStr = "{\"clientKeys\":\"" + clientKeysStr + "\", \"sharedKeys\":\"" + sharedKeysStr + "\"}"; client.publishAndWait(attrReqTopicPrefix + "1", payloadStr.getBytes()); @@ -389,7 +390,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt String update = getWsClient().waitForUpdate(); assertThat(update).as("ws update received").isNotBlank(); - MqttTestCallback callback = new MqttTestCallback(attrSubTopic.replace("+", "1")); + MqttTestCallback callback = new MqttTestSubscribeOnTopicCallback(attrSubTopic.replace("+", "1")); client.setCallback(callback); TransportApiProtos.AttributesRequest.Builder attributesRequestBuilder = TransportApiProtos.AttributesRequest.newBuilder(); attributesRequestBuilder.setClientKeys(clientKeysStr); @@ -448,14 +449,14 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt client.subscribeAndWait(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, MqttQoS.AT_LEAST_ONCE); //RequestAttributes does not make any subscriptions in device actor - MqttTestCallback clientAttributesCallback = new MqttTestCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC); + MqttTestCallback clientAttributesCallback = new MqttTestSubscribeOnTopicCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC); client.setCallback(clientAttributesCallback); String csKeysStr = "[\"clientStr\", \"clientBool\", \"clientDbl\", \"clientLong\", \"clientJson\"]"; String csRequestPayloadStr = "{\"id\": 1, \"device\": \"" + deviceName + "\", \"client\": true, \"keys\": " + csKeysStr + "}"; client.publishAndWait(GATEWAY_ATTRIBUTES_REQUEST_TOPIC, csRequestPayloadStr.getBytes()); validateJsonResponseGateway(clientAttributesCallback, deviceName, CLIENT_ATTRIBUTES_PAYLOAD); - MqttTestCallback sharedAttributesCallback = new MqttTestCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC); + MqttTestCallback sharedAttributesCallback = new MqttTestSubscribeOnTopicCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC); client.setCallback(sharedAttributesCallback); String shKeysStr = "[\"sharedStr\", \"sharedBool\", \"sharedDbl\", \"sharedLong\", \"sharedJson\"]"; String shRequestPayloadStr = "{\"id\": 1, \"device\": \"" + deviceName + "\", \"client\": false, \"keys\": " + shKeysStr + "}"; @@ -502,13 +503,13 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt client.subscribeAndWait(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, MqttQoS.AT_LEAST_ONCE); awaitForDeviceActorToReceiveSubscription(device.getId(), FeatureType.ATTRIBUTES, 1); - MqttTestCallback clientAttributesCallback = new MqttTestCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC); + MqttTestCallback clientAttributesCallback = new MqttTestSubscribeOnTopicCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC); client.setCallback(clientAttributesCallback); TransportApiProtos.GatewayAttributesRequestMsg gatewayAttributesRequestMsg = getGatewayAttributesRequestMsg(deviceName, clientKeysList, true); client.publishAndWait(GATEWAY_ATTRIBUTES_REQUEST_TOPIC, gatewayAttributesRequestMsg.toByteArray()); validateProtoClientResponseGateway(clientAttributesCallback, deviceName); - MqttTestCallback sharedAttributesCallback = new MqttTestCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC); + MqttTestCallback sharedAttributesCallback = new MqttTestSubscribeOnTopicCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC); client.setCallback(sharedAttributesCallback); gatewayAttributesRequestMsg = getGatewayAttributesRequestMsg(deviceName, sharedKeysList, false); client.publishAndWait(GATEWAY_ATTRIBUTES_REQUEST_TOPIC, gatewayAttributesRequestMsg.toByteArray()); diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/provision/MqttProvisionJsonDeviceTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/provision/MqttProvisionJsonDeviceTest.java index 9081baa420..de0a3d6f3c 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/provision/MqttProvisionJsonDeviceTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/provision/MqttProvisionJsonDeviceTest.java @@ -35,6 +35,7 @@ import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback; +import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestSubscribeOnTopicCallback; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient; import java.util.concurrent.TimeUnit; @@ -270,7 +271,7 @@ public class MqttProvisionJsonDeviceTest extends AbstractMqttIntegrationTest { String provisionRequestMsg = createTestProvisionMessage(deviceCredentials); MqttTestClient client = new MqttTestClient(); client.connectAndWait("provision"); - MqttTestCallback onProvisionCallback = new MqttTestCallback(DEVICE_PROVISION_RESPONSE_TOPIC); + MqttTestCallback onProvisionCallback = new MqttTestSubscribeOnTopicCallback(DEVICE_PROVISION_RESPONSE_TOPIC); client.setCallback(onProvisionCallback); client.subscribe(DEVICE_PROVISION_RESPONSE_TOPIC, MqttQoS.AT_MOST_ONCE); client.publishAndWait(DEVICE_PROVISION_REQUEST_TOPIC, provisionRequestMsg.getBytes()); diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/provision/MqttProvisionProtoDeviceTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/provision/MqttProvisionProtoDeviceTest.java index fcea0f249b..c6d013ba20 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/provision/MqttProvisionProtoDeviceTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/provision/MqttProvisionProtoDeviceTest.java @@ -43,6 +43,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509Ce import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback; +import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestSubscribeOnTopicCallback; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient; import java.util.concurrent.TimeUnit; @@ -269,7 +270,7 @@ public class MqttProvisionProtoDeviceTest extends AbstractMqttIntegrationTest { protected byte[] createMqttClientAndPublish(byte[] provisionRequestMsg) throws Exception { MqttTestClient client = new MqttTestClient(); client.connectAndWait("provision"); - MqttTestCallback onProvisionCallback = new MqttTestCallback(DEVICE_PROVISION_RESPONSE_TOPIC); + MqttTestCallback onProvisionCallback = new MqttTestSubscribeOnTopicCallback(DEVICE_PROVISION_RESPONSE_TOPIC); client.setCallback(onProvisionCallback); client.subscribe(DEVICE_PROVISION_RESPONSE_TOPIC, MqttQoS.AT_MOST_ONCE); client.publishAndWait(DEVICE_PROVISION_REQUEST_TOPIC, provisionRequestMsg); diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/AbstractMqttServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/AbstractMqttServerSideRpcIntegrationTest.java index 8537ee9fd8..e1f20a3d95 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/AbstractMqttServerSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/AbstractMqttServerSideRpcIntegrationTest.java @@ -41,6 +41,7 @@ import org.thingsboard.server.common.msg.session.FeatureType; import org.thingsboard.server.gen.transport.TransportApiProtos; import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback; +import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestSubscribeOnTopicCallback; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient; import java.util.ArrayList; @@ -81,7 +82,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM protected void processOneWayRpcTest(String rpcSubTopic) throws Exception { MqttTestClient client = new MqttTestClient(); client.connectAndWait(accessToken); - MqttTestCallback callback = new MqttTestCallback(rpcSubTopic.replace("+", "0")); + MqttTestCallback callback = new MqttTestSubscribeOnTopicCallback(rpcSubTopic.replace("+", "0")); client.setCallback(callback); subscribeAndWait(client, rpcSubTopic, savedDevice.getId(), FeatureType.RPC); @@ -221,7 +222,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM ); assertNotNull(savedDevice); - MqttTestCallback callback = new MqttTestCallback(GATEWAY_RPC_TOPIC); + MqttTestCallback callback = new MqttTestSubscribeOnTopicCallback(GATEWAY_RPC_TOPIC); client.setCallback(callback); subscribeAndCheckSubscription(client, GATEWAY_RPC_TOPIC, savedDevice.getId(), FeatureType.RPC); @@ -320,7 +321,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM } } - protected class MqttTestRpcJsonCallback extends MqttTestCallback { + protected class MqttTestRpcJsonCallback extends MqttTestSubscribeOnTopicCallback { private final MqttTestClient client; @@ -330,7 +331,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM } @Override - protected void messageArrivedOnAwaitSubTopic(String requestTopic, MqttMessage mqttMessage) { + public void messageArrived(String requestTopic, MqttMessage mqttMessage) { log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic); if (awaitSubTopic.equals(requestTopic)) { qoS = mqttMessage.getQos(); @@ -349,9 +350,10 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM subscribeLatch.countDown(); } } + } - protected class MqttTestRpcProtoCallback extends MqttTestCallback { + protected class MqttTestRpcProtoCallback extends MqttTestSubscribeOnTopicCallback { private final MqttTestClient client; @@ -361,7 +363,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM } @Override - protected void messageArrivedOnAwaitSubTopic(String requestTopic, MqttMessage mqttMessage) { + public void messageArrived(String requestTopic, MqttMessage mqttMessage) { log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic); if (awaitSubTopic.equals(requestTopic)) { qoS = mqttMessage.getQos(); @@ -380,6 +382,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM subscribeLatch.countDown(); } } + } protected byte[] processProtoMessageArrived(String requestTopic, MqttMessage mqttMessage) throws MqttException, InvalidProtocolBufferException { @@ -446,7 +449,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM @Override public void messageArrived(String requestTopic, MqttMessage mqttMessage) { - log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic); + log.warn("messageArrived on topic: {}", requestTopic); expected.add(new String(mqttMessage.getPayload())); String responseTopic = requestTopic.replace("request", "response"); qoS = mqttMessage.getQos(); diff --git a/application/src/test/resources/logback-test.xml b/application/src/test/resources/logback-test.xml index 953b7094a4..14db2eea7b 100644 --- a/application/src/test/resources/logback-test.xml +++ b/application/src/test/resources/logback-test.xml @@ -28,6 +28,12 @@ + + + + + + diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java index 3caaba0ba9..f70967b72d 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java @@ -561,18 +561,19 @@ public class DefaultCoapClientContext implements CoapClientContext { @Override public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg msg) { - log.trace("[{}] Received RPC command to device", sessionId); + DeviceId deviceId = state.getDeviceId(); + log.trace("[{}][{}] Received RPC command to device: {}", deviceId, sessionId, msg); if (!isDownlinkAllowed(state)) { - log.trace("[{}] ignore downlink request cause client is sleeping.", state.getDeviceId()); + log.trace("[{}][{}] ignore downlink request cause client is sleeping.", deviceId, sessionId); return; } boolean sent = false; String error = null; boolean conRequest = AbstractSyncSessionCallback.isConRequest(state.getRpc()); + int requestId = getNextMsgId(); try { Response response = state.getAdaptor().convertToPublish(msg, state.getConfiguration().getRpcRequestDynamicMessageBuilder()); response.setConfirmable(conRequest); - int requestId = getNextMsgId(); response.setMID(requestId); if (conRequest) { PowerMode powerMode = state.getPowerMode(); @@ -591,6 +592,7 @@ public class DefaultCoapClientContext implements CoapClientContext { transportContext.getScheduler().schedule(() -> { TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(requestId); if (rpcRequestMsg != null) { + log.trace("[{}][{}][{}] Going to send to device actor RPC request TIMEOUT status update due to server timeout ...", deviceId, sessionId, requestId); transportService.process(state.getSession(), msg, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY); } }, Math.min(getTimeout(state, powerMode, profileSettings), msg.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS); @@ -598,11 +600,13 @@ public class DefaultCoapClientContext implements CoapClientContext { response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> { TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id); if (rpcRequestMsg != null) { + log.trace("[{}][{}][{}] Going to send to device actor RPC request DELIVERED status update ...", deviceId, sessionId, requestId); transportService.process(state.getSession(), rpcRequestMsg, RpcStatus.DELIVERED, true, TransportServiceCallback.EMPTY); } }, id -> { TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id); if (rpcRequestMsg != null) { + log.trace("[{}][{}][{}] Going to send to device actor RPC request TIMEOUT status update ...", deviceId, sessionId, requestId); transportService.process(state.getSession(), msg, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY); } })); @@ -626,8 +630,10 @@ public class DefaultCoapClientContext implements CoapClientContext { .setRequestId(msg.getRequestId()).setError(error).build(), TransportServiceCallback.EMPTY); } else if (sent) { if (!conRequest) { + log.trace("[{}][{}][{}] Going to send to device actor non-confirmable RPC request DELIVERED status update ...", deviceId, sessionId, requestId); transportService.process(state.getSession(), msg, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); } else if (msg.getPersisted()) { + log.trace("[{}][{}][{}] Going to send to device actor RPC request SENT status update ...", deviceId, sessionId, requestId); transportService.process(state.getSession(), msg, RpcStatus.SENT, TransportServiceCallback.EMPTY); } } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 621504564f..6267ae9424 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -1273,11 +1273,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement public void sendToDeviceRpcRequest(MqttMessage payload, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, TransportProtos.SessionInfoProto sessionInfo) { int msgId = ((MqttPublishMessage) payload).variableHeader().packetId(); + int requestId = rpcRequest.getRequestId(); if (isAckExpected(payload)) { rpcAwaitingAck.put(msgId, rpcRequest); context.getScheduler().schedule(() -> { TransportProtos.ToDeviceRpcRequestMsg msg = rpcAwaitingAck.remove(msgId); if (msg != null) { + log.trace("[{}][{}][{}] Going to send to device actor RPC request TIMEOUT status update ...", deviceSessionCtx.getDeviceId(), sessionId, requestId); transportService.process(sessionInfo, rpcRequest, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY); } }, Math.max(0, Math.min(deviceSessionCtx.getContext().getTimeout(), rpcRequest.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS); @@ -1286,18 +1288,20 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement cf.addListener(result -> { Throwable throwable = result.cause(); if (throwable != null) { - log.trace("[{}][{}][{}] Failed send RPC request to device due to: ", deviceSessionCtx.getDeviceId(), sessionId, rpcRequest.getRequestId(), throwable); - this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), + log.trace("[{}][{}][{}] Failed send RPC request to device due to: ", deviceSessionCtx.getDeviceId(), sessionId, requestId, throwable); + this.sendErrorRpcResponse(sessionInfo, requestId, ThingsboardErrorCode.INVALID_ARGUMENTS, " Failed send To Device Rpc Request: " + rpcRequest.getMethodName()); return; } if (!isAckExpected(payload)) { + log.trace("[{}][{}][{}] Going to send to device actor RPC request DELIVERED status update ...", deviceSessionCtx.getDeviceId(), sessionId, requestId); transportService.process(sessionInfo, rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); } else if (rpcRequest.getPersisted()) { + log.trace("[{}][{}][{}] Going to send to device actor RPC request SENT status update ...", deviceSessionCtx.getDeviceId(), sessionId, requestId); transportService.process(sessionInfo, rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY); } if (sparkplugSessionHandler != null) { - this.sendSuccessRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.CONTENT, "Success: " + rpcRequest.getMethodName()); + this.sendSuccessRpcResponse(sessionInfo, requestId, ResponseCode.CONTENT, "Success: " + rpcRequest.getMethodName()); } }); }