From 9553a6958b313ae85a463609e25d88c6232e963c Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 29 Nov 2023 16:46:35 +0100 Subject: [PATCH 01/14] fixed persistent RPC for microservice deployment --- .../java/org/thingsboard/server/service/queue/ProtoUtils.java | 1 + 1 file changed, 1 insertion(+) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ProtoUtils.java b/application/src/main/java/org/thingsboard/server/service/queue/ProtoUtils.java index 36038568bc..b61bd9b67e 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ProtoUtils.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ProtoUtils.java @@ -308,6 +308,7 @@ public class ProtoUtils { .setRequestIdMSB(msg.getMsg().getId().getMostSignificantBits()) .setRequestIdLSB(msg.getMsg().getId().getLeastSignificantBits()) .setOneway(msg.getMsg().isOneway()) + .setPersisted(msg.getMsg().isPersisted()) .build(); return TransportProtos.ToDeviceRpcRequestActorMsgProto.newBuilder() From 07721da41d620b8e07fecb5d01285d11fc9d06ca Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 29 Nov 2023 18:06:01 +0100 Subject: [PATCH 02/14] added blackbox test for persisted RPC --- .../server/msa/TestRestClient.java | 24 ++++++++ .../msa/connectivity/MqttClientTest.java | 59 +++++++++++++++++++ 2 files changed, 83 insertions(+) diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java index f04331e01e..3d5e71ae2e 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java @@ -44,12 +44,14 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityViewId; +import org.thingsboard.server.common.data.id.RpcId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; +import org.thingsboard.server.common.data.rpc.Rpc; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainMetaData; import org.thingsboard.server.common.data.security.DeviceCredentials; @@ -57,6 +59,7 @@ import org.thingsboard.server.common.data.security.DeviceCredentials; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import static io.restassured.RestAssured.given; import static java.net.HttpURLConnection.HTTP_NOT_FOUND; @@ -302,6 +305,27 @@ public class TestRestClient { .as(JsonNode.class); } + public Rpc getPersistedRpc(RpcId rpcId) { + return given().spec(requestSpec) + .get("/api/rpc/persistent/{rpcId}", rpcId.toString()) + .then() + .statusCode(HTTP_OK) + .extract() + .as(Rpc.class); + } + + public PageData getPersistedRpcByDevice(DeviceId deviceId, PageLink pageLink) { + Map params = new HashMap<>(); + addPageLinkToParam(params, pageLink); + return given().spec(requestSpec).queryParams(params) + .get("/api/rpc/persistent/device/{deviceId}", deviceId.toString()) + .then() + .statusCode(HTTP_OK) + .extract() + .as(new TypeRef<>() { + }); + } + public PageData getDeviceProfiles(PageLink pageLink) { Map params = new HashMap<>(); addPageLinkToParam(params, pageLink); diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index 5a7e9b631d..75414477ab 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -26,6 +26,7 @@ import io.netty.buffer.Unpooled; import io.netty.handler.codec.mqtt.MqttQoS; import lombok.Data; import lombok.extern.slf4j.Slf4j; +import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -39,9 +40,11 @@ import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfileProvisionType; import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.id.RpcId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.rpc.Rpc; import org.thingsboard.server.common.data.rule.NodeConnectionInfo; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainMetaData; @@ -59,6 +62,7 @@ import java.util.Arrays; import java.util.Objects; import java.util.Optional; import java.util.Random; +import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; @@ -66,6 +70,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.fail; import static org.thingsboard.server.common.data.DataConstants.DEVICE; import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE; @@ -293,6 +298,60 @@ public class MqttClientTest extends AbstractContainerTest { assertThat(serverResponse).isEqualTo(mapper.readTree(clientResponse.toString())); } + @Test + public void serverSidePersistedRpc() throws Exception { + DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId()); + + MqttMessageListener listener = new MqttMessageListener(); + MqttClient mqttClient = getMqttClient(deviceCredentials, listener); + mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE).get(); + + // Wait until subscription is processed + TimeUnit.SECONDS.sleep(3 * timeoutMultiplier); + + // Send an RPC from the server + JsonObject serverRpcPayload = new JsonObject(); + serverRpcPayload.addProperty("method", "getValue"); + serverRpcPayload.addProperty("params", true); + serverRpcPayload.addProperty("persistent", true); + + JsonNode persistentRpcId = testRestClient.postServerSideRpc(device.getId(), mapper.readTree(serverRpcPayload.toString())); + + assertNotNull(persistentRpcId); + + RpcId rpcId = new RpcId(UUID.fromString(persistentRpcId.get("rpcId").asText())); + + // Wait for RPC call from the server and send the response + MqttEvent requestFromServer = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); + + assertThat(Objects.requireNonNull(requestFromServer).getMessage()).isEqualTo("{\"method\":\"getValue\",\"params\":true}"); + + Integer requestId = Integer.valueOf(Objects.requireNonNull(requestFromServer).getTopic().substring("v1/devices/me/rpc/request/".length())); + JsonObject clientResponse = new JsonObject(); + clientResponse.addProperty("response", "someResponse"); + // Send a response to the server's RPC request + mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes())).get(); + + PageLink pageLink = new PageLink(10); + + Awaitility.await() + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(5 * timeoutMultiplier, TimeUnit.SECONDS) + .until(() -> { + PageData rpcByDevice = testRestClient.getPersistedRpcByDevice(device.getId(), pageLink); + for (Rpc rpc : rpcByDevice.getData()) { + if (rpc.getId().equals(rpcId)) { + return true; + } + } + return false; + }); + + Rpc persistentRpc = testRestClient.getPersistedRpc(rpcId); + + assertThat(persistentRpc.getResponse()).isEqualTo(mapper.readTree(clientResponse.toString())); + } + @Test public void clientSideRpc() throws Exception { DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId()); From 3477cc1b79cdf864b845d6ba340f99efcbaf39de Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Thu, 30 Nov 2023 11:23:14 +0100 Subject: [PATCH 03/14] fixed possible NPE --- .../src/main/java/org/thingsboard/common/util/SslUtil.java | 6 +++++- .../rule/engine/credentials/CertPemCredentials.java | 6 +++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/common/util/src/main/java/org/thingsboard/common/util/SslUtil.java b/common/util/src/main/java/org/thingsboard/common/util/SslUtil.java index 889436671a..32ab61a9a1 100644 --- a/common/util/src/main/java/org/thingsboard/common/util/SslUtil.java +++ b/common/util/src/main/java/org/thingsboard/common/util/SslUtil.java @@ -73,7 +73,7 @@ public class SslUtil { @SneakyThrows public static PrivateKey readPrivateKey(String fileContent, String passStr) { - char[] password = StringUtils.isEmpty(passStr) ? EMPTY_PASS : passStr.toCharArray(); + char[] password = getPassword(passStr); PrivateKey privateKey = null; JcaPEMKeyConverter keyConverter = new JcaPEMKeyConverter(); @@ -102,4 +102,8 @@ public class SslUtil { return privateKey; } + public static char[] getPassword(String passStr) { + return StringUtils.isEmpty(passStr) ? EMPTY_PASS : passStr.toCharArray(); + } + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/credentials/CertPemCredentials.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/credentials/CertPemCredentials.java index 3824e71459..2c545f80a7 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/credentials/CertPemCredentials.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/credentials/CertPemCredentials.java @@ -47,7 +47,7 @@ public class CertPemCredentials implements ClientCredentials { protected String caCert; private String cert; private String privateKey; - private String password = ""; + private String password; @Override public CredentialsType getType() { @@ -87,7 +87,7 @@ public class CertPemCredentials implements ClientCredentials { private KeyManagerFactory createAndInitKeyManagerFactory() throws Exception { KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); - kmf.init(loadKeyStore(), password.toCharArray()); + kmf.init(loadKeyStore(), SslUtil.getPassword(password)); return kmf; } @@ -107,7 +107,7 @@ public class CertPemCredentials implements ClientCredentials { CertPath certPath = factory.generateCertPath(certificates); List path = certPath.getCertificates(); Certificate[] x509Certificates = path.toArray(new Certificate[0]); - keyStore.setKeyEntry(PRIVATE_KEY_ALIAS, privateKey, password.toCharArray(), x509Certificates); + keyStore.setKeyEntry(PRIVATE_KEY_ALIAS, privateKey, SslUtil.getPassword(password), x509Certificates); } return keyStore; } From 37a8d78e0161c7cdd5a23975d118875269231ef8 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Thu, 30 Nov 2023 11:44:04 +0100 Subject: [PATCH 04/14] tests improvements --- .../rule/engine/credentials/CertPemCredentialsTest.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/credentials/CertPemCredentialsTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/credentials/CertPemCredentialsTest.java index 2cecd1490d..3c7d8a30c4 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/credentials/CertPemCredentialsTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/credentials/CertPemCredentialsTest.java @@ -39,7 +39,6 @@ import static org.thingsboard.rule.engine.credentials.CertPemCredentials.PRIVATE public class CertPemCredentialsTest { private static final String PASS = "test"; - private static final String EMPTY_PASS = ""; private static final String RSA = "RSA"; private static final String EC = "EC"; @@ -82,10 +81,10 @@ public class CertPemCredentialsTest { private static Stream testLoadKeyStore() { return Stream.of( - Arguments.of("pem/rsa_cert.pem", "pem/rsa_key.pem", EMPTY_PASS, RSA), + Arguments.of("pem/rsa_cert.pem", "pem/rsa_key.pem", null, RSA), Arguments.of("pem/rsa_encrypted_cert.pem", "pem/rsa_encrypted_key.pem", PASS, RSA), Arguments.of("pem/rsa_encrypted_traditional_cert.pem", "pem/rsa_encrypted_traditional_key.pem", PASS, RSA), - Arguments.of("pem/ec_cert.pem", "pem/ec_key.pem", EMPTY_PASS, EC) + Arguments.of("pem/ec_cert.pem", "pem/ec_key.pem", null, EC) ); } @@ -99,7 +98,7 @@ public class CertPemCredentialsTest { certPemCredentials.setPassword(password); KeyStore keyStore = certPemCredentials.loadKeyStore(); Assertions.assertNotNull(keyStore); - Key key = keyStore.getKey(PRIVATE_KEY_ALIAS, password.toCharArray()); + Key key = keyStore.getKey(PRIVATE_KEY_ALIAS, SslUtil.getPassword(password)); Assertions.assertNotNull(key); Assertions.assertEquals(algorithm, key.getAlgorithm()); From 4285f89665c2aa83e047926f293e89f61218d1b2 Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Thu, 23 Nov 2023 12:22:15 +0200 Subject: [PATCH 05/14] extended widgetTypes access to customer user authority --- .../server/controller/WidgetTypeController.java | 4 ++-- .../controller/WidgetTypeControllerTest.java | 14 ++++++++++++++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/WidgetTypeController.java b/application/src/main/java/org/thingsboard/server/controller/WidgetTypeController.java index f90a196edf..0109a327e8 100644 --- a/application/src/main/java/org/thingsboard/server/controller/WidgetTypeController.java +++ b/application/src/main/java/org/thingsboard/server/controller/WidgetTypeController.java @@ -216,7 +216,7 @@ public class WidgetTypeController extends AutoCommitController { @ApiOperation(value = "Get all Widget types for specified Bundle (getBundleWidgetTypes)", notes = "Returns an array of Widget Type objects that belong to specified Widget Bundle." + WIDGET_TYPE_DESCRIPTION + " " + SYSTEM_OR_TENANT_AUTHORITY_PARAGRAPH) - @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN')") + @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") @RequestMapping(value = "/widgetTypes", params = {"widgetsBundleId"}, method = RequestMethod.GET) @ResponseBody public List getBundleWidgetTypes( @@ -248,7 +248,7 @@ public class WidgetTypeController extends AutoCommitController { @ApiOperation(value = "Get all Widget types details for specified Bundle (getBundleWidgetTypes)", notes = "Returns an array of Widget Type Details objects that belong to specified Widget Bundle." + WIDGET_TYPE_DETAILS_DESCRIPTION + " " + SYSTEM_OR_TENANT_AUTHORITY_PARAGRAPH) - @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN')") + @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") @RequestMapping(value = "/widgetTypesDetails", params = {"widgetsBundleId"}, method = RequestMethod.GET) @ResponseBody public List getBundleWidgetTypesDetails( diff --git a/application/src/test/java/org/thingsboard/server/controller/WidgetTypeControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/WidgetTypeControllerTest.java index 9cc9fd0596..553b0d6490 100644 --- a/application/src/test/java/org/thingsboard/server/controller/WidgetTypeControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/WidgetTypeControllerTest.java @@ -190,6 +190,20 @@ public class WidgetTypeControllerTest extends AbstractControllerTest { Collections.sort(loadedWidgetTypes, idComparator); Assert.assertEquals(widgetTypes, loadedWidgetTypes); + + loginCustomerUser(); + + List loadedWidgetTypes2 = doGetTyped("/api/widgetTypes?widgetsBundleId={widgetsBundleId}", + new TypeReference<>(){}, widgetsBundle.getId().getId().toString()); + Collections.sort(loadedWidgetTypes2, idComparator); + Assert.assertEquals(widgetTypes, loadedWidgetTypes2); + + List loadedWidgetTypes3 = doGetTyped("/api/widgetTypesDetails?widgetsBundleId={widgetsBundleId}", + new TypeReference<>(){}, widgetsBundle.getId().getId().toString()); + List widgetTypes3 = loadedWidgetTypes3.stream().map(WidgetType::new).collect(Collectors.toList()); + Collections.sort(widgetTypes3, idComparator); + Assert.assertEquals(widgetTypes3, loadedWidgetTypes); + } @Test From d156c246c4fee85f3c6f56b4db8922fd85cb28ca Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Mon, 27 Nov 2023 11:43:18 +0200 Subject: [PATCH 06/14] updated test to cover sysadmin authority --- .../controller/WidgetTypeControllerTest.java | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/controller/WidgetTypeControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/WidgetTypeControllerTest.java index 553b0d6490..e515250321 100644 --- a/application/src/test/java/org/thingsboard/server/controller/WidgetTypeControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/WidgetTypeControllerTest.java @@ -193,17 +193,29 @@ public class WidgetTypeControllerTest extends AbstractControllerTest { loginCustomerUser(); - List loadedWidgetTypes2 = doGetTyped("/api/widgetTypes?widgetsBundleId={widgetsBundleId}", + List loadedWidgetTypesCustomer = doGetTyped("/api/widgetTypes?widgetsBundleId={widgetsBundleId}", new TypeReference<>(){}, widgetsBundle.getId().getId().toString()); - Collections.sort(loadedWidgetTypes2, idComparator); - Assert.assertEquals(widgetTypes, loadedWidgetTypes2); + Collections.sort(loadedWidgetTypesCustomer, idComparator); + Assert.assertEquals(widgetTypes, loadedWidgetTypesCustomer); - List loadedWidgetTypes3 = doGetTyped("/api/widgetTypesDetails?widgetsBundleId={widgetsBundleId}", + List customerLoadedWidgetTypesDetails = doGetTyped("/api/widgetTypesDetails?widgetsBundleId={widgetsBundleId}", new TypeReference<>(){}, widgetsBundle.getId().getId().toString()); - List widgetTypes3 = loadedWidgetTypes3.stream().map(WidgetType::new).collect(Collectors.toList()); - Collections.sort(widgetTypes3, idComparator); - Assert.assertEquals(widgetTypes3, loadedWidgetTypes); + List widgetTypesFromDetailsListCustomer = customerLoadedWidgetTypesDetails.stream().map(WidgetType::new).collect(Collectors.toList()); + Collections.sort(widgetTypesFromDetailsListCustomer, idComparator); + Assert.assertEquals(widgetTypesFromDetailsListCustomer, loadedWidgetTypes); + loginSysAdmin(); + + List sysAdminLoadedWidgetTypes = doGetTyped("/api/widgetTypes?widgetsBundleId={widgetsBundleId}", + new TypeReference<>(){}, widgetsBundle.getId().getId().toString()); + Collections.sort(sysAdminLoadedWidgetTypes, idComparator); + Assert.assertEquals(widgetTypes, sysAdminLoadedWidgetTypes); + + List sysAdminLoadedWidgetTypesDetails = doGetTyped("/api/widgetTypesDetails?widgetsBundleId={widgetsBundleId}", + new TypeReference<>(){}, widgetsBundle.getId().getId().toString()); + List widgetTypesFromDetailsListSysAdmin = sysAdminLoadedWidgetTypesDetails.stream().map(WidgetType::new).collect(Collectors.toList()); + Collections.sort(widgetTypesFromDetailsListSysAdmin, idComparator); + Assert.assertEquals(widgetTypesFromDetailsListSysAdmin, loadedWidgetTypes); } @Test From f00a9d0ebef7da303c39a027ae4fda7f9fd49d47 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 17 Sep 2024 12:37:01 +0200 Subject: [PATCH 07/14] fixed RuleEngine OOM --- .../actors/ruleChain/RuleChainManagerActor.java | 2 +- .../server/actors/shared/RuleChainErrorActor.java | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java index f2213c02d0..066f552a50 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java @@ -95,7 +95,7 @@ public abstract class RuleChainManagerActor extends ContextAwareActor { () -> { RuleChain ruleChain = provider.apply(ruleChainId); if (ruleChain == null) { - return new RuleChainErrorActor.ActorCreator(systemContext, tenantId, + return new RuleChainErrorActor.ActorCreator(systemContext, tenantId, ruleChainId, new RuleEngineException("Rule Chain with id: " + ruleChainId + " not found!")); } else { return new RuleChainActor.ActorCreator(systemContext, tenantId, ruleChain); diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/RuleChainErrorActor.java b/application/src/main/java/org/thingsboard/server/actors/shared/RuleChainErrorActor.java index 4a5e5c6405..cdb3f721f8 100644 --- a/application/src/main/java/org/thingsboard/server/actors/shared/RuleChainErrorActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/shared/RuleChainErrorActor.java @@ -19,16 +19,15 @@ import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActor; import org.thingsboard.server.actors.TbActorId; -import org.thingsboard.server.actors.TbStringActorId; +import org.thingsboard.server.actors.TbEntityActorId; import org.thingsboard.server.actors.service.ContextAwareActor; import org.thingsboard.server.actors.service.ContextBasedCreator; +import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg; import org.thingsboard.server.common.msg.queue.RuleEngineException; -import java.util.UUID; - @Slf4j public class RuleChainErrorActor extends ContextAwareActor { @@ -43,9 +42,8 @@ public class RuleChainErrorActor extends ContextAwareActor { @Override protected boolean doProcess(TbActorMsg msg) { - if (msg instanceof RuleChainAwareMsg) { + if (msg instanceof RuleChainAwareMsg rcMsg) { log.debug("[{}] Reply with {} for message {}", tenantId, error.getMessage(), msg); - var rcMsg = (RuleChainAwareMsg) msg; rcMsg.getMsg().getCallback().onFailure(error); return true; } else { @@ -56,17 +54,19 @@ public class RuleChainErrorActor extends ContextAwareActor { public static class ActorCreator extends ContextBasedCreator { private final TenantId tenantId; + private final RuleChainId ruleChainId; private final RuleEngineException error; - public ActorCreator(ActorSystemContext context, TenantId tenantId, RuleEngineException error) { + public ActorCreator(ActorSystemContext context, TenantId tenantId, RuleChainId ruleChainId, RuleEngineException error) { super(context); this.tenantId = tenantId; + this.ruleChainId = ruleChainId; this.error = error; } @Override public TbActorId createActorId() { - return new TbStringActorId(UUID.randomUUID().toString()); + return new TbEntityActorId(ruleChainId); } @Override From 8b5a8eee716d84159be77bcb65dd7ea1f5ea8225 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 17 Sep 2024 17:52:26 +0200 Subject: [PATCH 08/14] added corresponding tests --- .../server/actors/tenant/TenantActorTest.java | 91 +++++++++++++++++-- 1 file changed, 82 insertions(+), 9 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/actors/tenant/TenantActorTest.java b/application/src/test/java/org/thingsboard/server/actors/tenant/TenantActorTest.java index a282452f4e..cad933ccc5 100644 --- a/application/src/test/java/org/thingsboard/server/actors/tenant/TenantActorTest.java +++ b/application/src/test/java/org/thingsboard/server/actors/tenant/TenantActorTest.java @@ -18,57 +18,130 @@ package org.thingsboard.server.actors.tenant; import org.junit.Before; import org.junit.Test; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.DefaultTbActorSystem; import org.thingsboard.server.actors.TbActorCtx; +import org.thingsboard.server.actors.TbActorMailbox; import org.thingsboard.server.actors.TbActorRef; +import org.thingsboard.server.actors.TbActorSystem; +import org.thingsboard.server.actors.TbActorSystemSettings; +import org.thingsboard.server.actors.TbEntityActorId; +import org.thingsboard.server.actors.ruleChain.RuleChainActor; +import org.thingsboard.server.actors.ruleChain.RuleChainToRuleChainMsg; +import org.thingsboard.server.actors.shared.RuleChainErrorActor; +import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; +import org.thingsboard.server.common.data.rule.RuleChain; +import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.rule.engine.DeviceDeleteMsg; +import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.tenant.TenantService; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.service.apiusage.TbApiUsageStateService; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.thingsboard.server.actors.service.DefaultActorService.RULE_DISPATCHER_NAME; public class TenantActorTest { TenantActor tenantActor; - TbActorCtx ctx; ActorSystemContext systemContext; + RuleChainService ruleChainService; + PartitionService partitionService; TenantId tenantId = TenantId.SYS_TENANT_ID; DeviceId deviceId = DeviceId.fromString("78bf9b26-74ef-4af2-9cfb-ad6cf24ad2ec"); + RuleChainId ruleChainId = new RuleChainId(UUID.fromString("48cfa2b0-3dca-11ef-8d1a-37c2894cc59c")); @Before public void setUp() throws Exception { systemContext = mock(ActorSystemContext.class); - ctx = mock(TbActorCtx.class); + ruleChainService = mock(RuleChainService.class); + partitionService = mock(); + + TbServiceInfoProvider serviceInfoProvider = mock(TbServiceInfoProvider.class); + TbApiUsageStateService apiUsageService = mock(TbApiUsageStateService.class); + TenantService tenantService = mock(TenantService.class); + + when(systemContext.getRuleChainService()).thenReturn(ruleChainService); tenantActor = (TenantActor) new TenantActor.ActorCreator(systemContext, tenantId).createActor(); - when(systemContext.getTenantService()).thenReturn(mock(TenantService.class)); - tenantActor.init(ctx); - tenantActor.cantFindTenant = false; + + when(tenantService.findTenantById(tenantId)).thenReturn(mock()); + when(systemContext.getTenantService()).thenReturn(tenantService); + when(serviceInfoProvider.isService(ServiceType.TB_CORE)).thenReturn(true); + when(serviceInfoProvider.isService(ServiceType.TB_RULE_ENGINE)).thenReturn(true); + when(systemContext.getServiceInfoProvider()).thenReturn(serviceInfoProvider); + when(partitionService.isManagedByCurrentService(tenantId)).thenReturn(true); + when(systemContext.getPartitionService()).thenReturn(partitionService); + when(systemContext.getApiUsageStateService()).thenReturn(apiUsageService); + when(apiUsageService.getApiUsageState(tenantId)).thenReturn(new ApiUsageState()); } @Test - public void deleteDeviceTest() { + public void deleteDeviceTest() throws Exception { + TbActorCtx ctx = mock(TbActorCtx.class); + tenantActor.init(ctx); TbActorRef deviceActorRef = mock(TbActorRef.class); - when(systemContext.resolve(ServiceType.TB_CORE, tenantId, deviceId)).thenReturn(new TopicPartitionInfo("Main", tenantId, 0,true)); + when(systemContext.resolve(ServiceType.TB_CORE, tenantId, deviceId)).thenReturn(new TopicPartitionInfo("Main", tenantId, 0, true)); when(ctx.getOrCreateChildActor(any(), any(), any(), any())).thenReturn(deviceActorRef); + ComponentLifecycleMsg componentLifecycleMsg = new ComponentLifecycleMsg(tenantId, deviceId, ComponentLifecycleEvent.DELETED); tenantActor.doProcess(componentLifecycleMsg); verify(deviceActorRef).tellWithHighPriority(eq(new DeviceDeleteMsg(tenantId, deviceId))); - reset(ctx, deviceActorRef); - when(systemContext.resolve(ServiceType.TB_CORE, tenantId, deviceId)).thenReturn(new TopicPartitionInfo("Main", tenantId, 1,false)); + + when(systemContext.resolve(ServiceType.TB_CORE, tenantId, deviceId)).thenReturn(new TopicPartitionInfo("Main", tenantId, 1, false)); tenantActor.doProcess(componentLifecycleMsg); verify(ctx, never()).getOrCreateChildActor(any(), any(), any(), any()); verify(deviceActorRef, never()).tellWithHighPriority(any()); } + @Test + public void ruleChainErrorActorTest() throws Exception { + TbActorSystemSettings settings = new TbActorSystemSettings(0, 0, 0); + TbActorSystem system = spy(new DefaultTbActorSystem(settings)); + system.createDispatcher(RULE_DISPATCHER_NAME, mock()); + TbActorMailbox tenantCtx = new TbActorMailbox(system, settings, null, mock(), mock(), null); + tenantActor.init(tenantCtx); + + TbMsg msg = mock(TbMsg.class); + + when(ruleChainService.findRuleChainById(tenantId, ruleChainId)).thenReturn(new RuleChain(ruleChainId)); + + RuleChainToRuleChainMsg ruleChainMsg = new RuleChainToRuleChainMsg(ruleChainId, null, msg, null); + tenantActor.doProcess(ruleChainMsg); + verify(system).createChildActor(eq(RULE_DISPATCHER_NAME), any(RuleChainActor.ActorCreator.class), any()); + reset(system); + tenantActor.doProcess(ruleChainMsg); + verify(system, never()).createChildActor(any(), any(), any()); + + //Delete rule-chain + TbActorRef ruleChainActor = system.getActor(new TbEntityActorId(ruleChainId)); + assertNotNull(ruleChainActor); + system.stop(ruleChainActor); + when(ruleChainService.findRuleChainById(tenantId, ruleChainId)).thenReturn(null); + + tenantActor.doProcess(ruleChainMsg); + verify(system).createChildActor(eq(RULE_DISPATCHER_NAME), any(RuleChainErrorActor.ActorCreator.class), any()); + reset(system); + tenantActor.doProcess(ruleChainMsg); + verify(system, never()).createChildActor(any(), any(), any()); + system.stop(); + } + } \ No newline at end of file From 72873d4ad0ee33904f68775e844d336a691cc0b1 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 17 Sep 2024 18:34:48 +0200 Subject: [PATCH 09/14] fixed concurrent modification in TbSubscriptionsInfo --- .../server/service/subscription/TbSubscriptionsInfo.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionsInfo.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionsInfo.java index 48464d5297..a65fc0bedb 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionsInfo.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionsInfo.java @@ -20,6 +20,7 @@ import lombok.EqualsAndHashCode; import lombok.RequiredArgsConstructor; import lombok.ToString; +import java.util.HashSet; import java.util.Set; /** @@ -48,7 +49,7 @@ public class TbSubscriptionsInfo { } protected TbSubscriptionsInfo copy(int seqNumber) { - return new TbSubscriptionsInfo(notifications, alarms, tsAllKeys, tsKeys, attrAllKeys, attrKeys, seqNumber); + return new TbSubscriptionsInfo(notifications, alarms, tsAllKeys, tsKeys != null ? new HashSet<>(tsKeys) : null, attrAllKeys, attrKeys != null ? new HashSet<>(attrKeys) : null, seqNumber); } } From 96adecd28ef13a86d55f97a0d18907cabee5c41d Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 18 Sep 2024 12:52:37 +0200 Subject: [PATCH 10/14] added corresponding tests --- ...DefaultTbLocalSubscriptionServiceTest.java | 126 ++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 application/src/test/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionServiceTest.java diff --git a/application/src/test/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionServiceTest.java new file mode 100644 index 0000000000..1ec80b91f7 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionServiceTest.java @@ -0,0 +1,126 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.subscription; + +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.read.ListAppender; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.LoggerFactory; +import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.server.cache.limits.RateLimitService; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.limit.LimitedApi; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.service.ws.WebSocketSessionRef; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Executors; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class DefaultTbLocalSubscriptionServiceTest { + + ListAppender testLogAppender; + TbLocalSubscriptionService subscriptionService; + + @BeforeEach + public void setUp() throws Exception { + Logger logger = (Logger) LoggerFactory.getLogger(DefaultTbLocalSubscriptionService.class); + testLogAppender = new ListAppender<>(); + testLogAppender.start(); + logger.addAppender(testLogAppender); + + RateLimitService rateLimitService = mock(); + when(rateLimitService.checkRateLimit(eq(LimitedApi.WS_SUBSCRIPTIONS), any(Object.class), nullable(String.class))).thenReturn(true); + PartitionService partitionService = mock(); + when(partitionService.resolve(any(), any(), any())).thenReturn(TopicPartitionInfo.builder().build()); + subscriptionService = new DefaultTbLocalSubscriptionService(mock(), mock(), mock(), partitionService, mock(), mock(), mock(), rateLimitService); + ReflectionTestUtils.setField(subscriptionService, "serviceId", "serviceId"); + } + + @AfterEach + public void tearDown() { + if (testLogAppender != null) { + testLogAppender.stop(); + Logger logger = (Logger) LoggerFactory.getLogger(DefaultTbLocalSubscriptionService.class); + logger.detachAppender(testLogAppender); + } + } + + @Test + public void addSubscriptionConcurrentModificationTest() throws Exception { + ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); + TenantId tenantId = new TenantId(UUID.randomUUID()); + DeviceId deviceId = new DeviceId(UUID.randomUUID()); + WebSocketSessionRef sessionRef = mock(); + ReflectionTestUtils.setField(subscriptionService, "subscriptionUpdateExecutor", executorService); + + List> futures = new ArrayList<>(); + + try { + subscriptionService.onCoreStartupMsg(TransportProtos.CoreStartupMsg.newBuilder().addAllPartitions(List.of(0)).getDefaultInstanceForType()); + for (int i = 0; i < 50; i++) { + futures.add(executorService.submit(() -> subscriptionService.addSubscription(createSubscription(tenantId, deviceId), sessionRef))); + } + Futures.allAsList(futures).get(); + } finally { + executorService.shutdownNow(); + } + + List logs = testLogAppender.list; + boolean exceptionLogged = logs.stream() + .filter(event -> event.getThrowableProxy() != null) + .map(event -> event.getThrowableProxy().getClassName()) + .anyMatch(log -> log.equals("java.util.ConcurrentModificationException")); + + assertFalse(exceptionLogged, "Detected ConcurrentModificationException!"); + } + + private TbSubscription createSubscription(TenantId tenantId, EntityId entityId) { + Map keys = new HashMap<>(); + for (int i = 0; i < 50; i++) { + keys.put(RandomStringUtils.randomAlphanumeric(5), 1L); + } + return TbAttributeSubscription.builder() + .tenantId(tenantId) + .entityId(entityId) + .subscriptionId(1) + .sessionId(RandomStringUtils.randomAlphanumeric(5)) + .keyStates(keys) + .build(); + } +} From f225870b52994ade7d3e3f50971cf67b4dedbd00 Mon Sep 17 00:00:00 2001 From: Vladyslav_Prykhodko Date: Thu, 26 Sep 2024 15:32:24 +0300 Subject: [PATCH 11/14] UI: Fixed updated value in gauge widget in animation --- .../widget/lib/analogue-gauge.models.ts | 16 ++++++++++++---- .../home/components/widget/lib/digital-gauge.ts | 3 ++- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/ui-ngx/src/app/modules/home/components/widget/lib/analogue-gauge.models.ts b/ui-ngx/src/app/modules/home/components/widget/lib/analogue-gauge.models.ts index db3bc5559d..46ec39c3be 100644 --- a/ui-ngx/src/app/modules/home/components/widget/lib/analogue-gauge.models.ts +++ b/ui-ngx/src/app/modules/home/components/widget/lib/analogue-gauge.models.ts @@ -64,9 +64,13 @@ export interface AnalogueGaugeSettings { animationRule: AnimationRule; } +interface BaseGaugeModel extends BaseGauge { + _value?: number; +} + export abstract class TbBaseGauge { - private gauge: BaseGauge; + private gauge: BaseGaugeModel; protected constructor(protected ctx: WidgetContext, canvasId: string) { const gaugeElement = $('#' + canvasId, ctx.$container)[0]; @@ -77,16 +81,20 @@ export abstract class TbBaseGauge { protected abstract createGaugeOptions(gaugeElement: HTMLElement, settings: S): O; - protected abstract createGauge(gaugeData: O): BaseGauge; + protected abstract createGauge(gaugeData: O): BaseGaugeModel; update() { if (this.ctx.data.length > 0) { const cellData = this.ctx.data[0]; if (cellData.data.length > 0) { - const tvPair = cellData.data[cellData.data.length - - 1]; + const tvPair = cellData.data[cellData.data.length - 1]; const value = parseFloat(tvPair[1]); if (value !== this.gauge.value) { + if (!this.gauge.options.animation) { + this.gauge._value = value; + } else { + delete this.gauge._value; + } this.gauge.value = value; } } diff --git a/ui-ngx/src/app/modules/home/components/widget/lib/digital-gauge.ts b/ui-ngx/src/app/modules/home/components/widget/lib/digital-gauge.ts index ede5962922..cc5a3e08dd 100644 --- a/ui-ngx/src/app/modules/home/components/widget/lib/digital-gauge.ts +++ b/ui-ngx/src/app/modules/home/components/widget/lib/digital-gauge.ts @@ -26,7 +26,6 @@ import { prepareFontSettings } from '@home/components/widget/lib/settings.models import { CanvasDigitalGauge, CanvasDigitalGaugeOptions } from '@home/components/widget/lib/canvas-digital-gauge'; import { DatePipe } from '@angular/common'; import { IWidgetSubscription } from '@core/api/widget-api.models'; -import { Subscription } from 'rxjs'; import { ColorProcessor, createValueSubscription, ValueSourceType } from '@shared/models/widget-settings.models'; import GenericOptions = CanvasGauges.GenericOptions; @@ -260,6 +259,8 @@ export class TbCanvasDigitalGauge { if (value !== this.gauge.value) { if (!this.gauge.options.animation) { this.gauge._value = value; + } else { + delete this.gauge._value; } this.gauge.value = value; } else if (this.localSettings.showTimestamp && this.gauge.timestamp !== timestamp) { From 31d85bb685d0fcc7ca11f3faa1ecfa55960c1ab3 Mon Sep 17 00:00:00 2001 From: Artem Dzhereleiko Date: Fri, 27 Sep 2024 11:25:30 +0300 Subject: [PATCH 12/14] UI: Fixed position for icon SCADA analog meters --- .../json/system/scada_symbols/left-analog-water-level-meter.svg | 2 +- .../system/scada_symbols/right-analog-water-level-meter.svg | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/application/src/main/data/json/system/scada_symbols/left-analog-water-level-meter.svg b/application/src/main/data/json/system/scada_symbols/left-analog-water-level-meter.svg index 9b7a15bf7c..6eef9bf474 100644 --- a/application/src/main/data/json/system/scada_symbols/left-analog-water-level-meter.svg +++ b/application/src/main/data/json/system/scada_symbols/left-analog-water-level-meter.svg @@ -37,7 +37,7 @@ }, { "tag": "icon", - "stateRenderFunction": "var showIcon = ctx.properties.showIcon;\nif (showIcon) {\n element.show();\n var icon = ctx.properties.icon;\n var iconSize = ctx.properties.iconSize;\n var iconColor = ctx.properties.iconColor;\n ctx.api.icon(element, icon, iconSize, iconColor, true);\n} else {\n element.hide()\n}", + "stateRenderFunction": "var showIcon = ctx.properties.showIcon;\nvar showLabel = ctx.properties.label;\nif (showIcon) {\n element.show();\n var icon = ctx.properties.icon;\n var iconSize = ctx.properties.iconSize;\n var iconColor = ctx.properties.iconColor;\n ctx.api.icon(element, icon, iconSize, iconColor, true);\n if (!showLabel) {\n element.transform({translateX: 83,translateY: 137});\n }\n} else {\n element.hide()\n}\n", "actions": null }, { diff --git a/application/src/main/data/json/system/scada_symbols/right-analog-water-level-meter.svg b/application/src/main/data/json/system/scada_symbols/right-analog-water-level-meter.svg index 592339003f..bc271bbdcf 100644 --- a/application/src/main/data/json/system/scada_symbols/right-analog-water-level-meter.svg +++ b/application/src/main/data/json/system/scada_symbols/right-analog-water-level-meter.svg @@ -37,7 +37,7 @@ }, { "tag": "icon", - "stateRenderFunction": "var showIcon = ctx.properties.showIcon;\nif (showIcon) {\n element.show();\n var icon = ctx.properties.icon;\n var iconSize = ctx.properties.iconSize;\n var iconColor = ctx.properties.iconColor;\n ctx.api.icon(element, icon, iconSize, iconColor, true);\n} else {\n element.hide()\n}", + "stateRenderFunction": "var showIcon = ctx.properties.showIcon;\nvar showLabel = ctx.properties.label;\nif (showIcon) {\n element.show();\n var icon = ctx.properties.icon;\n var iconSize = ctx.properties.iconSize;\n var iconColor = ctx.properties.iconColor;\n ctx.api.icon(element, icon, iconSize, iconColor, true);\n if (!showLabel) {\n element.transform({translateX: 119, translateY: 137});\n }\n} else {\n element.hide()\n}", "actions": null }, { From d3e94c179223b1d373009e20fb0fff926bd7a84d Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Fri, 27 Sep 2024 13:49:09 +0300 Subject: [PATCH 13/14] Add option to fetch repo if already cloned --- .../sync/vc/DefaultClusterVersionControlService.java | 5 +++-- .../service/sync/vc/DefaultGitRepositoryService.java | 11 +++++++---- .../server/service/sync/vc/GitRepositoryService.java | 3 ++- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java index 138505bd8d..ca0636761d 100644 --- a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java +++ b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java @@ -223,7 +223,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe var currentSettings = vcService.getRepositorySettings(ctx.getTenantId()); var newSettings = ctx.getSettings(); if (!newSettings.equals(currentSettings)) { - vcService.initRepository(ctx.getTenantId(), ctx.getSettings()); + vcService.initRepository(ctx.getTenantId(), ctx.getSettings(), false); } if (msg.hasCommitRequest()) { handleCommitRequest(ctx, msg.getCommitRequest()); @@ -464,7 +464,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe private void handleInitRepositoryCommand(VersionControlRequestCtx ctx) { try { - vcService.initRepository(ctx.getTenantId(), ctx.getSettings()); + vcService.initRepository(ctx.getTenantId(), ctx.getSettings(), false); reply(ctx, Optional.empty()); } catch (Exception e) { log.debug("[{}] Failed to connect to the repository: ", ctx, e); @@ -564,4 +564,5 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe }, MoreExecutors.directExecutor()); } } + } diff --git a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitRepositoryService.java b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitRepositoryService.java index bf3e202c2d..0788143030 100644 --- a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitRepositoryService.java +++ b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitRepositoryService.java @@ -205,7 +205,7 @@ public class DefaultGitRepositoryService implements GitRepositoryService { if (!Files.exists(Path.of(gitRepository.getDirectory()))) { try { - return cloneRepository(tenantId, gitRepository.getSettings()); + return openOrCloneRepository(tenantId, gitRepository.getSettings(), false); } catch (Exception e) { throw new IllegalStateException("Could not initialize the repository: " + e.getMessage(), e); } @@ -239,11 +239,11 @@ public class DefaultGitRepositoryService implements GitRepositoryService { } @Override - public void initRepository(TenantId tenantId, RepositorySettings settings) throws Exception { + public void initRepository(TenantId tenantId, RepositorySettings settings, boolean fetch) throws Exception { if (!settings.isLocalOnly()) { clearRepository(tenantId); } - cloneRepository(tenantId, settings); + openOrCloneRepository(tenantId, settings, fetch); } @Override @@ -280,13 +280,16 @@ public class DefaultGitRepositoryService implements GitRepositoryService { return EntityIdFactory.getByTypeAndUuid(entityType, entityId); } - private GitRepository cloneRepository(TenantId tenantId, RepositorySettings settings) throws Exception { + private GitRepository openOrCloneRepository(TenantId tenantId, RepositorySettings settings, boolean fetch) throws Exception { log.debug("[{}] Init tenant repository started.", tenantId); Path repositoryDirectory = Path.of(repositoriesFolder, settings.isLocalOnly() ? "local_" + settings.getRepositoryUri() : tenantId.getId().toString()); GitRepository repository; if (Files.exists(repositoryDirectory)) { repository = GitRepository.open(repositoryDirectory.toFile(), settings); + if (fetch) { + repository.fetch(); + } } else { Files.createDirectories(repositoryDirectory); if (settings.isLocalOnly()) { diff --git a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/GitRepositoryService.java b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/GitRepositoryService.java index 1382a69599..d5d51397f9 100644 --- a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/GitRepositoryService.java +++ b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/GitRepositoryService.java @@ -42,7 +42,7 @@ public interface GitRepositoryService { void testRepository(TenantId tenantId, RepositorySettings settings) throws Exception; - void initRepository(TenantId tenantId, RepositorySettings settings) throws Exception; + void initRepository(TenantId tenantId, RepositorySettings settings, boolean fetch) throws Exception; RepositorySettings getRepositorySettings(TenantId tenantId) throws Exception; @@ -67,4 +67,5 @@ public interface GitRepositoryService { String getContentsDiff(TenantId tenantId, String content1, String content2) throws IOException; void fetch(TenantId tenantId) throws GitAPIException; + } From d828d699e733ba5479880816240b25afae9ef76e Mon Sep 17 00:00:00 2001 From: Kulikov <44275303+nickAS21@users.noreply.github.com> Date: Fri, 27 Sep 2024 16:23:11 +0300 Subject: [PATCH 14/14] Lwm2m fix bug (#11706) * lwm2m: fix bug Collected Value with different TS * lwm2m: fix bug Collected Value with different TS, add constants * lwm2m: fix bug SerDez - restore * lwm2m: fix bug add ts value to toTsKvList * lwm2m: add test sendCollected for actual ts * lwm2m: refactoring test sendCollected for actual ts --- .../transport/lwm2m/Lwm2mTestHelper.java | 11 +- .../lwm2m/client/LwM2MTestClient.java | 19 ++- .../lwm2m/client/LwM2mTemperatureSensor.java | 62 +++++---- .../rpc/AbstractRpcLwM2MIntegrationTest.java | 78 ++++++++++- ...cLwm2MIntegrationObserveCompositeTest.java | 47 ------- .../sql/RpcLwm2mIntegrationObserveTest.java | 20 ++- .../rpc/sql/RpcLwm2mIntegrationReadTest.java | 122 +++++++++--------- .../store/LwM2MBootstrapSecurityStore.java | 2 +- .../server/LwM2mTransportServerHelper.java | 13 +- .../uplink/DefaultLwM2mUplinkMsgHandler.java | 21 ++- 10 files changed, 213 insertions(+), 182 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/Lwm2mTestHelper.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/Lwm2mTestHelper.java index c627c77cd4..4aa47eddf5 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/Lwm2mTestHelper.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/Lwm2mTestHelper.java @@ -42,6 +42,7 @@ public class Lwm2mTestHelper { public static final int RESOURCE_ID_11 = 11; public static final int RESOURCE_ID_14 = 14; public static final int RESOURCE_ID_15 = 15; + public static final int RESOURCE_ID_5700 = 5700; public static final int RESOURCE_INSTANCE_ID_0 = 0; public static final int RESOURCE_INSTANCE_ID_2 = 2; @@ -51,6 +52,12 @@ public class Lwm2mTestHelper { public static final String RESOURCE_ID_NAME_19_0_2 = "dataCreationTime"; public static final String RESOURCE_ID_NAME_19_1_0 = "dataWrite"; public static final String RESOURCE_ID_NAME_19_0_3 = "dataDescription"; + public static final String RESOURCE_ID_NAME_3303_12_5700 = "sensorValue"; + public static final double RESOURCE_ID_3303_12_5700_VALUE_0 = 25.05d; + public static final double RESOURCE_ID_3303_12_5700_VALUE_1 = 35.12d; + public static long RESOURCE_ID_3303_12_5700_TS_0 = 0; + public static long RESOURCE_ID_3303_12_5700_TS_1 = 0; + public static final int RESOURCE_ID_VALUE_3303_12_5700_DELTA_TS = 3000; public enum LwM2MClientState { @@ -72,8 +79,8 @@ public class Lwm2mTestHelper { ON_DEREGISTRATION_FAILURE(14, "onDeregistrationFailure"), ON_DEREGISTRATION_TIMEOUT(15, "onDeregistrationTimeout"), ON_EXPECTED_ERROR(16, "onUnexpectedError"), - ON_READ_CONNECTION_ID (17, "onReadConnection"), - ON_WRITE_CONNECTION_ID (18, "onWriteConnection"); + ON_READ_CONNECTION_ID(17, "onReadConnection"), + ON_WRITE_CONNECTION_ID(18, "onWriteConnection"); public int code; public String type; diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/LwM2MTestClient.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/LwM2MTestClient.java index b79399170e..655edc6db6 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/LwM2MTestClient.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/LwM2MTestClient.java @@ -137,7 +137,6 @@ public class LwM2MTestClient { private Map clientDtlsCid; private LwM2mUplinkMsgHandler defaultLwM2mUplinkMsgHandlerTest; private LwM2mClientContext clientContext; - public void init(Security security, Security securityBs, int port, boolean isRpc, LwM2mUplinkMsgHandler defaultLwM2mUplinkMsgHandler, LwM2mClientContext clientContext, boolean isWriteAttribute, Integer cIdLength, boolean queueMode, @@ -159,11 +158,11 @@ public class LwM2MTestClient { initializer.setClassForObject(SECURITY, Security.class); initializer.setInstancesForObject(SECURITY, instances); // SERVER - Server lwm2mServer = new Server(shortServerId, TimeUnit.MINUTES.toSeconds(60)); + Server lwm2mServer = new Server(shortServerId, TimeUnit.MINUTES.toSeconds(60)); lwm2mServer.setId(serverId); - Server serverBs = new Server(shortServerIdBs0, TimeUnit.MINUTES.toSeconds(60)); + Server serverBs = new Server(shortServerIdBs0, TimeUnit.MINUTES.toSeconds(60)); serverBs.setId(serverIdBs); - instances = new LwM2mInstanceEnabler[]{serverBs, lwm2mServer}; + instances = new LwM2mInstanceEnabler[]{serverBs, lwm2mServer}; initializer.setClassForObject(SERVER, Server.class); initializer.setInstancesForObject(SERVER, instances); } else if (securityBs != null) { @@ -177,7 +176,7 @@ public class LwM2MTestClient { // SERVER Server lwm2mServer = new Server(shortServerId, TimeUnit.MINUTES.toSeconds(60)); lwm2mServer.setId(serverId); - initializer.setInstancesForObject(SERVER, lwm2mServer ); + initializer.setInstancesForObject(SERVER, lwm2mServer); } initializer.setInstancesForObject(DEVICE, lwM2MDevice = new SimpleLwM2MDevice(executor)); @@ -239,11 +238,11 @@ public class LwM2MTestClient { boolean supportDeprecatedCiphers = false; clientCoapConfig.set(DTLS_RECOMMENDED_CIPHER_SUITES_ONLY, !supportDeprecatedCiphers); - if (cIdLength!= null) { + if (cIdLength != null) { setDtlsConnectorConfigCidLength(clientCoapConfig, cIdLength); } - if (cIdLength!= null) { + if (cIdLength != null) { setDtlsConnectorConfigCidLength(clientCoapConfig, cIdLength); } @@ -262,12 +261,12 @@ public class LwM2MTestClient { // Configure Registration Engine DefaultRegistrationEngineFactory engineFactory = new DefaultRegistrationEngineFactory(); - // old + // old /** * Force reconnection/rehandshake on registration update. */ int comPeriodInSec = 5; - if (comPeriodInSec > 0) engineFactory.setCommunicationPeriod(comPeriodInSec * 1000); + if (comPeriodInSec > 0) engineFactory.setCommunicationPeriod(comPeriodInSec * 1000); // engineFactory.setCommunicationPeriod(5000); // old /** * By default client will try to resume DTLS session by using abbreviated Handshake. This option force to always do a full handshake." @@ -288,7 +287,7 @@ public class LwM2MTestClient { builder.setDataSenders(new ManualDataSender()); builder.setRegistrationEngineFactory(engineFactory); Map decoders = new HashMap<>(); - Map encoders = new HashMap<>(); + Map encoders = new HashMap<>(); if (supportFormatOnly_SenMLJSON_SenMLCBOR) { // decoders.put(ContentFormat.OPAQUE, new LwM2mNodeOpaqueDecoder()); decoders.put(ContentFormat.CBOR, new LwM2mNodeCborDecoder()); diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/LwM2mTemperatureSensor.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/LwM2mTemperatureSensor.java index cfdef139c2..4f594ed4c0 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/LwM2mTemperatureSensor.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/LwM2mTemperatureSensor.java @@ -26,16 +26,20 @@ import org.eclipse.leshan.core.request.ContentFormat; import org.eclipse.leshan.core.request.argument.Arguments; import org.eclipse.leshan.core.response.ExecuteResponse; import org.eclipse.leshan.core.response.ReadResponse; +import org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper; import javax.security.auth.Destroyable; import java.math.BigDecimal; import java.math.RoundingMode; -import java.util.ArrayList; +import java.time.Instant; import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_3303_12_5700_VALUE_0; +import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_3303_12_5700_VALUE_1; +import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_VALUE_3303_12_5700_DELTA_TS; @Slf4j public class LwM2mTemperatureSensor extends BaseInstanceEnabler implements Destroyable { @@ -46,7 +50,9 @@ public class LwM2mTemperatureSensor extends BaseInstanceEnabler implements Destr private double maxMeasuredValue = currentTemp; private LeshanClient leshanClient; - private List containingValues; + private int cntRead_5700; + private int cntIdentitySystem; + protected static final Random RANDOM = new Random(); private static final List supportedResources = Arrays.asList(5601, 5602, 5700, 5701); @@ -57,7 +63,7 @@ public class LwM2mTemperatureSensor extends BaseInstanceEnabler implements Destr public LwM2mTemperatureSensor(ScheduledExecutorService executorService, Integer id) { try { if (id != null) this.setId(id); - executorService.scheduleWithFixedDelay(this::adjustTemperature, 2000, 2000, TimeUnit.MILLISECONDS); + executorService.scheduleWithFixedDelay(this::adjustTemperature, 2000, 2000, TimeUnit.MILLISECONDS); } catch (Throwable e) { log.error("[{}]Throwable", e.toString()); e.printStackTrace(); @@ -73,15 +79,18 @@ public class LwM2mTemperatureSensor extends BaseInstanceEnabler implements Destr case 5602: return ReadResponse.success(resourceId, getTwoDigitValue(maxMeasuredValue)); case 5700: - if (identity == LwM2mServer.SYSTEM) { - setTemperature(); - setData(); + if (identity == LwM2mServer.SYSTEM) { // return value for ForCollectedValue + cntIdentitySystem++; + return ReadResponse.success(resourceId, cntIdentitySystem == 1 ? + RESOURCE_ID_3303_12_5700_VALUE_0 : RESOURCE_ID_3303_12_5700_VALUE_1); + } + cntRead_5700++; + if (cntRead_5700 == 1) { // read value after start return ReadResponse.success(resourceId, getTwoDigitValue(currentTemp)); - } else if (this.getId() == 12 && this.leshanClient != null) { - containingValues = new ArrayList<>(); - sendCollected(5700); - return ReadResponse.success(resourceId, getData()); } else { + if (this.getId() == 12 && this.leshanClient != null) { + sendCollected(); + } return ReadResponse.success(resourceId, getTwoDigitValue(currentTemp)); } case 5701: @@ -117,10 +126,11 @@ public class LwM2mTemperatureSensor extends BaseInstanceEnabler implements Destr } } - private void setTemperature(){ + private void setTemperature() { float delta = (RANDOM.nextInt(20) - 10) / 10f; currentTemp += delta; } + private synchronized Integer adjustMinMaxMeasuredValue(double newTemperature) { if (newTemperature > maxMeasuredValue) { maxMeasuredValue = newTemperature; @@ -143,7 +153,7 @@ public class LwM2mTemperatureSensor extends BaseInstanceEnabler implements Destr return supportedResources; } - protected void setLeshanClient(LeshanClient leshanClient){ + protected void setLeshanClient(LeshanClient leshanClient) { this.leshanClient = leshanClient; } @@ -151,40 +161,26 @@ public class LwM2mTemperatureSensor extends BaseInstanceEnabler implements Destr public void destroy() { } - private void sendCollected(int resourceId) { + private void sendCollected() { try { + int resourceId = 5700; LwM2mServer registeredServer = this.leshanClient.getRegisteredServers().values().iterator().next(); ManualDataSender sender = this.leshanClient.getSendService().getDataSender(ManualDataSender.DEFAULT_NAME, ManualDataSender.class); sender.collectData(Arrays.asList(getPathForCollectedValue(resourceId))); - Thread.sleep(1000); + Lwm2mTestHelper.RESOURCE_ID_3303_12_5700_TS_0 = Instant.now().toEpochMilli(); + Thread.sleep(RESOURCE_ID_VALUE_3303_12_5700_DELTA_TS); sender.collectData(Arrays.asList(getPathForCollectedValue(resourceId))); + Lwm2mTestHelper.RESOURCE_ID_3303_12_5700_TS_1 = Instant.now().toEpochMilli(); sender.sendCollectedData(registeredServer, ContentFormat.SENML_JSON, 1000, false); } catch (InterruptedException e) { throw new RuntimeException(e); } } + private LwM2mPath getPathForCollectedValue(int resourceId) { return new LwM2mPath(3303, this.getId(), resourceId); } - - private double getData() { - if (containingValues.size() > 1) { - Integer t0 = Math.toIntExact(Math.round(containingValues.get(0) * 100)); - Integer t1 = Math.toIntExact(Math.round(containingValues.get(1) * 100)); - long to_t1 = (((long) t0) << 32) | (t1 & 0xffffffffL); - return Double.longBitsToDouble(to_t1); - } else { - return currentTemp; - } - - } - - private void setData() { - if (containingValues == null){ - containingValues = new ArrayList<>(); - } - containingValues.add(getTwoDigitValue(currentTemp)); - } } + diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/AbstractRpcLwM2MIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/AbstractRpcLwM2MIntegrationTest.java index 1f61a08e15..d0b86fdab0 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/AbstractRpcLwM2MIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/AbstractRpcLwM2MIntegrationTest.java @@ -15,20 +15,30 @@ */ package org.thingsboard.server.transport.lwm2m.rpc; +import lombok.extern.slf4j.Slf4j; import org.eclipse.leshan.core.link.LinkParser; import org.eclipse.leshan.core.link.lwm2m.DefaultLwM2mLinkParser; import org.junit.Before; +import org.mockito.Mockito; +import org.springframework.boot.test.mock.mockito.SpyBean; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MDeviceCredentials; import org.thingsboard.server.common.data.device.profile.Lwm2mDeviceProfileTransportConfiguration; import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.transport.lwm2m.AbstractLwM2MIntegrationTest; +import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportServerHelper; +import org.thingsboard.server.transport.lwm2m.server.uplink.DefaultLwM2mUplinkMsgHandler; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; +import static org.awaitility.Awaitility.await; import static org.eclipse.leshan.core.LwM2mId.ACCESS_CONTROL; import static org.eclipse.leshan.core.LwM2mId.DEVICE; import static org.eclipse.leshan.core.LwM2mId.FIRMWARE; @@ -40,19 +50,23 @@ import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.OBJECT_ID_0 import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.OBJECT_ID_1; import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.OBJECT_INSTANCE_ID_0; import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.OBJECT_INSTANCE_ID_1; +import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.OBJECT_INSTANCE_ID_12; import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_0; import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_14; import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_2; +import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_5700; import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_9; import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_19_0_0; import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_19_0_2; import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_19_1_0; +import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_3303_12_5700; import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_3_14; import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_3_9; import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.TEMPERATURE_SENSOR; import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.resources; import static org.thingsboard.server.transport.lwm2m.utils.LwM2MTransportUtil.fromVersionedIdToObjectId; +@Slf4j @DaoSqlTest public abstract class AbstractRpcLwM2MIntegrationTest extends AbstractLwM2MIntegrationTest { @@ -84,6 +98,12 @@ public abstract class AbstractRpcLwM2MIntegrationTest extends AbstractLwM2MInteg protected String idVer_19_0_0; + @SpyBean + protected DefaultLwM2mUplinkMsgHandler defaultUplinkMsgHandlerTest; + + @SpyBean + protected LwM2mTransportServerHelper lwM2mTransportServerHelperTest; + public AbstractRpcLwM2MIntegrationTest() { setResources(resources); } @@ -144,7 +164,8 @@ public abstract class AbstractRpcLwM2MIntegrationTest extends AbstractLwM2MInteg " \"" + objectIdVer_3 + "/" + OBJECT_INSTANCE_ID_0 + "/" + RESOURCE_ID_14 + "\": \"" + RESOURCE_ID_NAME_3_14 + "\",\n" + " \"" + idVer_19_0_0 + "\": \"" + RESOURCE_ID_NAME_19_0_0 + "\",\n" + " \"" + objectIdVer_19 + "/" + OBJECT_INSTANCE_ID_1 + "/" + RESOURCE_ID_0 + "\": \"" + RESOURCE_ID_NAME_19_1_0 + "\",\n" + - " \"" + objectIdVer_19 + "/" + OBJECT_INSTANCE_ID_0 + "/" + RESOURCE_ID_2 + "\": \"" + RESOURCE_ID_NAME_19_0_2 + "\"\n" + + " \"" + objectIdVer_19 + "/" + OBJECT_INSTANCE_ID_0 + "/" + RESOURCE_ID_2 + "\": \"" + RESOURCE_ID_NAME_19_0_2 + "\",\n" + + " \"" + objectIdVer_3303 + "/" + OBJECT_INSTANCE_ID_12 + "/" + RESOURCE_ID_5700 + "\": \"" + RESOURCE_ID_NAME_3303_12_5700 + "\"\n" + " },\n" + " \"observe\": [\n" + " \"" + idVer_3_0_9 + "\",\n" + @@ -159,7 +180,8 @@ public abstract class AbstractRpcLwM2MIntegrationTest extends AbstractLwM2MInteg " \"telemetry\": [\n" + " \"" + idVer_3_0_9 + "\",\n" + " \"" + idVer_19_0_0 + "\",\n" + - " \"" + objectIdVer_19 + "/" + OBJECT_INSTANCE_ID_1 + "/" + RESOURCE_ID_0 + "\"\n" + + " \"" + objectIdVer_19 + "/" + OBJECT_INSTANCE_ID_1 + "/" + RESOURCE_ID_0 + "\",\n" + + " \"" + objectIdVer_3303 + "/" + OBJECT_INSTANCE_ID_12 + "/" + RESOURCE_ID_5700 + "\"\n" + " ],\n" + " \"attributeLwm2m\": {}\n" + " }"; @@ -183,4 +205,56 @@ public abstract class AbstractRpcLwM2MIntegrationTest extends AbstractLwM2MInteg return pathIdVer; } + protected long countUpdateAttrTelemetryAll() { + return Mockito.mockingDetails(defaultUplinkMsgHandlerTest) + .getInvocations().stream() + .filter(invocation -> invocation.getMethod().getName().equals("updateAttrTelemetry")) + .count(); + } + + protected long countUpdateAttrTelemetryResource(String idVerRez) { + return Mockito.mockingDetails(defaultUplinkMsgHandlerTest) + .getInvocations().stream() + .filter(invocation -> + invocation.getMethod().getName().equals("updateAttrTelemetry") && + invocation.getArguments().length > 1 && + idVerRez.equals(invocation.getArguments()[1]) + ) + .count(); + } + + protected void updateRegAtLeastOnceAfterAction() { + long initialInvocationCount = countUpdateReg(); + AtomicLong newInvocationCount = new AtomicLong(initialInvocationCount); + log.trace("updateRegAtLeastOnceAfterAction: initialInvocationCount [{}]", initialInvocationCount); + await("Update Registration at-least-once after action") + .atMost(50, TimeUnit.SECONDS) + .until(() -> { + newInvocationCount.set(countUpdateReg()); + return newInvocationCount.get() > initialInvocationCount; + }); + log.trace("updateRegAtLeastOnceAfterAction: newInvocationCount [{}]", newInvocationCount.get()); + } + + protected long countUpdateReg() { + return Mockito.mockingDetails(defaultUplinkMsgHandlerTest) + .getInvocations().stream() + .filter(invocation -> invocation.getMethod().getName().equals("updatedReg")) + .count(); + } + + protected long countSendParametersOnThingsboardTelemetryResource(String rezName) { + return Mockito.mockingDetails(lwM2mTransportServerHelperTest) + .getInvocations().stream() + .filter(invocation -> + invocation.getMethod().getName().equals("sendParametersOnThingsboardTelemetry") && + invocation.getArguments().length > 0 && + invocation.getArguments()[0] instanceof List && + ((List) invocation.getArguments()[0]).stream() + .filter(arg -> arg instanceof TransportProtos.KeyValueProto) + .anyMatch(arg -> rezName.equals(((TransportProtos.KeyValueProto) arg).getKey())) + ) + .count(); + } + } diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/sql/RpcLwm2MIntegrationObserveCompositeTest.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/sql/RpcLwm2MIntegrationObserveCompositeTest.java index cf78c58288..a1ad762aaa 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/sql/RpcLwm2MIntegrationObserveCompositeTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/sql/RpcLwm2MIntegrationObserveCompositeTest.java @@ -20,11 +20,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.extern.slf4j.Slf4j; import org.eclipse.leshan.core.ResponseCode; import org.junit.Test; -import org.mockito.Mockito; -import org.springframework.boot.test.mock.mockito.SpyBean; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.transport.lwm2m.rpc.AbstractRpcLwM2MIntegrationObserveTest; -import org.thingsboard.server.transport.lwm2m.server.uplink.DefaultLwM2mUplinkMsgHandler; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -55,10 +52,6 @@ import static org.thingsboard.server.transport.lwm2m.utils.LwM2MTransportUtil.fr @Slf4j public class RpcLwm2MIntegrationObserveCompositeTest extends AbstractRpcLwM2MIntegrationObserveTest { - @SpyBean - DefaultLwM2mUplinkMsgHandler defaultUplinkMsgHandlerTest; - - /** * ObserveComposite {"ids":["5/0/7", "5/0/5", "5/0/3", "3/0/9", "19/1/0/0"]} - Ok * @throws Exception @@ -517,13 +510,6 @@ public class RpcLwm2MIntegrationObserveCompositeTest extends AbstractRpcLwM2MInt return doPostAsync("/api/plugins/rpc/twoway/" + deviceId, sendRpcRequest, String.class, status().isOk()); } - private long countUpdateAttrTelemetryAll() { - return Mockito.mockingDetails(defaultUplinkMsgHandlerTest) - .getInvocations().stream() - .filter(invocation -> invocation.getMethod().getName().equals("updateAttrTelemetry")) - .count(); - } - private void updateAttrTelemetryAllAtLeastOnceAfterAction(long initialInvocationCount) { AtomicLong newInvocationCount = new AtomicLong(initialInvocationCount); log.warn("countUpdateAttrTelemetryAllAtLeastOnceAfterAction: initialInvocationCount [{}]", initialInvocationCount); @@ -536,19 +522,6 @@ public class RpcLwm2MIntegrationObserveCompositeTest extends AbstractRpcLwM2MInt log.warn("countUpdateAttrTelemetryAllAtLeastOnceAfterAction: newInvocationCount [{}]", newInvocationCount.get()); } - - private long countUpdateAttrTelemetryResource(String idVerRez) { - return Mockito.mockingDetails(defaultUplinkMsgHandlerTest) - .getInvocations().stream() - .filter(invocation -> - invocation.getMethod().getName().equals("updateAttrTelemetry") && - invocation.getArguments().length > 1 && - idVerRez.equals(invocation.getArguments()[1]) - ) - .count(); - } - - private void updateAttrTelemetryResourceAtLeastOnceAfterAction(long initialInvocationCount, String idVerRez) { AtomicLong newInvocationCount = new AtomicLong(initialInvocationCount); log.warn("countUpdateAttrTelemetryResourceAtLeastOnceAfterAction: initialInvocationCount [{}]", initialInvocationCount); @@ -560,24 +533,4 @@ public class RpcLwm2MIntegrationObserveCompositeTest extends AbstractRpcLwM2MInt }); log.warn("countUpdateAttrTelemetryResourceAtLeastOnceAfterAction: newInvocationCount [{}]", newInvocationCount.get()); } - - private long countUpdateReg() { - return Mockito.mockingDetails(defaultUplinkMsgHandlerTest) - .getInvocations().stream() - .filter(invocation -> invocation.getMethod().getName().equals("updatedReg")) - .count(); - } - - private void updateRegAtLeastOnceAfterAction() { - long initialInvocationCount = countUpdateReg(); - AtomicLong newInvocationCount = new AtomicLong(initialInvocationCount); - log.warn("updateRegAtLeastOnceAfterAction: initialInvocationCount [{}]", initialInvocationCount); - await("Update Registration at-least-once after action") - .atMost(50, TimeUnit.SECONDS) - .until(() -> { - newInvocationCount.set(countUpdateReg()); - return newInvocationCount.get() > initialInvocationCount; - }); - log.warn("updateRegAtLeastOnceAfterAction: newInvocationCount [{}]", newInvocationCount.get()); - } } diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/sql/RpcLwm2mIntegrationObserveTest.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/sql/RpcLwm2mIntegrationObserveTest.java index ce2622c4db..a665f7f53c 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/sql/RpcLwm2mIntegrationObserveTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/sql/RpcLwm2mIntegrationObserveTest.java @@ -24,9 +24,7 @@ import org.eclipse.leshan.core.response.ReadResponse; import org.eclipse.leshan.server.registration.Registration; import org.junit.Test; import org.mockito.Mockito; -import org.springframework.boot.test.mock.mockito.SpyBean; import org.thingsboard.server.transport.lwm2m.rpc.AbstractRpcLwM2MIntegrationObserveTest; -import org.thingsboard.server.transport.lwm2m.server.uplink.DefaultLwM2mUplinkMsgHandler; import java.util.Optional; @@ -41,14 +39,12 @@ import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.OBJECT_INST import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_0; import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_2; import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_3; +import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_3_9; import static org.thingsboard.server.transport.lwm2m.utils.LwM2MTransportUtil.fromVersionedIdToObjectId; @Slf4j public class RpcLwm2mIntegrationObserveTest extends AbstractRpcLwM2MIntegrationObserveTest { - @SpyBean - DefaultLwM2mUplinkMsgHandler defaultUplinkMsgHandlerTest; - @Test public void testObserveReadAll_Count_4_CancelAll_Count_0_Ok() throws Exception { String actualValuesReadAll = sendRpcObserveOkWithResultValue("ObserveReadAll", null); @@ -64,12 +60,12 @@ public class RpcLwm2mIntegrationObserveTest extends AbstractRpcLwM2MIntegrationO */ @Test public void testObserveOneResource_Result_CONTENT_Value_Count_3_After_Cancel_Count_2() throws Exception { + long initSendTelemetryAtCount = countSendParametersOnThingsboardTelemetryResource(RESOURCE_ID_NAME_3_9); sendObserveCancelAllWithAwait(deviceId); sendRpcObserveWithContainsLwM2mSingleResource(idVer_3_0_9); - - int cntUpdate = 3; - verify(defaultUplinkMsgHandlerTest, timeout(10000).times(cntUpdate)) - .onUpdateValueAfterReadResponse(Mockito.any(Registration.class), eq(idVer_3_0_9), Mockito.any(ReadResponse.class)); + updateRegAtLeastOnceAfterAction(); + long lastSendTelemetryAtCount = countSendParametersOnThingsboardTelemetryResource(RESOURCE_ID_NAME_3_9); + assertTrue(lastSendTelemetryAtCount > initSendTelemetryAtCount); } /** @@ -84,7 +80,7 @@ public class RpcLwm2mIntegrationObserveTest extends AbstractRpcLwM2MIntegrationO int cntUpdate = 3; verify(defaultUplinkMsgHandlerTest, timeout(10000).times(cntUpdate)) - .updateAttrTelemetry(Mockito.any(Registration.class), eq(idVer_3_0_9)); + .updateAttrTelemetry(Mockito.any(Registration.class), eq(idVer_3_0_9), eq(null)); } /** @@ -99,7 +95,7 @@ public class RpcLwm2mIntegrationObserveTest extends AbstractRpcLwM2MIntegrationO int cntUpdate = 3; verify(defaultUplinkMsgHandlerTest, timeout(10000).times(cntUpdate)) - .updateAttrTelemetry(Mockito.any(Registration.class), eq(idVer_3_0_9)); + .updateAttrTelemetry(Mockito.any(Registration.class), eq(idVer_3_0_9), eq(null)); } /** @@ -334,7 +330,7 @@ public class RpcLwm2mIntegrationObserveTest extends AbstractRpcLwM2MIntegrationO cntUpdate = 10; verify(defaultUplinkMsgHandlerTest, timeout(50000).atLeast(cntUpdate)) - .updateAttrTelemetry(Mockito.any(Registration.class), eq(idVer_3_0_9)); + .updateAttrTelemetry(Mockito.any(Registration.class), eq(idVer_3_0_9), eq(null)); } private void sendRpcObserveWithWithTwoResource(String expectedId_1, String expectedId_2) throws Exception { diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/sql/RpcLwm2mIntegrationReadTest.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/sql/RpcLwm2mIntegrationReadTest.java index 6d4e11ffc1..1ab4893ec4 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/sql/RpcLwm2mIntegrationReadTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/sql/RpcLwm2mIntegrationReadTest.java @@ -15,30 +15,26 @@ */ package org.thingsboard.server.transport.lwm2m.rpc.sql; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.map.HashedMap; import org.eclipse.leshan.core.ResponseCode; -import org.eclipse.leshan.core.node.LwM2mNode; import org.eclipse.leshan.core.node.LwM2mPath; -import org.eclipse.leshan.core.node.LwM2mResource; -import org.eclipse.leshan.core.node.TimestampedLwM2mNodes; -import org.eclipse.leshan.server.registration.Registration; import org.junit.Test; -import org.mockito.Mockito; -import org.springframework.boot.test.mock.mockito.SpyBean; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.transport.lwm2m.rpc.AbstractRpcLwM2MIntegrationTest; -import org.thingsboard.server.transport.lwm2m.server.uplink.DefaultLwM2mUplinkMsgHandler; import java.time.Instant; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; import static org.eclipse.leshan.core.LwM2mId.SERVER; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.verify; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.BINARY_APP_DATA_CONTAINER; import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.OBJECT_INSTANCE_ID_0; @@ -49,20 +45,22 @@ import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_11; import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_14; import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_2; +import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_3303_12_5700_TS_0; +import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_3303_12_5700_TS_1; import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_9; import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_19_0_0; import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_19_0_3; import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_19_1_0; +import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_3303_12_5700; import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_3_14; import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_3_9; +import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_3303_12_5700_VALUE_0; +import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_3303_12_5700_VALUE_1; +import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_VALUE_3303_12_5700_DELTA_TS; @Slf4j public class RpcLwm2mIntegrationReadTest extends AbstractRpcLwM2MIntegrationTest { - @SpyBean - DefaultLwM2mUplinkMsgHandler defaultUplinkMsgHandlerTest; - - /** * Read {"id":"/3"} * Read {"id":"/6"}... @@ -88,7 +86,7 @@ public class RpcLwm2mIntegrationReadTest extends AbstractRpcLwM2MIntegrationTest e.printStackTrace(); } }); - } catch (Exception e2){ + } catch (Exception e2) { e2.printStackTrace(); } } @@ -99,10 +97,10 @@ public class RpcLwm2mIntegrationReadTest extends AbstractRpcLwM2MIntegrationTest * @throws Exception */ @Test - public void testReadAllInstancesInClientById_Result_CONTENT_Value_IsInstances_IsResources() throws Exception{ + public void testReadAllInstancesInClientById_Result_CONTENT_Value_IsInstances_IsResources() throws Exception { expectedObjectIdVerInstances.forEach(expected -> { try { - String actualResult = sendRPCById((String) expected); + String actualResult = sendRPCById((String) expected); String expectedObjectId = pathIdVerToObjectId((String) expected); LwM2mPath expectedPath = new LwM2mPath(expectedObjectId); ObjectNode rpcActualResult = JacksonUtil.fromString(actualResult, ObjectNode.class); @@ -122,7 +120,7 @@ public class RpcLwm2mIntegrationReadTest extends AbstractRpcLwM2MIntegrationTest */ @Test public void testReadMultipleResourceById_Result_CONTENT_Value_IsLwM2mMultipleResource() throws Exception { - String expectedIdVer = objectInstanceIdVer_3 +"/" + RESOURCE_ID_11; + String expectedIdVer = objectInstanceIdVer_3 + "/" + RESOURCE_ID_11; String actualResult = sendRPCById(expectedIdVer); ObjectNode rpcActualResult = JacksonUtil.fromString(actualResult, ObjectNode.class); assertEquals(ResponseCode.CONTENT.getName(), rpcActualResult.get("result").asText()); @@ -135,7 +133,7 @@ public class RpcLwm2mIntegrationReadTest extends AbstractRpcLwM2MIntegrationTest */ @Test public void testReadSingleResourceById_Result_CONTENT_Value_IsLwM2mSingleResource() throws Exception { - String expectedIdVer = objectInstanceIdVer_3 +"/" + RESOURCE_ID_14; + String expectedIdVer = objectInstanceIdVer_3 + "/" + RESOURCE_ID_14; String actualResult = sendRPCById(expectedIdVer); ObjectNode rpcActualResult = JacksonUtil.fromString(actualResult, ObjectNode.class); assertEquals(ResponseCode.CONTENT.getName(), rpcActualResult.get("result").asText()); @@ -161,7 +159,7 @@ public class RpcLwm2mIntegrationReadTest extends AbstractRpcLwM2MIntegrationTest */ @Test public void testReadCompositeSingleResourceByIds_Result_CONTENT_Value_IsObjectIsLwM2mSingleResourceIsLwM2mMultipleResource() throws Exception { - String expectedIdVer_1 = (String) expectedObjectIdVers.stream().filter(path -> (!((String)path).contains("/" + BINARY_APP_DATA_CONTAINER) && ((String)path).contains("/" + SERVER))).findFirst().get(); + String expectedIdVer_1 = (String) expectedObjectIdVers.stream().filter(path -> (!((String) path).contains("/" + BINARY_APP_DATA_CONTAINER) && ((String) path).contains("/" + SERVER))).findFirst().get(); String objectId_1 = pathIdVerToObjectId(expectedIdVer_1); String expectedIdVer3_0_1 = objectInstanceIdVer_3 + "/" + RESOURCE_ID_1; String expectedIdVer3_0_11 = objectInstanceIdVer_3 + "/" + RESOURCE_ID_11; @@ -221,8 +219,8 @@ public class RpcLwm2mIntegrationReadTest extends AbstractRpcLwM2MIntegrationTest String objectId_19 = pathIdVerToObjectId(objectIdVer_19); String expected3_0_9 = objectInstanceId_3 + "/" + RESOURCE_ID_9 + "=LwM2mSingleResource [id=" + RESOURCE_ID_9 + ", value="; String expected3_0_14 = objectInstanceId_3 + "/" + RESOURCE_ID_14 + "=LwM2mSingleResource [id=" + RESOURCE_ID_14 + ", value="; - String expected19_0_0 = objectId_19 + "/" + OBJECT_INSTANCE_ID_0 + "/" + RESOURCE_ID_0 + expectedKey19_X_0; - String expected19_1_0 = objectId_19 + "/" + OBJECT_INSTANCE_ID_1 + "/" + RESOURCE_ID_0 + expectedKey19_X_0; + String expected19_0_0 = objectId_19 + "/" + OBJECT_INSTANCE_ID_0 + "/" + RESOURCE_ID_0 + expectedKey19_X_0; + String expected19_1_0 = objectId_19 + "/" + OBJECT_INSTANCE_ID_1 + "/" + RESOURCE_ID_0 + expectedKey19_X_0; String actualValues = rpcActualResult.get("value").asText(); assertTrue(actualValues.contains(expected3_0_9)); assertTrue(actualValues.contains(expected3_0_14)); @@ -232,56 +230,55 @@ public class RpcLwm2mIntegrationReadTest extends AbstractRpcLwM2MIntegrationTest /** - * /3303/0/5700 - * Read {"id":"/3303/0/5700"} + * Read {"id":"/3303/12/5700"} * Trigger a Send operation from the client with multiple values for the same resource as a payload * acked "[{"bn":"/3303/12/5700","bt":1724".. 116 bytes] - * 2 values for the resource /3303/12/5700 should be stored with timestamps1 = Instance.now(), timestamps2 = Instance.now() - * + * 2 values for the resource /3303/12/5700 should be stored with: + * - timestamps1 = Instance.now() + RESOURCE_ID_VALUE_3303_12_5700_1 + * - timestamps2 = (timestamps1 + 3 sec) + RESOURCE_ID_VALUE_3303_12_5700_2 * @throws Exception */ @Test public void testReadSingleResource_sendFromClient_CollectedValue() throws Exception { - TimestampedLwM2mNodes[] tsNodesHolder = new TimestampedLwM2mNodes[1]; - doAnswer(inv -> { - tsNodesHolder[0] = inv.getArgument(1); - return null; - }).when(defaultUplinkMsgHandlerTest).onUpdateValueWithSendRequest( - Mockito.any(Registration.class), - Mockito.any(TimestampedLwM2mNodes.class) - ); + // init test + long startTs = Instant.now().toEpochMilli(); + int cntValues = 4; int resourceId = 5700; String expectedIdVer = objectIdVer_3303 + "/" + OBJECT_INSTANCE_ID_12 + "/" + resourceId; - String actualResult = sendRPCById(expectedIdVer); - verify(defaultUplinkMsgHandlerTest, timeout(10000).times(1)) - .onUpdateValueWithSendRequest(Mockito.any(Registration.class), Mockito.any(TimestampedLwM2mNodes.class)); - - ObjectNode rpcActualResult = JacksonUtil.fromString(actualResult, ObjectNode.class); - assertEquals(ResponseCode.CONTENT.getName(), rpcActualResult.get("result").asText()); - String expected = "LwM2mSingleResource [id=" + resourceId + ", value="; - String actual = rpcActualResult.get("value").asText(); - assertTrue(actual.contains(expected)); - int indStart = actual.indexOf(expected) + expected.length(); - int indEnd = actual.indexOf(",", indStart); - String valStr = actual.substring(indStart, indEnd); - double dd = Double.parseDouble(valStr); - long combined = Double.doubleToRawLongBits(dd); - int t0 = (int) (combined >> 32); - int t1 = (int) combined; - double[] expectedValues ={(double)t0/100, (double)t1/100}; - int ind = 0; - LwM2mPath expectedPath = new LwM2mPath("/3303/12/5700"); - for (Instant ts : tsNodesHolder[0].getTimestamps()) { - Map nodesAt = tsNodesHolder[0].getNodesAt(ts); - for (var instant : nodesAt.entrySet()) { - LwM2mPath actualPath = instant.getKey(); - LwM2mNode node = instant.getValue(); - LwM2mResource lwM2mResource = (LwM2mResource) node; - assertEquals(expectedPath, actualPath); - assertEquals(expectedValues[ind], lwM2mResource.getValue()); - ind++; + sendRPCById(expectedIdVer); + // verify result read: verify count value: 1-2: send CollectedValue; 3 - response for read; + long endTs = Instant.now().toEpochMilli() + RESOURCE_ID_VALUE_3303_12_5700_DELTA_TS * 4; + String expectedVal_1 = String.valueOf(RESOURCE_ID_3303_12_5700_VALUE_0); + String expectedVal_2 = String.valueOf(RESOURCE_ID_3303_12_5700_VALUE_1); + AtomicReference actualValues = new AtomicReference<>(); + await().atMost(40, SECONDS).until(() -> { + actualValues.set(doGetAsync( + "/api/plugins/telemetry/DEVICE/" + deviceId + "/values/timeseries?keys=" + + RESOURCE_ID_NAME_3303_12_5700 + + "&startTs=" + startTs + + "&endTs=" + endTs + + "&interval=0&limit=100&useStrictDataTypes=false", + ObjectNode.class)); + // verify cntValues + return actualValues.get() != null && actualValues.get().get(RESOURCE_ID_NAME_3303_12_5700).size() == cntValues; + }); + // verify ts + ArrayNode actual = (ArrayNode) actualValues.get().get(RESOURCE_ID_NAME_3303_12_5700); + Map keyTsMaps = new HashedMap(); + for (JsonNode tsNode: actual) { + if (tsNode.get("value").asText().equals(expectedVal_1) || tsNode.get("value").asText().equals(expectedVal_2)) { + keyTsMaps.put(tsNode.get("value").asText(), tsNode.get("ts").asLong()); } } + assertTrue(keyTsMaps.size() == 2); + long actualTS0 = keyTsMaps.get(expectedVal_1).longValue(); + long actualTS1 = keyTsMaps.get(expectedVal_2).longValue(); + assertTrue(actualTS0 > 0); + assertTrue(actualTS1 > 0); + assertTrue(actualTS1 > actualTS0); + assertTrue((actualTS1 - actualTS0) >= RESOURCE_ID_VALUE_3303_12_5700_DELTA_TS); + assertTrue(actualTS0 <= RESOURCE_ID_3303_12_5700_TS_0); + assertTrue(actualTS1 <= RESOURCE_ID_3303_12_5700_TS_1); } /** @@ -301,7 +298,6 @@ public class RpcLwm2mIntegrationReadTest extends AbstractRpcLwM2MIntegrationTest assertEquals(actualValue, expectedValue); } - private String sendRPCById(String path) throws Exception { String setRpcRequest = "{\"method\": \"Read\", \"params\": {\"id\": \"" + path + "\"}}"; return doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setRpcRequest, String.class, status().isOk()); diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/bootstrap/store/LwM2MBootstrapSecurityStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/bootstrap/store/LwM2MBootstrapSecurityStore.java index 4dcbb908d1..025f665afa 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/bootstrap/store/LwM2MBootstrapSecurityStore.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/bootstrap/store/LwM2MBootstrapSecurityStore.java @@ -133,7 +133,7 @@ public class LwM2MBootstrapSecurityStore implements BootstrapSecurityStore { log.error(" [{}] Different values SecurityMode between of client and profile.", store.getEndpoint()); log.error("{} getParametersBootstrap: [{}] Different values SecurityMode between of client and profile.", LOG_LWM2M_ERROR, store.getEndpoint()); String logMsg = String.format("%s: Different values SecurityMode between of client and profile.", LOG_LWM2M_ERROR); - helper.sendParametersOnThingsboardTelemetry(helper.getKvStringtoThingsboard(LOG_LWM2M_TELEMETRY, logMsg), sessionInfo); + helper.sendParametersOnThingsboardTelemetry(helper.getKvStringtoThingsboard(LOG_LWM2M_TELEMETRY, logMsg), sessionInfo, null); return null; } } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportServerHelper.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportServerHelper.java index 86df96f40e..625a9fb61a 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportServerHelper.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportServerHelper.java @@ -36,6 +36,7 @@ import org.thingsboard.server.queue.util.TbLwM2mTransportComponent; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -58,12 +59,12 @@ public class LwM2mTransportServerHelper { context.getTransportService().process(sessionInfo, postAttributeMsg, TransportServiceCallback.EMPTY); } - public void sendParametersOnThingsboardTelemetry(List kvList, SessionInfoProto sessionInfo) { - sendParametersOnThingsboardTelemetry(kvList, sessionInfo, null); + public void sendParametersOnThingsboardTelemetry(List kvList, SessionInfoProto sessionInfo, @Nullable Map keyTsLatestMaps){ + sendParametersOnThingsboardTelemetry(kvList, sessionInfo, keyTsLatestMaps, null); } - public void sendParametersOnThingsboardTelemetry(List kvList, SessionInfoProto sessionInfo, @Nullable Map keyTsLatestMap) { - TransportProtos.TsKvListProto tsKvList = toTsKvList(kvList, keyTsLatestMap); + public void sendParametersOnThingsboardTelemetry(List kvList, SessionInfoProto sessionInfo, @Nullable Map keyTsLatestMap, @Nullable Instant ts) { + TransportProtos.TsKvListProto tsKvList = toTsKvList(kvList, keyTsLatestMap, ts); PostTelemetryMsg postTelemetryMsg = PostTelemetryMsg.newBuilder() .addTsKvList(tsKvList) @@ -72,9 +73,9 @@ public class LwM2mTransportServerHelper { context.getTransportService().process(sessionInfo, postTelemetryMsg, TransportServiceCallback.EMPTY); } - TransportProtos.TsKvListProto toTsKvList(List kvList, Map keyTsLatestMap) { + TransportProtos.TsKvListProto toTsKvList(List kvList, Map keyTsLatestMap, @Nullable Instant ts) { return TransportProtos.TsKvListProto.newBuilder() - .setTs(getTs(kvList, keyTsLatestMap)) + .setTs(ts == null ? getTs(kvList, keyTsLatestMap) : ts.toEpochMilli()) .addAllKv(kvList) .build(); } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2mUplinkMsgHandler.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2mUplinkMsgHandler.java index df1a86d2eb..a79ed43784 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2mUplinkMsgHandler.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2mUplinkMsgHandler.java @@ -42,7 +42,6 @@ import org.eclipse.leshan.core.observation.Observation; import org.eclipse.leshan.core.request.CreateRequest; import org.eclipse.leshan.core.request.ObserveRequest; import org.eclipse.leshan.core.request.ReadRequest; -import org.eclipse.leshan.core.request.SendRequest; import org.eclipse.leshan.core.request.WriteCompositeRequest; import org.eclipse.leshan.core.request.WriteRequest; import org.eclipse.leshan.core.request.WriteRequest.Mode; @@ -117,6 +116,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import static org.thingsboard.server.common.data.lwm2m.LwM2mConstants.LWM2M_SEPARATOR_PATH; @@ -382,7 +382,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl this.updateObjectInstanceResourceValue(lwM2MClient, lwM2mObjectInstance, path.toString(), 0); } else if (node instanceof LwM2mResource) { LwM2mResource lwM2mResource = (LwM2mResource) node; - this.updateResourcesValue(lwM2MClient, lwM2mResource, path.toString(), Mode.UPDATE, 0); + this.updateResourcesValueWithTs(lwM2MClient, lwM2mResource, path.toString(), Mode.UPDATE, ts); } } tryAwake(lwM2MClient); @@ -612,12 +612,21 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl otaService.onCurrentSoftwareResultUpdate(lwM2MClient, (Long) lwM2mResource.getValue()); } if (ResponseCode.BAD_REQUEST.getCode() > code) { - this.updateAttrTelemetry(registration, path); + this.updateAttrTelemetry(registration, path, null); } } else { log.error("Fail update path [{}] Resource [{}]", path, lwM2mResource); } } + private void updateResourcesValueWithTs(LwM2mClient lwM2MClient, LwM2mResource lwM2mResource, String stringPath, Mode mode, Instant ts) { + Registration registration = lwM2MClient.getRegistration(); + String path = convertObjectIdToVersionedId(stringPath, lwM2MClient); + if (lwM2MClient.saveResourceValue(path, lwM2mResource, modelProvider, mode)) { + this.updateAttrTelemetry(registration, path, ts); + } else { + log.error("Fail update path [{}] Resource [{}] with ts.", path, lwM2mResource); + } + } /** @@ -629,7 +638,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl * * @param registration - Registration LwM2M Client */ - public void updateAttrTelemetry(Registration registration, String path) { + public void updateAttrTelemetry(Registration registration, String path, Instant ts) { log.trace("UpdateAttrTelemetry paths [{}]", path); try { ResultsAddKeyValueProto results = this.getParametersFromProfile(registration, path); @@ -640,8 +649,8 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl this.helper.sendParametersOnThingsboardAttribute(results.getResultAttributes(), sessionInfo); } if (results.getResultTelemetries().size() > 0) { - log.trace("UpdateTelemetry paths [{}] value [{}]", path, results.getResultTelemetries().get(0).toString()); - this.helper.sendParametersOnThingsboardTelemetry(results.getResultTelemetries(), sessionInfo); + log.trace("UpdateTelemetry paths [{}] value [{}] ts [{}]", path, results.getResultTelemetries().get(0).toString(), ts == null ? "null" : ts.toEpochMilli()); + this.helper.sendParametersOnThingsboardTelemetry(results.getResultTelemetries(), sessionInfo, null, ts); } } } catch (Exception e) {