From cb2ab78186f3a13e4408a0dccd3b0b07e3ab5bbf Mon Sep 17 00:00:00 2001 From: nickAS21 Date: Wed, 14 May 2025 17:41:36 +0300 Subject: [PATCH] lwm2m: comments - 1 --- .../rpc/AbstractRpcLwM2MIntegrationTest.java | 4 +- .../sql/RpcLwm2mIntegrationObserveTest.java | 8 +-- .../lwm2m/TelemetryObserveStrategy.java | 2 +- ...esource.java => ResourceUpdateResult.java} | 8 +-- .../uplink/DefaultLwM2mUplinkMsgHandler.java | 62 +++++++++++-------- 5 files changed, 47 insertions(+), 37 deletions(-) rename common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/{ResultUpdateResource.java => ResourceUpdateResult.java} (84%) diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/AbstractRpcLwM2MIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/AbstractRpcLwM2MIntegrationTest.java index fe571eb9b3..be85294f08 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/AbstractRpcLwM2MIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/AbstractRpcLwM2MIntegrationTest.java @@ -29,7 +29,7 @@ import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.transport.lwm2m.AbstractLwM2MIntegrationTest; import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportServerHelper; -import org.thingsboard.server.transport.lwm2m.server.client.ResultUpdateResource; +import org.thingsboard.server.transport.lwm2m.server.client.ResourceUpdateResult; import java.util.List; import java.util.Set; @@ -259,7 +259,7 @@ public abstract class AbstractRpcLwM2MIntegrationTest extends AbstractLwM2MInteg .filter(invocation -> invocation.getMethod().getName().equals("updateAttrTelemetry") && invocation.getArguments().length > 1 && - ((ResultUpdateResource)invocation.getArguments()[0]).getPaths().toString().contains(idVerRez) + ((ResourceUpdateResult)invocation.getArguments()[0]).getPaths().toString().contains(idVerRez) ) .count(); } diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/sql/RpcLwm2mIntegrationObserveTest.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/sql/RpcLwm2mIntegrationObserveTest.java index 1ba89720ae..483442382d 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/sql/RpcLwm2mIntegrationObserveTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/sql/RpcLwm2mIntegrationObserveTest.java @@ -24,7 +24,7 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; import org.thingsboard.server.transport.lwm2m.rpc.AbstractRpcLwM2MIntegrationTest; -import org.thingsboard.server.transport.lwm2m.server.client.ResultUpdateResource; +import org.thingsboard.server.transport.lwm2m.server.client.ResourceUpdateResult; import static org.eclipse.leshan.core.LwM2mId.ACCESS_CONTROL; import static org.junit.Assert.assertEquals; @@ -80,7 +80,7 @@ public class RpcLwm2mIntegrationObserveTest extends AbstractRpcLwM2MIntegrationT int cntUpdate = 3; verify(defaultUplinkMsgHandlerTest, timeout(10000).atLeast(cntUpdate)) - .updateAttrTelemetry(Mockito.any(ResultUpdateResource.class), eq(null)); + .updateAttrTelemetry(Mockito.any(ResourceUpdateResult.class), eq(null)); } /** @@ -95,7 +95,7 @@ public class RpcLwm2mIntegrationObserveTest extends AbstractRpcLwM2MIntegrationT int cntUpdate = 3; verify(defaultUplinkMsgHandlerTest, timeout(10000).atLeast(cntUpdate)) - .updateAttrTelemetry(Mockito.any(ResultUpdateResource.class), eq(null)); + .updateAttrTelemetry(Mockito.any(ResourceUpdateResult.class), eq(null)); } /** @@ -328,7 +328,7 @@ public class RpcLwm2mIntegrationObserveTest extends AbstractRpcLwM2MIntegrationT int cntUpdate = 10; verify(defaultUplinkMsgHandlerTest, timeout(50000).atLeast(cntUpdate)) - .updateAttrTelemetry(Mockito.any(ResultUpdateResource.class), eq(null)); + .updateAttrTelemetry(Mockito.any(ResourceUpdateResult.class), eq(null)); } private void sendRpcObserveWithWithTwoResource(String expectedId_1, String expectedId_2) throws Exception { diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/lwm2m/TelemetryObserveStrategy.java b/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/lwm2m/TelemetryObserveStrategy.java index 264d883353..c3e525633f 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/lwm2m/TelemetryObserveStrategy.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/lwm2m/TelemetryObserveStrategy.java @@ -40,7 +40,7 @@ public enum TelemetryObserveStrategy { return strategy; } } - return null; + throw new IllegalArgumentException("Unknown TelemetryObserveStrategy id: " + description); } public static TelemetryObserveStrategy fromId(int id) { diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/ResultUpdateResource.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/ResourceUpdateResult.java similarity index 84% rename from common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/ResultUpdateResource.java rename to common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/ResourceUpdateResult.java index ffbf4595fb..0a47d211b5 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/ResultUpdateResource.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/ResourceUpdateResult.java @@ -23,11 +23,11 @@ import java.util.Set; @Data @AllArgsConstructor -public class ResultUpdateResource { - LwM2mClient lwM2MClient; - Set paths; +public class ResourceUpdateResult { + private LwM2mClient lwM2MClient; + private Set paths; - public ResultUpdateResource(LwM2mClient lwM2MClient) { + public ResourceUpdateResult(LwM2mClient lwM2MClient) { this.lwM2MClient = lwM2MClient; this.paths = new HashSet<>(); } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2mUplinkMsgHandler.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2mUplinkMsgHandler.java index 5897533539..37f7829c12 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2mUplinkMsgHandler.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2mUplinkMsgHandler.java @@ -78,7 +78,7 @@ import org.thingsboard.server.transport.lwm2m.server.client.LwM2MClientState; import org.thingsboard.server.transport.lwm2m.server.client.LwM2MClientStateException; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClientContext; -import org.thingsboard.server.transport.lwm2m.server.client.ResultUpdateResource; +import org.thingsboard.server.transport.lwm2m.server.client.ResourceUpdateResult; import org.thingsboard.server.transport.lwm2m.server.client.ResultsAddKeyValueProto; import org.thingsboard.server.transport.lwm2m.server.common.LwM2MExecutorAwareService; import org.thingsboard.server.transport.lwm2m.server.downlink.DownlinkRequestCallback; @@ -124,7 +124,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static org.thingsboard.server.common.data.device.profile.lwm2m.TelemetryObserveStrategy.COMPOSITE_ALL; import static org.thingsboard.server.common.data.device.profile.lwm2m.TelemetryObserveStrategy.COMPOSITE_BY_OBJECT; import static org.thingsboard.server.common.data.device.profile.lwm2m.TelemetryObserveStrategy.SINGLE; import static org.thingsboard.server.common.data.lwm2m.LwM2mConstants.LWM2M_SEPARATOR_PATH; @@ -324,7 +323,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl LwM2mClient lwM2MClient = clientContext.getClientByEndpoint(registration.getEndpoint()); ObjectModel objectModelVersion = lwM2MClient.getObjectModel(path, modelProvider); if (objectModelVersion != null) { - ResultUpdateResource updateResource = new ResultUpdateResource(lwM2MClient); + ResourceUpdateResult updateResource = new ResourceUpdateResult(lwM2MClient); int responseCode = response.getCode().getCode(); if (content instanceof LwM2mObject) { this.updateObjectResourceValue(updateResource, (LwM2mObject) content, path, responseCode); @@ -343,7 +342,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl log.trace("ReadCompositeResponse before onUpdateValueAfterReadCompositeResponse: [{}]", response); if (response.getContent() != null) { LwM2mClient lwM2MClient = clientContext.getClientByEndpoint(registration.getEndpoint()); - ResultUpdateResource updateResource = new ResultUpdateResource(lwM2MClient); + ResourceUpdateResult updateResource = new ResourceUpdateResult(lwM2MClient); response.getContent().forEach((k, v) -> { if (v != null) { int responseCode = response.getCode().getCode(); @@ -386,7 +385,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl LwM2mClient lwM2MClient = clientContext.getClientByEndpoint(registration.getEndpoint()); ObjectModel objectModelVersion = lwM2MClient.getObjectModel(path.toString(), modelProvider); if (objectModelVersion != null) { - ResultUpdateResource updateResource = new ResultUpdateResource(lwM2MClient); + ResourceUpdateResult updateResource = new ResourceUpdateResult(lwM2MClient); if (node instanceof LwM2mObject) { this.updateObjectResourceValue(updateResource, (LwM2mObject) node, path.toString(), 0); } else if (node instanceof LwM2mObjectInstance) { @@ -518,19 +517,30 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl targetIds = targetIds.stream().filter(target -> isSupportedTargetId(supportedObjects, target)).collect(Collectors.toSet()); if (!targetIds.isEmpty()) { TelemetryObserveStrategy observeStrategy = profile.getObserveAttr().getObserveStrategy(); - if (SINGLE.equals(observeStrategy)) { - CountDownLatch latch = new CountDownLatch(targetIds.size()); - targetIds.forEach(targetId -> sendObserveRequest(lwM2MClient, targetId, - new TbLwM2MLatchCallback<>(latch, new TbLwM2MObserveCallback(this, logService, lwM2MClient, targetId)))); - latch.await(config.getTimeout(), TimeUnit.MILLISECONDS); - } else if (COMPOSITE_ALL.equals(observeStrategy)) { - String[] versionedIds = targetIds.toArray(new String[0]); - sendObserveCompositeRequest(lwM2MClient, versionedIds); - } else if (COMPOSITE_BY_OBJECT.equals(observeStrategy)) { - Map versionedObjectIds = groupByObjectIdVersionedIds(targetIds); - CountDownLatch latch = new CountDownLatch(versionedObjectIds.size()); - versionedObjectIds.forEach((k, v)-> sendObserveCompositeRequest(lwM2MClient, v)); - latch.await(config.getTimeout(), TimeUnit.MILLISECONDS); + long timeoutMs = config.getTimeout(); + switch (observeStrategy) { + case SINGLE -> { + CountDownLatch latch = new CountDownLatch(targetIds.size()); + targetIds.forEach(targetId -> sendObserveRequest( + lwM2MClient, targetId, + new TbLwM2MLatchCallback<>(latch, new TbLwM2MObserveCallback(this, logService, lwM2MClient, targetId)) + )); + boolean completed = latch.await(timeoutMs, TimeUnit.MILLISECONDS); + if (!completed) log.trace("[{}] Timeout occurred during SINGLE observe init", lwM2MClient.getEndpoint()); + } + case COMPOSITE_ALL -> { + CountDownLatch latch = new CountDownLatch(targetIds.size()); + sendObserveCompositeRequest(lwM2MClient, targetIds.toArray(new String[0])); + boolean completed = latch.await(timeoutMs, TimeUnit.MILLISECONDS); + if (!completed) log.trace("[{}] Timeout occurred during COMPOSITE_ALL observe init", lwM2MClient.getEndpoint()); + } + case COMPOSITE_BY_OBJECT -> { + Map versionedObjectIds = groupByObjectIdVersionedIds(targetIds); + CountDownLatch latch = new CountDownLatch(versionedObjectIds.size()); + versionedObjectIds.forEach((k, v) -> sendObserveCompositeRequest(lwM2MClient, v)); + boolean completed = latch.await(timeoutMs, TimeUnit.MILLISECONDS); + if (!completed) log.trace("[{}] Timeout occurred during COMPOSITE_BY_OBJECT observe init", lwM2MClient.getEndpoint()); + } } } } catch (InterruptedException e) { @@ -586,7 +596,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl defaultLwM2MDownlinkMsgHandler.sendCancelObserveRequest(client, request, new TbLwM2MCancelObserveCallback(logService, client, versionedId)); } - private void updateObjectResourceValue(ResultUpdateResource updateResource, LwM2mObject lwM2mObject, String pathIdVer, int code) { + private void updateObjectResourceValue(ResourceUpdateResult updateResource, LwM2mObject lwM2mObject, String pathIdVer, int code) { LwM2mPath pathIds = new LwM2mPath(fromVersionedIdToObjectId(pathIdVer)); lwM2mObject.getInstances().forEach((instanceId, instance) -> { String pathInstance = pathIds.toString() + "/" + instanceId; @@ -594,7 +604,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl }); } - private void updateObjectInstanceResourceValue(ResultUpdateResource updateResource, LwM2mObjectInstance lwM2mObjectInstance, String pathIdVer, int code) { + private void updateObjectInstanceResourceValue(ResourceUpdateResult updateResource, LwM2mObjectInstance lwM2mObjectInstance, String pathIdVer, int code) { lwM2mObjectInstance.getResources().forEach((resourceId, resource) -> { String pathRez = pathIdVer + "/" + resourceId; this.updateResourcesValue(updateResource, resource, pathRez, Mode.UPDATE, code); @@ -612,7 +622,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl * @param stringPath - resource * @param mode - Replace, Update */ - private void updateResourcesValue(ResultUpdateResource updateResource, LwM2mResource lwM2mResource, String stringPath, Mode mode, int code) { + private void updateResourcesValue(ResourceUpdateResult updateResource, LwM2mResource lwM2mResource, String stringPath, Mode mode, int code) { LwM2mClient lwM2MClient = updateResource.getLwM2MClient(); String path = convertObjectIdToVersionedId(stringPath, lwM2MClient); if (path != null && lwM2MClient.saveResourceValue(path, lwM2mResource, modelProvider, mode)) { @@ -660,7 +670,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl * * @param updateResource - updateResource resource of LwM2M Client */ - public void updateAttrTelemetry(ResultUpdateResource updateResource, Instant ts) { + public void updateAttrTelemetry(ResourceUpdateResult updateResource, Instant ts) { log.trace("UpdateAttrTelemetry paths [{}]", updateResource.getPaths()); try { ResultsAddKeyValueProto results = this.getParametersFromProfile(updateResource); @@ -707,7 +717,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl * * @param updateResource - updateResource resource of LwM2M Client */ - private ResultsAddKeyValueProto getParametersFromProfile(ResultUpdateResource updateResource) { + private ResultsAddKeyValueProto getParametersFromProfile(ResourceUpdateResult updateResource) { Registration registration = updateResource.getLwM2MClient().getRegistration(); Set paths = updateResource.getPaths(); ResultsAddKeyValueProto results = new ResultsAddKeyValueProto(); @@ -823,7 +833,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl @Override public void onWriteResponseOk(LwM2mClient lwM2MClient, String path, WriteRequest request, int code) { - ResultUpdateResource updateResource = new ResultUpdateResource(lwM2MClient); + ResourceUpdateResult updateResource = new ResourceUpdateResult(lwM2MClient); if (request.getNode() instanceof LwM2mResource) { this.updateResourcesValue(updateResource, ((LwM2mResource) request.getNode()), path, request.isReplaceRequest() ? Mode.REPLACE : Mode.UPDATE, code); } else if (request.getNode() instanceof LwM2mObjectInstance) { @@ -840,7 +850,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl @Override public void onCreatebjectInstancesResponseOk(LwM2mClient lwM2MClient, String versionId, CreateRequest request) { if (request.getObjectInstances() != null && !request.getObjectInstances().isEmpty()) { - ResultUpdateResource updateResource = new ResultUpdateResource(lwM2MClient); + ResourceUpdateResult updateResource = new ResourceUpdateResult(lwM2MClient); request.getObjectInstances().forEach(instance -> instance.getResources().forEach((resId, lwM2mResource) ->{ this.updateResourcesValue(updateResource, lwM2mResource, versionId + "/" + resId, Mode.REPLACE, 0); @@ -854,7 +864,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl @Override public void onWriteCompositeResponseOk(LwM2mClient lwM2MClient, WriteCompositeRequest request, int code) { log.trace("ReadCompositeResponse: [{}]", request.getNodes()); - ResultUpdateResource updateResource = new ResultUpdateResource(lwM2MClient); + ResourceUpdateResult updateResource = new ResourceUpdateResult(lwM2MClient); request.getNodes().forEach((k, v) -> { if (v instanceof LwM2mSingleResource) { this.updateResourcesValue(updateResource, (LwM2mResource) v, k.toString(), Mode.REPLACE, code);