diff --git a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java index b61dc16384..ab7a50705e 100644 --- a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java @@ -407,6 +407,7 @@ public class ProtoUtils { .setRequestIdMSB(msg.getMsg().getId().getMostSignificantBits()) .setRequestIdLSB(msg.getMsg().getId().getLeastSignificantBits()) .setOneway(msg.getMsg().isOneway()) + .setPersisted(msg.getMsg().isPersisted()) .build(); return TransportProtos.ToDeviceRpcRequestActorMsgProto.newBuilder() diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java index 87d466e11a..d137f8ed59 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java @@ -49,6 +49,7 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityViewId; +import org.thingsboard.server.common.data.id.RpcId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; @@ -57,6 +58,7 @@ import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; +import org.thingsboard.server.common.data.rpc.Rpc; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainMetaData; import org.thingsboard.server.common.data.security.DeviceCredentials; @@ -64,6 +66,7 @@ import org.thingsboard.server.common.data.security.DeviceCredentials; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import static io.restassured.RestAssured.given; import static java.net.HttpURLConnection.HTTP_BAD_REQUEST; @@ -320,6 +323,27 @@ public class TestRestClient { .as(JsonNode.class); } + public Rpc getPersistedRpc(RpcId rpcId) { + return given().spec(requestSpec) + .get("/api/rpc/persistent/{rpcId}", rpcId.toString()) + .then() + .statusCode(HTTP_OK) + .extract() + .as(Rpc.class); + } + + public PageData getPersistedRpcByDevice(DeviceId deviceId, PageLink pageLink) { + Map params = new HashMap<>(); + addPageLinkToParam(params, pageLink); + return given().spec(requestSpec).queryParams(params) + .get("/api/rpc/persistent/device/{deviceId}", deviceId.toString()) + .then() + .statusCode(HTTP_OK) + .extract() + .as(new TypeRef<>() { + }); + } + public PageData getDeviceProfiles(PageLink pageLink) { Map params = new HashMap<>(); addPageLinkToParam(params, pageLink); diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index 55198accce..c298d4fea3 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -46,9 +46,11 @@ import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfileProvisionType; import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.id.RpcId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.rpc.Rpc; import org.thingsboard.server.common.data.rule.NodeConnectionInfo; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainMetaData; @@ -68,6 +70,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.Random; +import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; @@ -76,6 +79,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.fail; import static org.thingsboard.server.common.data.DataConstants.DEVICE; import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE; @@ -307,6 +311,60 @@ public class MqttClientTest extends AbstractContainerTest { assertThat(serverResponse).isEqualTo(mapper.readTree(clientResponse.toString())); } + @Test + public void serverSidePersistedRpc() throws Exception { + DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId()); + + MqttMessageListener listener = new MqttMessageListener(); + MqttClient mqttClient = getMqttClient(deviceCredentials, listener); + mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE).get(); + + // Wait until subscription is processed + TimeUnit.SECONDS.sleep(3 * timeoutMultiplier); + + // Send an RPC from the server + JsonObject serverRpcPayload = new JsonObject(); + serverRpcPayload.addProperty("method", "getValue"); + serverRpcPayload.addProperty("params", true); + serverRpcPayload.addProperty("persistent", true); + + JsonNode persistentRpcId = testRestClient.postServerSideRpc(device.getId(), mapper.readTree(serverRpcPayload.toString())); + + assertNotNull(persistentRpcId); + + RpcId rpcId = new RpcId(UUID.fromString(persistentRpcId.get("rpcId").asText())); + + // Wait for RPC call from the server and send the response + MqttEvent requestFromServer = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); + + assertThat(Objects.requireNonNull(requestFromServer).getMessage()).isEqualTo("{\"method\":\"getValue\",\"params\":true}"); + + Integer requestId = Integer.valueOf(Objects.requireNonNull(requestFromServer).getTopic().substring("v1/devices/me/rpc/request/".length())); + JsonObject clientResponse = new JsonObject(); + clientResponse.addProperty("response", "someResponse"); + // Send a response to the server's RPC request + mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes())).get(); + + PageLink pageLink = new PageLink(10); + + Awaitility.await() + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(5 * timeoutMultiplier, TimeUnit.SECONDS) + .until(() -> { + PageData rpcByDevice = testRestClient.getPersistedRpcByDevice(device.getId(), pageLink); + for (Rpc rpc : rpcByDevice.getData()) { + if (rpc.getId().equals(rpcId)) { + return true; + } + } + return false; + }); + + Rpc persistentRpc = testRestClient.getPersistedRpc(rpcId); + + assertThat(persistentRpc.getResponse()).isEqualTo(mapper.readTree(clientResponse.toString())); + } + @Test public void clientSideRpc() throws Exception { DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId()); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/credentials/CertPemCredentials.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/credentials/CertPemCredentials.java index 40d899412d..7e25840e33 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/credentials/CertPemCredentials.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/credentials/CertPemCredentials.java @@ -47,7 +47,7 @@ public class CertPemCredentials implements ClientCredentials { protected String caCert; private String cert; private String privateKey; - private String password = ""; + private String password; @Override public CredentialsType getType() { diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/credentials/CertPemCredentialsTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/credentials/CertPemCredentialsTest.java index db2a015f28..3df1f467b2 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/credentials/CertPemCredentialsTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/credentials/CertPemCredentialsTest.java @@ -38,7 +38,6 @@ import static org.thingsboard.rule.engine.credentials.CertPemCredentials.PRIVATE public class CertPemCredentialsTest { private static final String PASS = "test"; - private static final String EMPTY_PASS = ""; private static final String RSA = "RSA"; private static final String EC = "EC"; @@ -81,10 +80,10 @@ public class CertPemCredentialsTest { private static Stream testLoadKeyStore() { return Stream.of( - Arguments.of("pem/rsa_cert.pem", "pem/rsa_key.pem", EMPTY_PASS, RSA), + Arguments.of("pem/rsa_cert.pem", "pem/rsa_key.pem", null, RSA), Arguments.of("pem/rsa_encrypted_cert.pem", "pem/rsa_encrypted_key.pem", PASS, RSA), Arguments.of("pem/rsa_encrypted_traditional_cert.pem", "pem/rsa_encrypted_traditional_key.pem", PASS, RSA), - Arguments.of("pem/ec_cert.pem", "pem/ec_key.pem", EMPTY_PASS, EC) + Arguments.of("pem/ec_cert.pem", "pem/ec_key.pem", null, EC) ); } @@ -98,7 +97,7 @@ public class CertPemCredentialsTest { certPemCredentials.setPassword(password); KeyStore keyStore = certPemCredentials.loadKeyStore(); Assertions.assertNotNull(keyStore); - Key key = keyStore.getKey(PRIVATE_KEY_ALIAS, password.toCharArray()); + Key key = keyStore.getKey(PRIVATE_KEY_ALIAS, SslUtil.getPassword(password)); Assertions.assertNotNull(key); Assertions.assertEquals(algorithm, key.getAlgorithm());