lwm2m: comments - 1

This commit is contained in:
nickAS21 2025-05-14 17:41:36 +03:00
parent aa1991c7b4
commit cb2ab78186
5 changed files with 47 additions and 37 deletions

View File

@ -29,7 +29,7 @@ import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.lwm2m.AbstractLwM2MIntegrationTest;
import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportServerHelper;
import org.thingsboard.server.transport.lwm2m.server.client.ResultUpdateResource;
import org.thingsboard.server.transport.lwm2m.server.client.ResourceUpdateResult;
import java.util.List;
import java.util.Set;
@ -259,7 +259,7 @@ public abstract class AbstractRpcLwM2MIntegrationTest extends AbstractLwM2MInteg
.filter(invocation ->
invocation.getMethod().getName().equals("updateAttrTelemetry") &&
invocation.getArguments().length > 1 &&
((ResultUpdateResource)invocation.getArguments()[0]).getPaths().toString().contains(idVerRez)
((ResourceUpdateResult)invocation.getArguments()[0]).getPaths().toString().contains(idVerRez)
)
.count();
}

View File

@ -24,7 +24,7 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.thingsboard.server.transport.lwm2m.rpc.AbstractRpcLwM2MIntegrationTest;
import org.thingsboard.server.transport.lwm2m.server.client.ResultUpdateResource;
import org.thingsboard.server.transport.lwm2m.server.client.ResourceUpdateResult;
import static org.eclipse.leshan.core.LwM2mId.ACCESS_CONTROL;
import static org.junit.Assert.assertEquals;
@ -80,7 +80,7 @@ public class RpcLwm2mIntegrationObserveTest extends AbstractRpcLwM2MIntegrationT
int cntUpdate = 3;
verify(defaultUplinkMsgHandlerTest, timeout(10000).atLeast(cntUpdate))
.updateAttrTelemetry(Mockito.any(ResultUpdateResource.class), eq(null));
.updateAttrTelemetry(Mockito.any(ResourceUpdateResult.class), eq(null));
}
/**
@ -95,7 +95,7 @@ public class RpcLwm2mIntegrationObserveTest extends AbstractRpcLwM2MIntegrationT
int cntUpdate = 3;
verify(defaultUplinkMsgHandlerTest, timeout(10000).atLeast(cntUpdate))
.updateAttrTelemetry(Mockito.any(ResultUpdateResource.class), eq(null));
.updateAttrTelemetry(Mockito.any(ResourceUpdateResult.class), eq(null));
}
/**
@ -328,7 +328,7 @@ public class RpcLwm2mIntegrationObserveTest extends AbstractRpcLwM2MIntegrationT
int cntUpdate = 10;
verify(defaultUplinkMsgHandlerTest, timeout(50000).atLeast(cntUpdate))
.updateAttrTelemetry(Mockito.any(ResultUpdateResource.class), eq(null));
.updateAttrTelemetry(Mockito.any(ResourceUpdateResult.class), eq(null));
}
private void sendRpcObserveWithWithTwoResource(String expectedId_1, String expectedId_2) throws Exception {

View File

@ -40,7 +40,7 @@ public enum TelemetryObserveStrategy {
return strategy;
}
}
return null;
throw new IllegalArgumentException("Unknown TelemetryObserveStrategy id: " + description);
}
public static TelemetryObserveStrategy fromId(int id) {

View File

@ -23,11 +23,11 @@ import java.util.Set;
@Data
@AllArgsConstructor
public class ResultUpdateResource {
LwM2mClient lwM2MClient;
Set<String> paths;
public class ResourceUpdateResult {
private LwM2mClient lwM2MClient;
private Set<String> paths;
public ResultUpdateResource(LwM2mClient lwM2MClient) {
public ResourceUpdateResult(LwM2mClient lwM2MClient) {
this.lwM2MClient = lwM2MClient;
this.paths = new HashSet<>();
}

View File

@ -78,7 +78,7 @@ import org.thingsboard.server.transport.lwm2m.server.client.LwM2MClientState;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2MClientStateException;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClientContext;
import org.thingsboard.server.transport.lwm2m.server.client.ResultUpdateResource;
import org.thingsboard.server.transport.lwm2m.server.client.ResourceUpdateResult;
import org.thingsboard.server.transport.lwm2m.server.client.ResultsAddKeyValueProto;
import org.thingsboard.server.transport.lwm2m.server.common.LwM2MExecutorAwareService;
import org.thingsboard.server.transport.lwm2m.server.downlink.DownlinkRequestCallback;
@ -124,7 +124,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.device.profile.lwm2m.TelemetryObserveStrategy.COMPOSITE_ALL;
import static org.thingsboard.server.common.data.device.profile.lwm2m.TelemetryObserveStrategy.COMPOSITE_BY_OBJECT;
import static org.thingsboard.server.common.data.device.profile.lwm2m.TelemetryObserveStrategy.SINGLE;
import static org.thingsboard.server.common.data.lwm2m.LwM2mConstants.LWM2M_SEPARATOR_PATH;
@ -324,7 +323,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
LwM2mClient lwM2MClient = clientContext.getClientByEndpoint(registration.getEndpoint());
ObjectModel objectModelVersion = lwM2MClient.getObjectModel(path, modelProvider);
if (objectModelVersion != null) {
ResultUpdateResource updateResource = new ResultUpdateResource(lwM2MClient);
ResourceUpdateResult updateResource = new ResourceUpdateResult(lwM2MClient);
int responseCode = response.getCode().getCode();
if (content instanceof LwM2mObject) {
this.updateObjectResourceValue(updateResource, (LwM2mObject) content, path, responseCode);
@ -343,7 +342,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
log.trace("ReadCompositeResponse before onUpdateValueAfterReadCompositeResponse: [{}]", response);
if (response.getContent() != null) {
LwM2mClient lwM2MClient = clientContext.getClientByEndpoint(registration.getEndpoint());
ResultUpdateResource updateResource = new ResultUpdateResource(lwM2MClient);
ResourceUpdateResult updateResource = new ResourceUpdateResult(lwM2MClient);
response.getContent().forEach((k, v) -> {
if (v != null) {
int responseCode = response.getCode().getCode();
@ -386,7 +385,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
LwM2mClient lwM2MClient = clientContext.getClientByEndpoint(registration.getEndpoint());
ObjectModel objectModelVersion = lwM2MClient.getObjectModel(path.toString(), modelProvider);
if (objectModelVersion != null) {
ResultUpdateResource updateResource = new ResultUpdateResource(lwM2MClient);
ResourceUpdateResult updateResource = new ResourceUpdateResult(lwM2MClient);
if (node instanceof LwM2mObject) {
this.updateObjectResourceValue(updateResource, (LwM2mObject) node, path.toString(), 0);
} else if (node instanceof LwM2mObjectInstance) {
@ -518,19 +517,30 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
targetIds = targetIds.stream().filter(target -> isSupportedTargetId(supportedObjects, target)).collect(Collectors.toSet());
if (!targetIds.isEmpty()) {
TelemetryObserveStrategy observeStrategy = profile.getObserveAttr().getObserveStrategy();
if (SINGLE.equals(observeStrategy)) {
CountDownLatch latch = new CountDownLatch(targetIds.size());
targetIds.forEach(targetId -> sendObserveRequest(lwM2MClient, targetId,
new TbLwM2MLatchCallback<>(latch, new TbLwM2MObserveCallback(this, logService, lwM2MClient, targetId))));
latch.await(config.getTimeout(), TimeUnit.MILLISECONDS);
} else if (COMPOSITE_ALL.equals(observeStrategy)) {
String[] versionedIds = targetIds.toArray(new String[0]);
sendObserveCompositeRequest(lwM2MClient, versionedIds);
} else if (COMPOSITE_BY_OBJECT.equals(observeStrategy)) {
Map<Integer, String[]> versionedObjectIds = groupByObjectIdVersionedIds(targetIds);
CountDownLatch latch = new CountDownLatch(versionedObjectIds.size());
versionedObjectIds.forEach((k, v)-> sendObserveCompositeRequest(lwM2MClient, v));
latch.await(config.getTimeout(), TimeUnit.MILLISECONDS);
long timeoutMs = config.getTimeout();
switch (observeStrategy) {
case SINGLE -> {
CountDownLatch latch = new CountDownLatch(targetIds.size());
targetIds.forEach(targetId -> sendObserveRequest(
lwM2MClient, targetId,
new TbLwM2MLatchCallback<>(latch, new TbLwM2MObserveCallback(this, logService, lwM2MClient, targetId))
));
boolean completed = latch.await(timeoutMs, TimeUnit.MILLISECONDS);
if (!completed) log.trace("[{}] Timeout occurred during SINGLE observe init", lwM2MClient.getEndpoint());
}
case COMPOSITE_ALL -> {
CountDownLatch latch = new CountDownLatch(targetIds.size());
sendObserveCompositeRequest(lwM2MClient, targetIds.toArray(new String[0]));
boolean completed = latch.await(timeoutMs, TimeUnit.MILLISECONDS);
if (!completed) log.trace("[{}] Timeout occurred during COMPOSITE_ALL observe init", lwM2MClient.getEndpoint());
}
case COMPOSITE_BY_OBJECT -> {
Map<Integer, String[]> versionedObjectIds = groupByObjectIdVersionedIds(targetIds);
CountDownLatch latch = new CountDownLatch(versionedObjectIds.size());
versionedObjectIds.forEach((k, v) -> sendObserveCompositeRequest(lwM2MClient, v));
boolean completed = latch.await(timeoutMs, TimeUnit.MILLISECONDS);
if (!completed) log.trace("[{}] Timeout occurred during COMPOSITE_BY_OBJECT observe init", lwM2MClient.getEndpoint());
}
}
}
} catch (InterruptedException e) {
@ -586,7 +596,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
defaultLwM2MDownlinkMsgHandler.sendCancelObserveRequest(client, request, new TbLwM2MCancelObserveCallback(logService, client, versionedId));
}
private void updateObjectResourceValue(ResultUpdateResource updateResource, LwM2mObject lwM2mObject, String pathIdVer, int code) {
private void updateObjectResourceValue(ResourceUpdateResult updateResource, LwM2mObject lwM2mObject, String pathIdVer, int code) {
LwM2mPath pathIds = new LwM2mPath(fromVersionedIdToObjectId(pathIdVer));
lwM2mObject.getInstances().forEach((instanceId, instance) -> {
String pathInstance = pathIds.toString() + "/" + instanceId;
@ -594,7 +604,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
});
}
private void updateObjectInstanceResourceValue(ResultUpdateResource updateResource, LwM2mObjectInstance lwM2mObjectInstance, String pathIdVer, int code) {
private void updateObjectInstanceResourceValue(ResourceUpdateResult updateResource, LwM2mObjectInstance lwM2mObjectInstance, String pathIdVer, int code) {
lwM2mObjectInstance.getResources().forEach((resourceId, resource) -> {
String pathRez = pathIdVer + "/" + resourceId;
this.updateResourcesValue(updateResource, resource, pathRez, Mode.UPDATE, code);
@ -612,7 +622,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
* @param stringPath - resource
* @param mode - Replace, Update
*/
private void updateResourcesValue(ResultUpdateResource updateResource, LwM2mResource lwM2mResource, String stringPath, Mode mode, int code) {
private void updateResourcesValue(ResourceUpdateResult updateResource, LwM2mResource lwM2mResource, String stringPath, Mode mode, int code) {
LwM2mClient lwM2MClient = updateResource.getLwM2MClient();
String path = convertObjectIdToVersionedId(stringPath, lwM2MClient);
if (path != null && lwM2MClient.saveResourceValue(path, lwM2mResource, modelProvider, mode)) {
@ -660,7 +670,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
*
* @param updateResource - updateResource resource of LwM2M Client
*/
public void updateAttrTelemetry(ResultUpdateResource updateResource, Instant ts) {
public void updateAttrTelemetry(ResourceUpdateResult updateResource, Instant ts) {
log.trace("UpdateAttrTelemetry paths [{}]", updateResource.getPaths());
try {
ResultsAddKeyValueProto results = this.getParametersFromProfile(updateResource);
@ -707,7 +717,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
*
* @param updateResource - updateResource resource of LwM2M Client
*/
private ResultsAddKeyValueProto getParametersFromProfile(ResultUpdateResource updateResource) {
private ResultsAddKeyValueProto getParametersFromProfile(ResourceUpdateResult updateResource) {
Registration registration = updateResource.getLwM2MClient().getRegistration();
Set<String> paths = updateResource.getPaths();
ResultsAddKeyValueProto results = new ResultsAddKeyValueProto();
@ -823,7 +833,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
@Override
public void onWriteResponseOk(LwM2mClient lwM2MClient, String path, WriteRequest request, int code) {
ResultUpdateResource updateResource = new ResultUpdateResource(lwM2MClient);
ResourceUpdateResult updateResource = new ResourceUpdateResult(lwM2MClient);
if (request.getNode() instanceof LwM2mResource) {
this.updateResourcesValue(updateResource, ((LwM2mResource) request.getNode()), path, request.isReplaceRequest() ? Mode.REPLACE : Mode.UPDATE, code);
} else if (request.getNode() instanceof LwM2mObjectInstance) {
@ -840,7 +850,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
@Override
public void onCreatebjectInstancesResponseOk(LwM2mClient lwM2MClient, String versionId, CreateRequest request) {
if (request.getObjectInstances() != null && !request.getObjectInstances().isEmpty()) {
ResultUpdateResource updateResource = new ResultUpdateResource(lwM2MClient);
ResourceUpdateResult updateResource = new ResourceUpdateResult(lwM2MClient);
request.getObjectInstances().forEach(instance ->
instance.getResources().forEach((resId, lwM2mResource) ->{
this.updateResourcesValue(updateResource, lwM2mResource, versionId + "/" + resId, Mode.REPLACE, 0);
@ -854,7 +864,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
@Override
public void onWriteCompositeResponseOk(LwM2mClient lwM2MClient, WriteCompositeRequest request, int code) {
log.trace("ReadCompositeResponse: [{}]", request.getNodes());
ResultUpdateResource updateResource = new ResultUpdateResource(lwM2MClient);
ResourceUpdateResult updateResource = new ResourceUpdateResult(lwM2MClient);
request.getNodes().forEach((k, v) -> {
if (v instanceof LwM2mSingleResource) {
this.updateResourcesValue(updateResource, (LwM2mResource) v, k.toString(), Mode.REPLACE, code);