lwm2m: fix bug in onUpdateValueWithSendRequest convertObjectIdToVersionedId

This commit is contained in:
nickAS21 2025-07-15 18:22:40 +03:00
parent 8b9fbdcec0
commit 8b86fdc1dc
5 changed files with 128 additions and 17 deletions

View File

@ -21,8 +21,14 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonElement; import com.google.gson.JsonElement;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.eclipse.leshan.client.LeshanClient;
import org.eclipse.leshan.client.object.Security; import org.eclipse.leshan.client.object.Security;
import org.eclipse.leshan.client.servers.LwM2mServer;
import org.eclipse.leshan.core.ResponseCode; import org.eclipse.leshan.core.ResponseCode;
import org.eclipse.leshan.core.request.ContentFormat;
import org.eclipse.leshan.core.response.ErrorCallback;
import org.eclipse.leshan.core.response.ResponseCallback;
import org.eclipse.leshan.core.response.SendResponse;
import org.eclipse.leshan.server.registration.Registration; import org.eclipse.leshan.server.registration.Registration;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -73,6 +79,7 @@ import org.thingsboard.server.service.ws.telemetry.cmd.v2.LatestValueCmd;
import org.thingsboard.server.transport.AbstractTransportIntegrationTest; import org.thingsboard.server.transport.AbstractTransportIntegrationTest;
import org.thingsboard.server.transport.lwm2m.client.LwM2MTestClient; import org.thingsboard.server.transport.lwm2m.client.LwM2MTestClient;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClientContext; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClientContext;
import org.thingsboard.server.transport.lwm2m.server.client.ResourceUpdateResult;
import org.thingsboard.server.transport.lwm2m.server.uplink.DefaultLwM2mUplinkMsgHandler; import org.thingsboard.server.transport.lwm2m.server.uplink.DefaultLwM2mUplinkMsgHandler;
import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler;
@ -82,6 +89,7 @@ import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -93,6 +101,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@ -351,7 +360,7 @@ public abstract class AbstractLwM2MIntegrationTest extends AbstractTransportInte
getWsClient().waitForReply(); getWsClient().waitForReply();
getWsClient().registerWaitForUpdate(); getWsClient().registerWaitForUpdate();
this.createNewClient(security, null, false, endpoint, null, queueMode, device.getId().getId().toString()); this.createNewClient(security, null, false, endpoint, null, queueMode, device.getId().getId().toString(), null);
awaitObserveReadAll(1, lwM2MTestClient.getDeviceIdStr()); awaitObserveReadAll(1, lwM2MTestClient.getDeviceIdStr());
String msg = getWsClient().waitForUpdate(); String msg = getWsClient().waitForUpdate();
@ -422,7 +431,7 @@ public abstract class AbstractLwM2MIntegrationTest extends AbstractTransportInte
getWsClient().waitForReply(); getWsClient().waitForReply();
getWsClient().registerWaitForUpdate(); getWsClient().registerWaitForUpdate();
this.createNewClient(security, null, false, endpoint, null, true, device.getId().getId().toString()); this.createNewClient(security, null, false, endpoint, null, true, device.getId().getId().toString(), null);
awaitObserveReadAll(cntObserve, lwM2MTestClient.getDeviceIdStr()); awaitObserveReadAll(cntObserve, lwM2MTestClient.getDeviceIdStr());
String msg = getWsClient().waitForUpdate(); String msg = getWsClient().waitForUpdate();
@ -543,16 +552,17 @@ public abstract class AbstractLwM2MIntegrationTest extends AbstractTransportInte
public void createNewClient(Security security, Security securityBs, boolean isRpc, public void createNewClient(Security security, Security securityBs, boolean isRpc,
String endpoint, String deviceIdStr) throws Exception { String endpoint, String deviceIdStr) throws Exception {
this.createNewClient(security, securityBs, isRpc, endpoint, null, false, deviceIdStr); this.createNewClient(security, securityBs, isRpc, endpoint, null, false, deviceIdStr, null);
} }
public void createNewClient(Security security, Security securityBs, boolean isRpc, public void createNewClient(Security security, Security securityBs, boolean isRpc,
String endpoint, Integer clientDtlsCidLength, String deviceIdStr) throws Exception { String endpoint, Integer clientDtlsCidLength, String deviceIdStr) throws Exception {
this.createNewClient(security, securityBs, isRpc, endpoint, clientDtlsCidLength, false, deviceIdStr); this.createNewClient(security, securityBs, isRpc, endpoint, clientDtlsCidLength, false, deviceIdStr, null);
} }
public void createNewClient(Security security, Security securityBs, boolean isRpc, public void createNewClient(Security security, Security securityBs, boolean isRpc,
String endpoint, Integer clientDtlsCidLength, boolean queueMode, String deviceIdStr) throws Exception { String endpoint, Integer clientDtlsCidLength, boolean queueMode,
String deviceIdStr, Integer value3_0_9) throws Exception {
this.clientDestroy(false); this.clientDestroy(false);
lwM2MTestClient = new LwM2MTestClient(this.executor, endpoint, resources); lwM2MTestClient = new LwM2MTestClient(this.executor, endpoint, resources);
@ -560,11 +570,86 @@ public abstract class AbstractLwM2MIntegrationTest extends AbstractTransportInte
int clientPort = socket.getLocalPort(); int clientPort = socket.getLocalPort();
lwM2MTestClient.init(security, securityBs, clientPort, isRpc, lwM2MTestClient.init(security, securityBs, clientPort, isRpc,
this.defaultLwM2mUplinkMsgHandlerTest, this.clientContextTest, this.defaultLwM2mUplinkMsgHandlerTest, this.clientContextTest,
clientDtlsCidLength, queueMode, supportFormatOnly_SenMLJSON_SenMLCBOR); clientDtlsCidLength, queueMode, supportFormatOnly_SenMLJSON_SenMLCBOR, value3_0_9);
} }
lwM2MTestClient.setDeviceIdStr(deviceIdStr); lwM2MTestClient.setDeviceIdStr(deviceIdStr);
} }
/**
* Test: "/3/0/9" value = 44 (constant); count = 10; send from client to telemetry without observe
* @param security
* @param deviceCredentials
* @param endpoint
* @param queueMode
* @throws Exception
*/
public void testConnectionWithoutObserveWithDataReceivedSingleTelemetry(Security security,
LwM2MDeviceCredentials deviceCredentials,
String endpoint,
boolean queueMode) throws Exception {
Lwm2mDeviceProfileTransportConfiguration transportConfiguration = getTransportConfiguration(TELEMETRY_WITH_ONE_OBSERVE, getBootstrapServerCredentialsNoSec(NONE));
DeviceProfile deviceProfile = createLwm2mDeviceProfile("profileFor" + endpoint, transportConfiguration);
Device device = createLwm2mDevice(deviceCredentials, endpoint, deviceProfile.getId());
SingleEntityFilter sef = new SingleEntityFilter();
sef.setSingleEntity(device.getId());
LatestValueCmd latestCmd = new LatestValueCmd();
latestCmd.setKeys(Collections.singletonList(new EntityKey(EntityKeyType.TIME_SERIES, "batteryLevel")));
EntityDataQuery edq = new EntityDataQuery(sef, new EntityDataPageLink(1, 0, null, null),
Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
EntityDataCmd cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
getWsClient().send(cmd);
getWsClient().waitForReply();
getWsClient().registerWaitForUpdate();
this.createNewClient(security, null, false, endpoint, null, queueMode, device.getId().getId().toString(), 44);
awaitObserveReadAll(1, lwM2MTestClient.getDeviceIdStr());
LeshanClient leshanClient = lwM2MTestClient.getLeshanClient();
Map<String, LwM2mServer> registeredServers = leshanClient.getRegisteredServers();
List<String> paths = List.of("/3/0/9");
int cntUpdate = 10;
int cntLast = cntUpdate;
for (final LwM2mServer server : registeredServers.values()) {
log.info("Sending Data to {} using {}.", server, ContentFormat.SENML_CBOR);
ResponseCallback<SendResponse> responseCallback = (response) -> {
if (response.isSuccess())
log.warn("Data sent successfully to {} [{}].", server, response.getCode());
else
log.warn("Send data to {} failed [{}] : {}.", server, response.getCode(),
response.getErrorMessage() == null ? "" : response.getErrorMessage());
};
ErrorCallback errorCallback = (e) -> log.warn("Unable to send data to {}.", server, e);
while(cntLast > 0) {
leshanClient.getSendService().sendData(server, ContentFormat.SENML_CBOR, paths,
2000, responseCallback, errorCallback);
cntLast-- ;
}
}
verify(defaultUplinkMsgHandlerTest, timeout(10000).atLeast(cntUpdate))
.updateAttrTelemetry(Mockito.any(ResourceUpdateResult.class), eq(null));
String msg = getWsClient().waitForUpdate();
EntityDataUpdate update = JacksonUtil.fromString(msg, EntityDataUpdate.class);
Assert.assertEquals(1, update.getCmdId());
List<EntityData> eData = update.getUpdate();
Assert.assertNotNull(eData);
Assert.assertEquals(1, eData.size());
Assert.assertEquals(device.getId(), eData.get(0).getEntityId());
Assert.assertNotNull(eData.get(0).getLatest().get(EntityKeyType.TIME_SERIES));
var tsValue = eData.get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("batteryLevel");
assertThat(Long.parseLong(tsValue.getValue()), instanceOf(Long.class));
int expected = 44;
assertEquals(expected, Long.parseLong(tsValue.getValue()));
}
private void clientDestroy(boolean isAfter) { private void clientDestroy(boolean isAfter) {
try { try {
if (lwM2MTestClient != null && lwM2MTestClient.getLeshanClient() != null) { if (lwM2MTestClient != null && lwM2MTestClient.getLeshanClient() != null) {

View File

@ -144,7 +144,7 @@ public class LwM2MTestClient {
public void init(Security security, Security securityBs, int port, boolean isRpc, public void init(Security security, Security securityBs, int port, boolean isRpc,
LwM2mUplinkMsgHandler defaultLwM2mUplinkMsgHandler, LwM2mUplinkMsgHandler defaultLwM2mUplinkMsgHandler,
LwM2mClientContext clientContext, Integer cIdLength, boolean queueMode, LwM2mClientContext clientContext, Integer cIdLength, boolean queueMode,
boolean supportFormatOnly_SenMLJSON_SenMLCBOR) throws InvalidDDFFileException, IOException { boolean supportFormatOnly_SenMLJSON_SenMLCBOR, Integer value3_0_9) throws InvalidDDFFileException, IOException {
Assert.assertNull("client already initialized", leshanClient); Assert.assertNull("client already initialized", leshanClient);
this.defaultLwM2mUplinkMsgHandlerTest = defaultLwM2mUplinkMsgHandler; this.defaultLwM2mUplinkMsgHandlerTest = defaultLwM2mUplinkMsgHandler;
this.clientContext = clientContext; this.clientContext = clientContext;
@ -197,7 +197,7 @@ public class LwM2MTestClient {
initializer.setInstancesForObject(SERVER, lwm2mServer); initializer.setInstancesForObject(SERVER, lwm2mServer);
} }
initializer.setInstancesForObject(DEVICE, lwM2MDevice = new SimpleLwM2MDevice(executor)); initializer.setInstancesForObject(DEVICE, lwM2MDevice = new SimpleLwM2MDevice(executor, value3_0_9));
initializer.setInstancesForObject(FIRMWARE, fwLwM2MDevice = new FwLwM2MDevice()); initializer.setInstancesForObject(FIRMWARE, fwLwM2MDevice = new FwLwM2MDevice());
initializer.setInstancesForObject(SOFTWARE_MANAGEMENT, swLwM2MDevice = new SwLwM2MDevice()); initializer.setInstancesForObject(SOFTWARE_MANAGEMENT, swLwM2MDevice = new SwLwM2MDevice());
initializer.setClassForObject(ACCESS_CONTROL, DummyInstanceEnabler.class); initializer.setClassForObject(ACCESS_CONTROL, DummyInstanceEnabler.class);

View File

@ -85,18 +85,22 @@ public class SimpleLwM2MDevice extends BaseInstanceEnabler implements Destroyabl
*/ */
private static Map<Integer, Long> errorCode = private static Map<Integer, Long> errorCode =
Map.of(0, 0L); // 0-32 Map.of(0, 0L); // 0-32
private Integer value3_0_9;
public SimpleLwM2MDevice() { public SimpleLwM2MDevice() {
} }
public SimpleLwM2MDevice(ScheduledExecutorService executorService) { public SimpleLwM2MDevice(ScheduledExecutorService executorService, Integer value3_0_9) {
this.value3_0_9 = value3_0_9;
try { try {
if ( this.value3_0_9 == null) {
executorService.scheduleWithFixedDelay(() -> { executorService.scheduleWithFixedDelay(() -> {
fireResourceChange(9); fireResourceChange(9);
fireResourceChange(20); fireResourceChange(20);
} }
, 1, 1, TimeUnit.SECONDS); // 2 sec , 1, 1, TimeUnit.SECONDS); // 2 sec
// , 1800000, 1800000, TimeUnit.MILLISECONDS); // 30 MIN // , 1800000, 1800000, TimeUnit.MILLISECONDS); // 30 MIN
}
} catch (Throwable e) { } catch (Throwable e) {
log.error("[{}]Throwable", e.toString()); log.error("[{}]Throwable", e.toString());
e.printStackTrace(); e.printStackTrace();
@ -211,8 +215,14 @@ public class SimpleLwM2MDevice extends BaseInstanceEnabler implements Destroyabl
} }
private int getBatteryLevel() { private int getBatteryLevel() {
int valBattery = randomIterator.nextInt(); int valBattery;
if (this.value3_0_9 == null) {
valBattery = randomIterator.nextInt();
log.trace("Send from client [3/0/9] val: [{}]", valBattery); log.trace("Send from client [3/0/9] val: [{}]", valBattery);
} else {
valBattery = this.value3_0_9;
log.warn("Send from client [3/0/9] constant value: [{}]", valBattery);
}
return valBattery; return valBattery;
} }

View File

@ -0,0 +1,16 @@
package org.thingsboard.server.transport.lwm2m.rpc.sql;
import org.junit.Test;
import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MDeviceCredentials;
import org.thingsboard.server.transport.lwm2m.security.AbstractSecurityLwM2MIntegrationTest;
public class RpcLwm2mIntegrationDataReceivedFromClientTest extends AbstractSecurityLwM2MIntegrationTest {
@Test
public void testWithNoSecConnectLwm2mSuccessAndObserveTelemetry() throws Exception {
String clientEndpoint = CLIENT_ENDPOINT_NO_SEC;
LwM2MDeviceCredentials clientCredentials = getDeviceCredentialsNoSec(createNoSecClientCredentials(clientEndpoint));
super.testConnectionWithoutObserveWithDataReceivedSingleTelemetry(SECURITY_NO_SEC, clientCredentials, clientEndpoint, false);
}
}

View File

@ -383,7 +383,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
LwM2mPath path = instant.getKey(); LwM2mPath path = instant.getKey();
LwM2mNode node = instant.getValue(); LwM2mNode node = instant.getValue();
LwM2mClient lwM2MClient = clientContext.getClientByEndpoint(registration.getEndpoint()); LwM2mClient lwM2MClient = clientContext.getClientByEndpoint(registration.getEndpoint());
ObjectModel objectModelVersion = lwM2MClient.getObjectModel(path.toString(), modelProvider); ObjectModel objectModelVersion = lwM2MClient.getObjectModel(convertObjectIdToVersionedId(path.toString(), lwM2MClient), modelProvider);
if (objectModelVersion != null) { if (objectModelVersion != null) {
ResourceUpdateResult updateResource = new ResourceUpdateResult(lwM2MClient); ResourceUpdateResult updateResource = new ResourceUpdateResult(lwM2MClient);
if (node instanceof LwM2mObject) { if (node instanceof LwM2mObject) {