Merge remote-tracking branch 'origin/hotfix/3.6.1' into master-hotfix
This commit is contained in:
commit
adfa3d0b90
@ -407,6 +407,7 @@ public class ProtoUtils {
|
||||
.setRequestIdMSB(msg.getMsg().getId().getMostSignificantBits())
|
||||
.setRequestIdLSB(msg.getMsg().getId().getLeastSignificantBits())
|
||||
.setOneway(msg.getMsg().isOneway())
|
||||
.setPersisted(msg.getMsg().isPersisted())
|
||||
.build();
|
||||
|
||||
return TransportProtos.ToDeviceRpcRequestActorMsgProto.newBuilder()
|
||||
|
||||
@ -49,6 +49,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.EntityViewId;
|
||||
import org.thingsboard.server.common.data.id.RpcId;
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.id.UserId;
|
||||
@ -57,6 +58,7 @@ import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.page.TimePageLink;
|
||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
|
||||
import org.thingsboard.server.common.data.rpc.Rpc;
|
||||
import org.thingsboard.server.common.data.rule.RuleChain;
|
||||
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
|
||||
import org.thingsboard.server.common.data.security.DeviceCredentials;
|
||||
@ -64,6 +66,7 @@ import org.thingsboard.server.common.data.security.DeviceCredentials;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import static io.restassured.RestAssured.given;
|
||||
import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
|
||||
@ -320,6 +323,27 @@ public class TestRestClient {
|
||||
.as(JsonNode.class);
|
||||
}
|
||||
|
||||
public Rpc getPersistedRpc(RpcId rpcId) {
|
||||
return given().spec(requestSpec)
|
||||
.get("/api/rpc/persistent/{rpcId}", rpcId.toString())
|
||||
.then()
|
||||
.statusCode(HTTP_OK)
|
||||
.extract()
|
||||
.as(Rpc.class);
|
||||
}
|
||||
|
||||
public PageData<Rpc> getPersistedRpcByDevice(DeviceId deviceId, PageLink pageLink) {
|
||||
Map<String, String> params = new HashMap<>();
|
||||
addPageLinkToParam(params, pageLink);
|
||||
return given().spec(requestSpec).queryParams(params)
|
||||
.get("/api/rpc/persistent/device/{deviceId}", deviceId.toString())
|
||||
.then()
|
||||
.statusCode(HTTP_OK)
|
||||
.extract()
|
||||
.as(new TypeRef<>() {
|
||||
});
|
||||
}
|
||||
|
||||
public PageData<DeviceProfile> getDeviceProfiles(PageLink pageLink) {
|
||||
Map<String, String> params = new HashMap<>();
|
||||
addPageLinkToParam(params, pageLink);
|
||||
|
||||
@ -46,9 +46,11 @@ import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.DeviceProfileProvisionType;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
import org.thingsboard.server.common.data.id.RpcId;
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.rpc.Rpc;
|
||||
import org.thingsboard.server.common.data.rule.NodeConnectionInfo;
|
||||
import org.thingsboard.server.common.data.rule.RuleChain;
|
||||
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
|
||||
@ -68,6 +70,7 @@ import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
@ -76,6 +79,7 @@ import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.testng.Assert.assertNotNull;
|
||||
import static org.testng.Assert.fail;
|
||||
import static org.thingsboard.server.common.data.DataConstants.DEVICE;
|
||||
import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE;
|
||||
@ -307,6 +311,60 @@ public class MqttClientTest extends AbstractContainerTest {
|
||||
assertThat(serverResponse).isEqualTo(mapper.readTree(clientResponse.toString()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void serverSidePersistedRpc() throws Exception {
|
||||
DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId());
|
||||
|
||||
MqttMessageListener listener = new MqttMessageListener();
|
||||
MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
|
||||
mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE).get();
|
||||
|
||||
// Wait until subscription is processed
|
||||
TimeUnit.SECONDS.sleep(3 * timeoutMultiplier);
|
||||
|
||||
// Send an RPC from the server
|
||||
JsonObject serverRpcPayload = new JsonObject();
|
||||
serverRpcPayload.addProperty("method", "getValue");
|
||||
serverRpcPayload.addProperty("params", true);
|
||||
serverRpcPayload.addProperty("persistent", true);
|
||||
|
||||
JsonNode persistentRpcId = testRestClient.postServerSideRpc(device.getId(), mapper.readTree(serverRpcPayload.toString()));
|
||||
|
||||
assertNotNull(persistentRpcId);
|
||||
|
||||
RpcId rpcId = new RpcId(UUID.fromString(persistentRpcId.get("rpcId").asText()));
|
||||
|
||||
// Wait for RPC call from the server and send the response
|
||||
MqttEvent requestFromServer = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS);
|
||||
|
||||
assertThat(Objects.requireNonNull(requestFromServer).getMessage()).isEqualTo("{\"method\":\"getValue\",\"params\":true}");
|
||||
|
||||
Integer requestId = Integer.valueOf(Objects.requireNonNull(requestFromServer).getTopic().substring("v1/devices/me/rpc/request/".length()));
|
||||
JsonObject clientResponse = new JsonObject();
|
||||
clientResponse.addProperty("response", "someResponse");
|
||||
// Send a response to the server's RPC request
|
||||
mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes())).get();
|
||||
|
||||
PageLink pageLink = new PageLink(10);
|
||||
|
||||
Awaitility.await()
|
||||
.pollInterval(500, TimeUnit.MILLISECONDS)
|
||||
.atMost(5 * timeoutMultiplier, TimeUnit.SECONDS)
|
||||
.until(() -> {
|
||||
PageData<Rpc> rpcByDevice = testRestClient.getPersistedRpcByDevice(device.getId(), pageLink);
|
||||
for (Rpc rpc : rpcByDevice.getData()) {
|
||||
if (rpc.getId().equals(rpcId)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
});
|
||||
|
||||
Rpc persistentRpc = testRestClient.getPersistedRpc(rpcId);
|
||||
|
||||
assertThat(persistentRpc.getResponse()).isEqualTo(mapper.readTree(clientResponse.toString()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void clientSideRpc() throws Exception {
|
||||
DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId());
|
||||
|
||||
@ -47,7 +47,7 @@ public class CertPemCredentials implements ClientCredentials {
|
||||
protected String caCert;
|
||||
private String cert;
|
||||
private String privateKey;
|
||||
private String password = "";
|
||||
private String password;
|
||||
|
||||
@Override
|
||||
public CredentialsType getType() {
|
||||
|
||||
@ -38,7 +38,6 @@ import static org.thingsboard.rule.engine.credentials.CertPemCredentials.PRIVATE
|
||||
public class CertPemCredentialsTest {
|
||||
|
||||
private static final String PASS = "test";
|
||||
private static final String EMPTY_PASS = "";
|
||||
private static final String RSA = "RSA";
|
||||
private static final String EC = "EC";
|
||||
|
||||
@ -81,10 +80,10 @@ public class CertPemCredentialsTest {
|
||||
|
||||
private static Stream<Arguments> testLoadKeyStore() {
|
||||
return Stream.of(
|
||||
Arguments.of("pem/rsa_cert.pem", "pem/rsa_key.pem", EMPTY_PASS, RSA),
|
||||
Arguments.of("pem/rsa_cert.pem", "pem/rsa_key.pem", null, RSA),
|
||||
Arguments.of("pem/rsa_encrypted_cert.pem", "pem/rsa_encrypted_key.pem", PASS, RSA),
|
||||
Arguments.of("pem/rsa_encrypted_traditional_cert.pem", "pem/rsa_encrypted_traditional_key.pem", PASS, RSA),
|
||||
Arguments.of("pem/ec_cert.pem", "pem/ec_key.pem", EMPTY_PASS, EC)
|
||||
Arguments.of("pem/ec_cert.pem", "pem/ec_key.pem", null, EC)
|
||||
);
|
||||
}
|
||||
|
||||
@ -98,7 +97,7 @@ public class CertPemCredentialsTest {
|
||||
certPemCredentials.setPassword(password);
|
||||
KeyStore keyStore = certPemCredentials.loadKeyStore();
|
||||
Assertions.assertNotNull(keyStore);
|
||||
Key key = keyStore.getKey(PRIVATE_KEY_ALIAS, password.toCharArray());
|
||||
Key key = keyStore.getKey(PRIVATE_KEY_ALIAS, SslUtil.getPassword(password));
|
||||
Assertions.assertNotNull(key);
|
||||
Assertions.assertEquals(algorithm, key.getAlgorithm());
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user