added more logs to RPC processing logic in MQTT and CoAP transports
This commit is contained in:
		
							parent
							
								
									d500d5cb17
								
							
						
					
					
						commit
						138128d837
					
				@ -21,25 +21,14 @@ import org.eclipse.californium.core.CoapHandler;
 | 
			
		||||
import org.eclipse.californium.core.CoapResponse;
 | 
			
		||||
import org.eclipse.californium.core.coap.CoAP;
 | 
			
		||||
 | 
			
		||||
import java.util.concurrent.CountDownLatch;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
@Data
 | 
			
		||||
public class CoapTestCallback implements CoapHandler {
 | 
			
		||||
 | 
			
		||||
    protected final CountDownLatch latch;
 | 
			
		||||
    protected Integer observe;
 | 
			
		||||
    protected byte[] payloadBytes;
 | 
			
		||||
    protected CoAP.ResponseCode responseCode;
 | 
			
		||||
 | 
			
		||||
    public CoapTestCallback() {
 | 
			
		||||
        this.latch = new CountDownLatch(1);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public CoapTestCallback(int subscribeCount) {
 | 
			
		||||
        this.latch = new CountDownLatch(subscribeCount);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public Integer getObserve() {
 | 
			
		||||
        return observe;
 | 
			
		||||
    }
 | 
			
		||||
@ -57,7 +46,6 @@ public class CoapTestCallback implements CoapHandler {
 | 
			
		||||
        observe = response.getOptions().getObserve();
 | 
			
		||||
        payloadBytes = response.getPayload();
 | 
			
		||||
        responseCode = response.getCode();
 | 
			
		||||
        latch.countDown();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
 | 
			
		||||
@ -232,7 +232,7 @@ public abstract class AbstractCoapAttributesIntegrationTest extends AbstractCoap
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        client = new CoapTestClient(accessToken, FeatureType.ATTRIBUTES);
 | 
			
		||||
        CoapTestCallback callbackCoap = new CoapTestCallback(1);
 | 
			
		||||
        CoapTestCallback callbackCoap = new CoapTestCallback();
 | 
			
		||||
 | 
			
		||||
        CoapObserveRelation observeRelation = client.getObserveRelation(callbackCoap);
 | 
			
		||||
        String awaitAlias = "await Json Test Subscribe To AttributesUpdates (client.getObserveRelation)";
 | 
			
		||||
@ -279,7 +279,7 @@ public abstract class AbstractCoapAttributesIntegrationTest extends AbstractCoap
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        client = new CoapTestClient(accessToken, FeatureType.ATTRIBUTES);
 | 
			
		||||
        CoapTestCallback callbackCoap = new CoapTestCallback(1);
 | 
			
		||||
        CoapTestCallback callbackCoap = new CoapTestCallback();
 | 
			
		||||
 | 
			
		||||
        String awaitAlias = "await Proto Test Subscribe To Attributes Updates (add attributes)";
 | 
			
		||||
        CoapObserveRelation observeRelation = client.getObserveRelation(callbackCoap);
 | 
			
		||||
 | 
			
		||||
@ -41,7 +41,6 @@ import org.thingsboard.server.transport.coap.AbstractCoapIntegrationTest;
 | 
			
		||||
import org.thingsboard.server.transport.coap.CoapTestCallback;
 | 
			
		||||
import org.thingsboard.server.transport.coap.CoapTestClient;
 | 
			
		||||
 | 
			
		||||
import java.util.concurrent.CountDownLatch;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
 | 
			
		||||
import static org.awaitility.Awaitility.await;
 | 
			
		||||
@ -74,17 +73,16 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC
 | 
			
		||||
 | 
			
		||||
    protected void processOneWayRpcTest(boolean protobuf) throws Exception {
 | 
			
		||||
        client = new CoapTestClient(accessToken, FeatureType.RPC);
 | 
			
		||||
        CoapTestCallback callbackCoap = new TestCoapCallbackForRPC(client, 1, true, protobuf);
 | 
			
		||||
        CoapTestCallback callbackCoap = new TestCoapCallbackForRPC(client, true, protobuf);
 | 
			
		||||
 | 
			
		||||
        CoapObserveRelation observeRelation = client.getObserveRelation(callbackCoap);
 | 
			
		||||
        String awaitAlias = "await One Way Rpc (client.getObserveRelation)";
 | 
			
		||||
        await(awaitAlias)
 | 
			
		||||
                .atMost(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
 | 
			
		||||
                .until(() -> CoAP.ResponseCode.VALID.equals(callbackCoap.getResponseCode()) &&
 | 
			
		||||
                        callbackCoap.getObserve() != null &&
 | 
			
		||||
                        0 == callbackCoap.getObserve().intValue());
 | 
			
		||||
                        callbackCoap.getObserve() != null && 0 == callbackCoap.getObserve());
 | 
			
		||||
        validateCurrentStateNotification(callbackCoap);
 | 
			
		||||
        int expectedObserveCountAfterGpioRequest = callbackCoap.getObserve().intValue() + 1;
 | 
			
		||||
        int expectedObserveAfterRpcProcessed = callbackCoap.getObserve() + 1;
 | 
			
		||||
        String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}";
 | 
			
		||||
        String deviceId = savedDevice.getId().getId().toString();
 | 
			
		||||
        String result = doPostAsync("/api/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isOk());
 | 
			
		||||
@ -92,8 +90,7 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC
 | 
			
		||||
        await(awaitAlias)
 | 
			
		||||
                .atMost(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
 | 
			
		||||
                .until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) &&
 | 
			
		||||
                        callbackCoap.getObserve() != null &&
 | 
			
		||||
                        expectedObserveCountAfterGpioRequest == callbackCoap.getObserve().intValue());
 | 
			
		||||
                        callbackCoap.getObserve() != null && expectedObserveAfterRpcProcessed == callbackCoap.getObserve());
 | 
			
		||||
        validateOneWayStateChangedNotification(callbackCoap, result);
 | 
			
		||||
 | 
			
		||||
        observeRelation.proactiveCancel();
 | 
			
		||||
@ -102,7 +99,7 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC
 | 
			
		||||
 | 
			
		||||
    protected void processTwoWayRpcTest(String expectedResponseResult, boolean protobuf) throws Exception {
 | 
			
		||||
        client = new CoapTestClient(accessToken, FeatureType.RPC);
 | 
			
		||||
        CoapTestCallback callbackCoap = new TestCoapCallbackForRPC(client, 1, false, protobuf);
 | 
			
		||||
        CoapTestCallback callbackCoap = new TestCoapCallbackForRPC(client, false, protobuf);
 | 
			
		||||
 | 
			
		||||
        CoapObserveRelation observeRelation = client.getObserveRelation(callbackCoap);
 | 
			
		||||
        String awaitAlias = "await Two Way Rpc (client.getObserveRelation)";
 | 
			
		||||
@ -110,29 +107,29 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC
 | 
			
		||||
                .atMost(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
 | 
			
		||||
                .until(() -> CoAP.ResponseCode.VALID.equals(callbackCoap.getResponseCode()) &&
 | 
			
		||||
                        callbackCoap.getObserve() != null &&
 | 
			
		||||
                        0 == callbackCoap.getObserve().intValue());
 | 
			
		||||
                        0 == callbackCoap.getObserve());
 | 
			
		||||
        validateCurrentStateNotification(callbackCoap);
 | 
			
		||||
 | 
			
		||||
        String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"26\",\"value\": 1}}";
 | 
			
		||||
        String deviceId = savedDevice.getId().getId().toString();
 | 
			
		||||
        int expectedObserveCountAfterGpioRequest1 = callbackCoap.getObserve().intValue() + 1;
 | 
			
		||||
        int expectedObserveCountAfterGpioRequest1 = callbackCoap.getObserve() + 1;
 | 
			
		||||
        String actualResult = doPostAsync("/api/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk());
 | 
			
		||||
        awaitAlias = "await Two Way Rpc (setGpio(method, params, value) first";
 | 
			
		||||
        await(awaitAlias)
 | 
			
		||||
                .atMost(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
 | 
			
		||||
                .until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) &&
 | 
			
		||||
                        callbackCoap.getObserve() != null &&
 | 
			
		||||
                        expectedObserveCountAfterGpioRequest1 == callbackCoap.getObserve().intValue());
 | 
			
		||||
                        expectedObserveCountAfterGpioRequest1 == callbackCoap.getObserve());
 | 
			
		||||
        validateTwoWayStateChangedNotification(callbackCoap, expectedResponseResult, actualResult);
 | 
			
		||||
 | 
			
		||||
        int expectedObserveCountAfterGpioRequest2 = callbackCoap.getObserve().intValue() + 1;
 | 
			
		||||
        int expectedObserveCountAfterGpioRequest2 = callbackCoap.getObserve() + 1;
 | 
			
		||||
        actualResult = doPostAsync("/api/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk());
 | 
			
		||||
        awaitAlias = "await Two Way Rpc (setGpio(method, params, value) first";
 | 
			
		||||
        await(awaitAlias)
 | 
			
		||||
                .atMost(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
 | 
			
		||||
                .until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) &&
 | 
			
		||||
                        callbackCoap.getObserve() != null &&
 | 
			
		||||
                        expectedObserveCountAfterGpioRequest2 == callbackCoap.getObserve().intValue());
 | 
			
		||||
                        expectedObserveCountAfterGpioRequest2 == callbackCoap.getObserve());
 | 
			
		||||
 | 
			
		||||
        validateTwoWayStateChangedNotification(callbackCoap, expectedResponseResult, actualResult);
 | 
			
		||||
 | 
			
		||||
@ -140,24 +137,24 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC
 | 
			
		||||
        assertTrue(observeRelation.isCanceled());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected void processOnLoadResponse(CoapResponse response, CoapTestClient client, Integer observe, CountDownLatch latch) {
 | 
			
		||||
    protected void processOnLoadResponse(CoapResponse response, CoapTestClient client) {
 | 
			
		||||
        JsonNode responseJson = JacksonUtil.fromBytes(response.getPayload());
 | 
			
		||||
        client.setURI(CoapTestClient.getFeatureTokenUrl(accessToken, FeatureType.RPC, responseJson.get("id").asInt()));
 | 
			
		||||
        int requestId = responseJson.get("id").asInt();
 | 
			
		||||
        client.setURI(CoapTestClient.getFeatureTokenUrl(accessToken, FeatureType.RPC, requestId));
 | 
			
		||||
        client.postMethod(new CoapHandler() {
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onLoad(CoapResponse response) {
 | 
			
		||||
                log.warn("Command Response Ack: {}, {}", response.getCode(), response.getResponseText());
 | 
			
		||||
                latch.countDown();
 | 
			
		||||
                log.warn("RPC {} command response ack: {}", requestId, response.getCode());
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onError() {
 | 
			
		||||
                log.warn("Command Response Ack Error, No connect");
 | 
			
		||||
                log.warn("RPC {} command response ack error, no connect", requestId);
 | 
			
		||||
            }
 | 
			
		||||
        }, DEVICE_RESPONSE, MediaTypeRegistry.APPLICATION_JSON);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected void processOnLoadProtoResponse(CoapResponse response, CoapTestClient client, Integer observe, CountDownLatch latch) {
 | 
			
		||||
    protected void processOnLoadProtoResponse(CoapResponse response, CoapTestClient client) {
 | 
			
		||||
        ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = getProtoTransportPayloadConfiguration();
 | 
			
		||||
        ProtoFileElement rpcRequestProtoFileElement = DynamicProtoUtils.getProtoFileElement(protoTransportPayloadConfiguration.getDeviceRpcRequestProtoSchema());
 | 
			
		||||
        DynamicSchema rpcRequestProtoSchema = DynamicProtoUtils.getDynamicSchema(rpcRequestProtoFileElement, ProtoTransportPayloadConfiguration.RPC_REQUEST_PROTO_SCHEMA);
 | 
			
		||||
@ -180,13 +177,12 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC
 | 
			
		||||
            client.postMethod(new CoapHandler() {
 | 
			
		||||
                @Override
 | 
			
		||||
                public void onLoad(CoapResponse response) {
 | 
			
		||||
                    log.warn("Command Response Ack: {}", response.getCode());
 | 
			
		||||
                    latch.countDown();
 | 
			
		||||
                    log.warn("RPC {} command response ack: {}", requestId, response.getCode());
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                @Override
 | 
			
		||||
                public void onError() {
 | 
			
		||||
                    log.warn("Command Response Ack Error, No connect");
 | 
			
		||||
                    log.warn("RPC {} command response ack error, no connect", requestId);
 | 
			
		||||
                }
 | 
			
		||||
            }, rpcResponseMsg.toByteArray(), MediaTypeRegistry.APPLICATION_JSON);
 | 
			
		||||
        } catch (InvalidProtocolBufferException e) {
 | 
			
		||||
@ -226,8 +222,7 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC
 | 
			
		||||
        private final boolean isOneWayRpc;
 | 
			
		||||
        private final boolean protobuf;
 | 
			
		||||
 | 
			
		||||
        TestCoapCallbackForRPC(CoapTestClient client, int subscribeCount, boolean isOneWayRpc, boolean protobuf) {
 | 
			
		||||
            super(subscribeCount);
 | 
			
		||||
        TestCoapCallbackForRPC(CoapTestClient client, boolean isOneWayRpc, boolean protobuf) {
 | 
			
		||||
            this.client = client;
 | 
			
		||||
            this.isOneWayRpc = isOneWayRpc;
 | 
			
		||||
            this.protobuf = protobuf;
 | 
			
		||||
@ -241,12 +236,10 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC
 | 
			
		||||
            if (observe != null) {
 | 
			
		||||
                if (!isOneWayRpc && observe > 0) {
 | 
			
		||||
                    if (!protobuf){
 | 
			
		||||
                        processOnLoadResponse(response, client, observe, latch);
 | 
			
		||||
                        processOnLoadResponse(response, client);
 | 
			
		||||
                    } else {
 | 
			
		||||
                        processOnLoadProtoResponse(response, client, observe, latch);
 | 
			
		||||
                        processOnLoadProtoResponse(response, client);
 | 
			
		||||
                    }
 | 
			
		||||
                } else {
 | 
			
		||||
                    latch.countDown();
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@ -32,7 +32,6 @@ public class MqttTestCallback implements MqttCallback {
 | 
			
		||||
    protected final CountDownLatch deliveryLatch;
 | 
			
		||||
    protected int qoS;
 | 
			
		||||
    protected byte[] payloadBytes;
 | 
			
		||||
    protected String awaitSubTopic;
 | 
			
		||||
    protected boolean pubAckReceived;
 | 
			
		||||
 | 
			
		||||
    public MqttTestCallback() {
 | 
			
		||||
@ -45,12 +44,6 @@ public class MqttTestCallback implements MqttCallback {
 | 
			
		||||
        this.deliveryLatch = new CountDownLatch(1);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public MqttTestCallback(String awaitSubTopic) {
 | 
			
		||||
        this.subscribeLatch = new CountDownLatch(1);
 | 
			
		||||
        this.deliveryLatch = new CountDownLatch(1);
 | 
			
		||||
        this.awaitSubTopic = awaitSubTopic;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void connectionLost(Throwable throwable) {
 | 
			
		||||
        log.warn("connectionLost: ", throwable);
 | 
			
		||||
@ -59,23 +52,10 @@ public class MqttTestCallback implements MqttCallback {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void messageArrived(String requestTopic, MqttMessage mqttMessage) {
 | 
			
		||||
        if (awaitSubTopic == null) {
 | 
			
		||||
            log.warn("messageArrived on topic: {}", requestTopic);
 | 
			
		||||
            qoS = mqttMessage.getQos();
 | 
			
		||||
            payloadBytes = mqttMessage.getPayload();
 | 
			
		||||
            subscribeLatch.countDown();
 | 
			
		||||
        } else {
 | 
			
		||||
            messageArrivedOnAwaitSubTopic(requestTopic, mqttMessage);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected void messageArrivedOnAwaitSubTopic(String requestTopic, MqttMessage mqttMessage) {
 | 
			
		||||
        log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic);
 | 
			
		||||
        if (awaitSubTopic.equals(requestTopic)) {
 | 
			
		||||
            qoS = mqttMessage.getQos();
 | 
			
		||||
            payloadBytes = mqttMessage.getPayload();
 | 
			
		||||
            subscribeLatch.countDown();
 | 
			
		||||
        }
 | 
			
		||||
        log.warn("messageArrived on topic: {}", requestTopic);
 | 
			
		||||
        qoS = mqttMessage.getQos();
 | 
			
		||||
        payloadBytes = mqttMessage.getPayload();
 | 
			
		||||
        subscribeLatch.countDown();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,45 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2023 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.transport.mqtt.mqttv3;
 | 
			
		||||
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import lombok.EqualsAndHashCode;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
 | 
			
		||||
 | 
			
		||||
@Data
 | 
			
		||||
@Slf4j
 | 
			
		||||
@EqualsAndHashCode(callSuper = true)
 | 
			
		||||
public class MqttTestSubscribeOnTopicCallback extends MqttTestCallback {
 | 
			
		||||
 | 
			
		||||
    protected final String awaitSubTopic;
 | 
			
		||||
 | 
			
		||||
    public MqttTestSubscribeOnTopicCallback(String awaitSubTopic) {
 | 
			
		||||
        super();
 | 
			
		||||
        this.awaitSubTopic = awaitSubTopic;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void messageArrived(String requestTopic, MqttMessage mqttMessage) {
 | 
			
		||||
        log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic);
 | 
			
		||||
        if (awaitSubTopic.equals(requestTopic)) {
 | 
			
		||||
            qoS = mqttMessage.getQos();
 | 
			
		||||
            payloadBytes = mqttMessage.getPayload();
 | 
			
		||||
            subscribeLatch.countDown();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -45,6 +45,7 @@ import org.thingsboard.server.gen.transport.TransportProtos;
 | 
			
		||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate;
 | 
			
		||||
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
 | 
			
		||||
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback;
 | 
			
		||||
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestSubscribeOnTopicCallback;
 | 
			
		||||
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient;
 | 
			
		||||
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
@ -358,7 +359,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
 | 
			
		||||
 | 
			
		||||
        String update = getWsClient().waitForUpdate();
 | 
			
		||||
        assertThat(update).as("ws update received").isNotBlank();
 | 
			
		||||
        MqttTestCallback callback = new MqttTestCallback(attrSubTopic.replace("+", "1"));
 | 
			
		||||
        MqttTestCallback callback = new MqttTestSubscribeOnTopicCallback(attrSubTopic.replace("+", "1"));
 | 
			
		||||
        client.setCallback(callback);
 | 
			
		||||
        String payloadStr = "{\"clientKeys\":\"" + clientKeysStr + "\", \"sharedKeys\":\"" + sharedKeysStr + "\"}";
 | 
			
		||||
        client.publishAndWait(attrReqTopicPrefix + "1", payloadStr.getBytes());
 | 
			
		||||
@ -389,7 +390,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
 | 
			
		||||
 | 
			
		||||
        String update = getWsClient().waitForUpdate();
 | 
			
		||||
        assertThat(update).as("ws update received").isNotBlank();
 | 
			
		||||
        MqttTestCallback callback = new MqttTestCallback(attrSubTopic.replace("+", "1"));
 | 
			
		||||
        MqttTestCallback callback = new MqttTestSubscribeOnTopicCallback(attrSubTopic.replace("+", "1"));
 | 
			
		||||
        client.setCallback(callback);
 | 
			
		||||
        TransportApiProtos.AttributesRequest.Builder attributesRequestBuilder = TransportApiProtos.AttributesRequest.newBuilder();
 | 
			
		||||
        attributesRequestBuilder.setClientKeys(clientKeysStr);
 | 
			
		||||
@ -448,14 +449,14 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
 | 
			
		||||
        client.subscribeAndWait(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, MqttQoS.AT_LEAST_ONCE);
 | 
			
		||||
        //RequestAttributes does not make any subscriptions in device actor
 | 
			
		||||
 | 
			
		||||
        MqttTestCallback clientAttributesCallback = new MqttTestCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC);
 | 
			
		||||
        MqttTestCallback clientAttributesCallback = new MqttTestSubscribeOnTopicCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC);
 | 
			
		||||
        client.setCallback(clientAttributesCallback);
 | 
			
		||||
        String csKeysStr = "[\"clientStr\", \"clientBool\", \"clientDbl\", \"clientLong\", \"clientJson\"]";
 | 
			
		||||
        String csRequestPayloadStr = "{\"id\": 1, \"device\": \"" + deviceName + "\", \"client\": true, \"keys\": " + csKeysStr + "}";
 | 
			
		||||
        client.publishAndWait(GATEWAY_ATTRIBUTES_REQUEST_TOPIC, csRequestPayloadStr.getBytes());
 | 
			
		||||
        validateJsonResponseGateway(clientAttributesCallback, deviceName, CLIENT_ATTRIBUTES_PAYLOAD);
 | 
			
		||||
 | 
			
		||||
        MqttTestCallback sharedAttributesCallback = new MqttTestCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC);
 | 
			
		||||
        MqttTestCallback sharedAttributesCallback = new MqttTestSubscribeOnTopicCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC);
 | 
			
		||||
        client.setCallback(sharedAttributesCallback);
 | 
			
		||||
        String shKeysStr = "[\"sharedStr\", \"sharedBool\", \"sharedDbl\", \"sharedLong\", \"sharedJson\"]";
 | 
			
		||||
        String shRequestPayloadStr = "{\"id\": 1, \"device\": \"" + deviceName + "\", \"client\": false, \"keys\": " + shKeysStr + "}";
 | 
			
		||||
@ -502,13 +503,13 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
 | 
			
		||||
        client.subscribeAndWait(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, MqttQoS.AT_LEAST_ONCE);
 | 
			
		||||
        awaitForDeviceActorToReceiveSubscription(device.getId(), FeatureType.ATTRIBUTES, 1);
 | 
			
		||||
 | 
			
		||||
        MqttTestCallback clientAttributesCallback = new MqttTestCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC);
 | 
			
		||||
        MqttTestCallback clientAttributesCallback = new MqttTestSubscribeOnTopicCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC);
 | 
			
		||||
        client.setCallback(clientAttributesCallback);
 | 
			
		||||
        TransportApiProtos.GatewayAttributesRequestMsg gatewayAttributesRequestMsg = getGatewayAttributesRequestMsg(deviceName, clientKeysList, true);
 | 
			
		||||
        client.publishAndWait(GATEWAY_ATTRIBUTES_REQUEST_TOPIC, gatewayAttributesRequestMsg.toByteArray());
 | 
			
		||||
        validateProtoClientResponseGateway(clientAttributesCallback, deviceName);
 | 
			
		||||
 | 
			
		||||
        MqttTestCallback sharedAttributesCallback = new MqttTestCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC);
 | 
			
		||||
        MqttTestCallback sharedAttributesCallback = new MqttTestSubscribeOnTopicCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC);
 | 
			
		||||
        client.setCallback(sharedAttributesCallback);
 | 
			
		||||
        gatewayAttributesRequestMsg = getGatewayAttributesRequestMsg(deviceName, sharedKeysList, false);
 | 
			
		||||
        client.publishAndWait(GATEWAY_ATTRIBUTES_REQUEST_TOPIC, gatewayAttributesRequestMsg.toByteArray());
 | 
			
		||||
 | 
			
		||||
@ -35,6 +35,7 @@ import org.thingsboard.server.dao.service.DaoSqlTest;
 | 
			
		||||
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
 | 
			
		||||
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
 | 
			
		||||
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback;
 | 
			
		||||
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestSubscribeOnTopicCallback;
 | 
			
		||||
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient;
 | 
			
		||||
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
@ -270,7 +271,7 @@ public class MqttProvisionJsonDeviceTest extends AbstractMqttIntegrationTest {
 | 
			
		||||
        String provisionRequestMsg = createTestProvisionMessage(deviceCredentials);
 | 
			
		||||
        MqttTestClient client = new MqttTestClient();
 | 
			
		||||
        client.connectAndWait("provision");
 | 
			
		||||
        MqttTestCallback onProvisionCallback = new MqttTestCallback(DEVICE_PROVISION_RESPONSE_TOPIC);
 | 
			
		||||
        MqttTestCallback onProvisionCallback = new MqttTestSubscribeOnTopicCallback(DEVICE_PROVISION_RESPONSE_TOPIC);
 | 
			
		||||
        client.setCallback(onProvisionCallback);
 | 
			
		||||
        client.subscribe(DEVICE_PROVISION_RESPONSE_TOPIC, MqttQoS.AT_MOST_ONCE);
 | 
			
		||||
        client.publishAndWait(DEVICE_PROVISION_REQUEST_TOPIC, provisionRequestMsg.getBytes());
 | 
			
		||||
 | 
			
		||||
@ -43,6 +43,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509Ce
 | 
			
		||||
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
 | 
			
		||||
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
 | 
			
		||||
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback;
 | 
			
		||||
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestSubscribeOnTopicCallback;
 | 
			
		||||
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient;
 | 
			
		||||
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
@ -269,7 +270,7 @@ public class MqttProvisionProtoDeviceTest extends AbstractMqttIntegrationTest {
 | 
			
		||||
    protected byte[] createMqttClientAndPublish(byte[] provisionRequestMsg) throws Exception {
 | 
			
		||||
        MqttTestClient client = new MqttTestClient();
 | 
			
		||||
        client.connectAndWait("provision");
 | 
			
		||||
        MqttTestCallback onProvisionCallback = new MqttTestCallback(DEVICE_PROVISION_RESPONSE_TOPIC);
 | 
			
		||||
        MqttTestCallback onProvisionCallback = new MqttTestSubscribeOnTopicCallback(DEVICE_PROVISION_RESPONSE_TOPIC);
 | 
			
		||||
        client.setCallback(onProvisionCallback);
 | 
			
		||||
        client.subscribe(DEVICE_PROVISION_RESPONSE_TOPIC, MqttQoS.AT_MOST_ONCE);
 | 
			
		||||
        client.publishAndWait(DEVICE_PROVISION_REQUEST_TOPIC, provisionRequestMsg);
 | 
			
		||||
 | 
			
		||||
@ -41,6 +41,7 @@ import org.thingsboard.server.common.msg.session.FeatureType;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportApiProtos;
 | 
			
		||||
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
 | 
			
		||||
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback;
 | 
			
		||||
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestSubscribeOnTopicCallback;
 | 
			
		||||
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient;
 | 
			
		||||
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
@ -81,7 +82,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
 | 
			
		||||
    protected void processOneWayRpcTest(String rpcSubTopic) throws Exception {
 | 
			
		||||
        MqttTestClient client = new MqttTestClient();
 | 
			
		||||
        client.connectAndWait(accessToken);
 | 
			
		||||
        MqttTestCallback callback = new MqttTestCallback(rpcSubTopic.replace("+", "0"));
 | 
			
		||||
        MqttTestCallback callback = new MqttTestSubscribeOnTopicCallback(rpcSubTopic.replace("+", "0"));
 | 
			
		||||
        client.setCallback(callback);
 | 
			
		||||
        subscribeAndWait(client, rpcSubTopic, savedDevice.getId(), FeatureType.RPC);
 | 
			
		||||
 | 
			
		||||
@ -221,7 +222,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
 | 
			
		||||
        );
 | 
			
		||||
        assertNotNull(savedDevice);
 | 
			
		||||
 | 
			
		||||
        MqttTestCallback  callback = new MqttTestCallback(GATEWAY_RPC_TOPIC);
 | 
			
		||||
        MqttTestCallback  callback = new MqttTestSubscribeOnTopicCallback(GATEWAY_RPC_TOPIC);
 | 
			
		||||
        client.setCallback(callback);
 | 
			
		||||
        subscribeAndCheckSubscription(client, GATEWAY_RPC_TOPIC, savedDevice.getId(), FeatureType.RPC);
 | 
			
		||||
 | 
			
		||||
@ -320,7 +321,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected class MqttTestRpcJsonCallback extends MqttTestCallback {
 | 
			
		||||
    protected class MqttTestRpcJsonCallback extends MqttTestSubscribeOnTopicCallback {
 | 
			
		||||
 | 
			
		||||
        private final MqttTestClient client;
 | 
			
		||||
 | 
			
		||||
@ -330,7 +331,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        @Override
 | 
			
		||||
        protected void messageArrivedOnAwaitSubTopic(String requestTopic, MqttMessage mqttMessage) {
 | 
			
		||||
        public void messageArrived(String requestTopic, MqttMessage mqttMessage) {
 | 
			
		||||
            log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic);
 | 
			
		||||
            if (awaitSubTopic.equals(requestTopic)) {
 | 
			
		||||
                qoS = mqttMessage.getQos();
 | 
			
		||||
@ -349,9 +350,10 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
 | 
			
		||||
                subscribeLatch.countDown();
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected class MqttTestRpcProtoCallback extends MqttTestCallback {
 | 
			
		||||
    protected class MqttTestRpcProtoCallback extends MqttTestSubscribeOnTopicCallback {
 | 
			
		||||
 | 
			
		||||
        private final MqttTestClient client;
 | 
			
		||||
 | 
			
		||||
@ -361,7 +363,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        @Override
 | 
			
		||||
        protected void messageArrivedOnAwaitSubTopic(String requestTopic, MqttMessage mqttMessage) {
 | 
			
		||||
        public void messageArrived(String requestTopic, MqttMessage mqttMessage) {
 | 
			
		||||
            log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic);
 | 
			
		||||
            if (awaitSubTopic.equals(requestTopic)) {
 | 
			
		||||
                qoS = mqttMessage.getQos();
 | 
			
		||||
@ -380,6 +382,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
 | 
			
		||||
                subscribeLatch.countDown();
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected byte[] processProtoMessageArrived(String requestTopic, MqttMessage mqttMessage) throws MqttException, InvalidProtocolBufferException {
 | 
			
		||||
@ -446,7 +449,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
 | 
			
		||||
 | 
			
		||||
        @Override
 | 
			
		||||
        public void messageArrived(String requestTopic, MqttMessage mqttMessage) {
 | 
			
		||||
            log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic);
 | 
			
		||||
            log.warn("messageArrived on topic: {}", requestTopic);
 | 
			
		||||
            expected.add(new String(mqttMessage.getPayload()));
 | 
			
		||||
            String responseTopic = requestTopic.replace("request", "response");
 | 
			
		||||
            qoS = mqttMessage.getQos();
 | 
			
		||||
 | 
			
		||||
@ -28,6 +28,12 @@
 | 
			
		||||
    <logger name="org.thingsboard.server.transport.lwm2m.server" level="INFO"/>
 | 
			
		||||
    <logger name="org.eclipse.californium.core" level="INFO"/>
 | 
			
		||||
 | 
			
		||||
    <!-- Coap client context debug for the test scope -->
 | 
			
		||||
    <!--    <logger name="org.thingsboard.server.transport.coap.client.DefaultCoapClientContext" level="TRACE" />-->
 | 
			
		||||
 | 
			
		||||
    <!-- Device actor message processor debug for the test scope -->
 | 
			
		||||
    <!--    <logger name="org.thingsboard.server.actors.device.DeviceActorMessageProcessor" level="DEBUG" />-->
 | 
			
		||||
 | 
			
		||||
    <root level="WARN">
 | 
			
		||||
        <appender-ref ref="console"/>
 | 
			
		||||
    </root>
 | 
			
		||||
 | 
			
		||||
@ -561,18 +561,19 @@ public class DefaultCoapClientContext implements CoapClientContext {
 | 
			
		||||
 | 
			
		||||
        @Override
 | 
			
		||||
        public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg msg) {
 | 
			
		||||
            log.trace("[{}] Received RPC command to device", sessionId);
 | 
			
		||||
            DeviceId deviceId = state.getDeviceId();
 | 
			
		||||
            log.trace("[{}][{}] Received RPC command to device: {}", deviceId, sessionId, msg);
 | 
			
		||||
            if (!isDownlinkAllowed(state)) {
 | 
			
		||||
                log.trace("[{}] ignore downlink request cause client is sleeping.", state.getDeviceId());
 | 
			
		||||
                log.trace("[{}][{}] ignore downlink request cause client is sleeping.", deviceId, sessionId);
 | 
			
		||||
                return;
 | 
			
		||||
            }
 | 
			
		||||
            boolean sent = false;
 | 
			
		||||
            String error = null;
 | 
			
		||||
            boolean conRequest = AbstractSyncSessionCallback.isConRequest(state.getRpc());
 | 
			
		||||
            int requestId = getNextMsgId();
 | 
			
		||||
            try {
 | 
			
		||||
                Response response = state.getAdaptor().convertToPublish(msg, state.getConfiguration().getRpcRequestDynamicMessageBuilder());
 | 
			
		||||
                response.setConfirmable(conRequest);
 | 
			
		||||
                int requestId = getNextMsgId();
 | 
			
		||||
                response.setMID(requestId);
 | 
			
		||||
                if (conRequest) {
 | 
			
		||||
                    PowerMode powerMode = state.getPowerMode();
 | 
			
		||||
@ -591,6 +592,7 @@ public class DefaultCoapClientContext implements CoapClientContext {
 | 
			
		||||
                    transportContext.getScheduler().schedule(() -> {
 | 
			
		||||
                        TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(requestId);
 | 
			
		||||
                        if (rpcRequestMsg != null) {
 | 
			
		||||
                            log.trace("[{}][{}][{}] Going to send to device actor RPC request TIMEOUT status update due to server timeout ...", deviceId, sessionId, requestId);
 | 
			
		||||
                            transportService.process(state.getSession(), msg, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY);
 | 
			
		||||
                        }
 | 
			
		||||
                    }, Math.min(getTimeout(state, powerMode, profileSettings), msg.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
 | 
			
		||||
@ -598,11 +600,13 @@ public class DefaultCoapClientContext implements CoapClientContext {
 | 
			
		||||
                    response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> {
 | 
			
		||||
                        TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id);
 | 
			
		||||
                        if (rpcRequestMsg != null) {
 | 
			
		||||
                            log.trace("[{}][{}][{}] Going to send to device actor RPC request DELIVERED status update ...", deviceId, sessionId, requestId);
 | 
			
		||||
                            transportService.process(state.getSession(), rpcRequestMsg, RpcStatus.DELIVERED, true, TransportServiceCallback.EMPTY);
 | 
			
		||||
                        }
 | 
			
		||||
                    }, id -> {
 | 
			
		||||
                        TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id);
 | 
			
		||||
                        if (rpcRequestMsg != null) {
 | 
			
		||||
                            log.trace("[{}][{}][{}] Going to send to device actor RPC request TIMEOUT status update ...", deviceId, sessionId, requestId);
 | 
			
		||||
                            transportService.process(state.getSession(), msg, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY);
 | 
			
		||||
                        }
 | 
			
		||||
                    }));
 | 
			
		||||
@ -626,8 +630,10 @@ public class DefaultCoapClientContext implements CoapClientContext {
 | 
			
		||||
                                    .setRequestId(msg.getRequestId()).setError(error).build(), TransportServiceCallback.EMPTY);
 | 
			
		||||
                } else if (sent) {
 | 
			
		||||
                    if (!conRequest) {
 | 
			
		||||
                        log.trace("[{}][{}][{}] Going to send to device actor non-confirmable RPC request DELIVERED status update ...", deviceId, sessionId, requestId);
 | 
			
		||||
                        transportService.process(state.getSession(), msg, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
 | 
			
		||||
                    } else if (msg.getPersisted()) {
 | 
			
		||||
                        log.trace("[{}][{}][{}] Going to send to device actor RPC request SENT status update ...", deviceId, sessionId, requestId);
 | 
			
		||||
                        transportService.process(state.getSession(), msg, RpcStatus.SENT, TransportServiceCallback.EMPTY);
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
@ -1273,11 +1273,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 | 
			
		||||
 | 
			
		||||
    public void sendToDeviceRpcRequest(MqttMessage payload, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, TransportProtos.SessionInfoProto sessionInfo) {
 | 
			
		||||
        int msgId = ((MqttPublishMessage) payload).variableHeader().packetId();
 | 
			
		||||
        int requestId = rpcRequest.getRequestId();
 | 
			
		||||
        if (isAckExpected(payload)) {
 | 
			
		||||
            rpcAwaitingAck.put(msgId, rpcRequest);
 | 
			
		||||
            context.getScheduler().schedule(() -> {
 | 
			
		||||
                TransportProtos.ToDeviceRpcRequestMsg msg = rpcAwaitingAck.remove(msgId);
 | 
			
		||||
                if (msg != null) {
 | 
			
		||||
                    log.trace("[{}][{}][{}] Going to send to device actor RPC request TIMEOUT status update ...", deviceSessionCtx.getDeviceId(), sessionId, requestId);
 | 
			
		||||
                    transportService.process(sessionInfo, rpcRequest, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY);
 | 
			
		||||
                }
 | 
			
		||||
            }, Math.max(0, Math.min(deviceSessionCtx.getContext().getTimeout(), rpcRequest.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS);
 | 
			
		||||
@ -1286,18 +1288,20 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 | 
			
		||||
        cf.addListener(result -> {
 | 
			
		||||
            Throwable throwable = result.cause();
 | 
			
		||||
            if (throwable != null) {
 | 
			
		||||
                log.trace("[{}][{}][{}] Failed send RPC request to device due to: ", deviceSessionCtx.getDeviceId(), sessionId, rpcRequest.getRequestId(), throwable);
 | 
			
		||||
                this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(),
 | 
			
		||||
                log.trace("[{}][{}][{}] Failed send RPC request to device due to: ", deviceSessionCtx.getDeviceId(), sessionId, requestId, throwable);
 | 
			
		||||
                this.sendErrorRpcResponse(sessionInfo, requestId,
 | 
			
		||||
                        ThingsboardErrorCode.INVALID_ARGUMENTS, " Failed send To Device Rpc Request: " + rpcRequest.getMethodName());
 | 
			
		||||
                return;
 | 
			
		||||
            }
 | 
			
		||||
            if (!isAckExpected(payload)) {
 | 
			
		||||
                log.trace("[{}][{}][{}] Going to send to device actor RPC request DELIVERED status update ...", deviceSessionCtx.getDeviceId(), sessionId, requestId);
 | 
			
		||||
                transportService.process(sessionInfo, rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
 | 
			
		||||
            } else if (rpcRequest.getPersisted()) {
 | 
			
		||||
                log.trace("[{}][{}][{}] Going to send to device actor RPC request SENT status update ...", deviceSessionCtx.getDeviceId(), sessionId, requestId);
 | 
			
		||||
                transportService.process(sessionInfo, rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY);
 | 
			
		||||
            }
 | 
			
		||||
            if (sparkplugSessionHandler != null) {
 | 
			
		||||
                this.sendSuccessRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.CONTENT, "Success: " + rpcRequest.getMethodName());
 | 
			
		||||
                this.sendSuccessRpcResponse(sessionInfo, requestId, ResponseCode.CONTENT, "Success: " + rpcRequest.getMethodName());
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user