Merge pull request #13720 from thingsboard/lwm2mTransport_send_data_to_Core_if_not_change_Value
lwm2m_fix_bug_Fix incorrect ObjectId version resolution in telemetry send without observe
This commit is contained in:
commit
5c38d769ad
@ -21,8 +21,14 @@ import com.google.gson.JsonArray;
|
||||
import com.google.gson.JsonElement;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.eclipse.leshan.client.LeshanClient;
|
||||
import org.eclipse.leshan.client.object.Security;
|
||||
import org.eclipse.leshan.client.servers.LwM2mServer;
|
||||
import org.eclipse.leshan.core.ResponseCode;
|
||||
import org.eclipse.leshan.core.request.ContentFormat;
|
||||
import org.eclipse.leshan.core.response.ErrorCallback;
|
||||
import org.eclipse.leshan.core.response.ResponseCallback;
|
||||
import org.eclipse.leshan.core.response.SendResponse;
|
||||
import org.eclipse.leshan.server.registration.Registration;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
@ -73,6 +79,7 @@ import org.thingsboard.server.service.ws.telemetry.cmd.v2.LatestValueCmd;
|
||||
import org.thingsboard.server.transport.AbstractTransportIntegrationTest;
|
||||
import org.thingsboard.server.transport.lwm2m.client.LwM2MTestClient;
|
||||
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClientContext;
|
||||
import org.thingsboard.server.transport.lwm2m.server.client.ResourceUpdateResult;
|
||||
import org.thingsboard.server.transport.lwm2m.server.uplink.DefaultLwM2mUplinkMsgHandler;
|
||||
import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler;
|
||||
|
||||
@ -82,6 +89,7 @@ import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -93,6 +101,7 @@ import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.timeout;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
|
||||
@ -351,7 +360,7 @@ public abstract class AbstractLwM2MIntegrationTest extends AbstractTransportInte
|
||||
getWsClient().waitForReply();
|
||||
|
||||
getWsClient().registerWaitForUpdate();
|
||||
this.createNewClient(security, null, false, endpoint, null, queueMode, device.getId().getId().toString());
|
||||
this.createNewClient(security, null, false, endpoint, null, queueMode, device.getId().getId().toString(), null);
|
||||
awaitObserveReadAll(1, lwM2MTestClient.getDeviceIdStr());
|
||||
String msg = getWsClient().waitForUpdate();
|
||||
|
||||
@ -422,7 +431,7 @@ public abstract class AbstractLwM2MIntegrationTest extends AbstractTransportInte
|
||||
getWsClient().waitForReply();
|
||||
|
||||
getWsClient().registerWaitForUpdate();
|
||||
this.createNewClient(security, null, false, endpoint, null, true, device.getId().getId().toString());
|
||||
this.createNewClient(security, null, false, endpoint, null, true, device.getId().getId().toString(), null);
|
||||
awaitObserveReadAll(cntObserve, lwM2MTestClient.getDeviceIdStr());
|
||||
String msg = getWsClient().waitForUpdate();
|
||||
|
||||
@ -543,16 +552,17 @@ public abstract class AbstractLwM2MIntegrationTest extends AbstractTransportInte
|
||||
|
||||
public void createNewClient(Security security, Security securityBs, boolean isRpc,
|
||||
String endpoint, String deviceIdStr) throws Exception {
|
||||
this.createNewClient(security, securityBs, isRpc, endpoint, null, false, deviceIdStr);
|
||||
this.createNewClient(security, securityBs, isRpc, endpoint, null, false, deviceIdStr, null);
|
||||
}
|
||||
|
||||
public void createNewClient(Security security, Security securityBs, boolean isRpc,
|
||||
String endpoint, Integer clientDtlsCidLength, String deviceIdStr) throws Exception {
|
||||
this.createNewClient(security, securityBs, isRpc, endpoint, clientDtlsCidLength, false, deviceIdStr);
|
||||
this.createNewClient(security, securityBs, isRpc, endpoint, clientDtlsCidLength, false, deviceIdStr, null);
|
||||
}
|
||||
|
||||
public void createNewClient(Security security, Security securityBs, boolean isRpc,
|
||||
String endpoint, Integer clientDtlsCidLength, boolean queueMode, String deviceIdStr) throws Exception {
|
||||
String endpoint, Integer clientDtlsCidLength, boolean queueMode,
|
||||
String deviceIdStr, Integer value3_0_9) throws Exception {
|
||||
this.clientDestroy(false);
|
||||
lwM2MTestClient = new LwM2MTestClient(this.executor, endpoint, resources);
|
||||
|
||||
@ -560,11 +570,86 @@ public abstract class AbstractLwM2MIntegrationTest extends AbstractTransportInte
|
||||
int clientPort = socket.getLocalPort();
|
||||
lwM2MTestClient.init(security, securityBs, clientPort, isRpc,
|
||||
this.defaultLwM2mUplinkMsgHandlerTest, this.clientContextTest,
|
||||
clientDtlsCidLength, queueMode, supportFormatOnly_SenMLJSON_SenMLCBOR);
|
||||
clientDtlsCidLength, queueMode, supportFormatOnly_SenMLJSON_SenMLCBOR, value3_0_9);
|
||||
}
|
||||
lwM2MTestClient.setDeviceIdStr(deviceIdStr);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test: "/3/0/9" value = 44 (constant); count = 10; send from client to telemetry without observe
|
||||
* @param security
|
||||
* @param deviceCredentials
|
||||
* @param endpoint
|
||||
* @param queueMode
|
||||
* @throws Exception
|
||||
*/
|
||||
public void testConnectionWithoutObserveWithDataReceivedSingleTelemetry(Security security,
|
||||
LwM2MDeviceCredentials deviceCredentials,
|
||||
String endpoint,
|
||||
boolean queueMode) throws Exception {
|
||||
Lwm2mDeviceProfileTransportConfiguration transportConfiguration = getTransportConfiguration(TELEMETRY_WITH_ONE_OBSERVE, getBootstrapServerCredentialsNoSec(NONE));
|
||||
DeviceProfile deviceProfile = createLwm2mDeviceProfile("profileFor" + endpoint, transportConfiguration);
|
||||
Device device = createLwm2mDevice(deviceCredentials, endpoint, deviceProfile.getId());
|
||||
|
||||
|
||||
|
||||
SingleEntityFilter sef = new SingleEntityFilter();
|
||||
sef.setSingleEntity(device.getId());
|
||||
LatestValueCmd latestCmd = new LatestValueCmd();
|
||||
latestCmd.setKeys(Collections.singletonList(new EntityKey(EntityKeyType.TIME_SERIES, "batteryLevel")));
|
||||
EntityDataQuery edq = new EntityDataQuery(sef, new EntityDataPageLink(1, 0, null, null),
|
||||
Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
|
||||
|
||||
EntityDataCmd cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
|
||||
getWsClient().send(cmd);
|
||||
getWsClient().waitForReply();
|
||||
|
||||
getWsClient().registerWaitForUpdate();
|
||||
|
||||
this.createNewClient(security, null, false, endpoint, null, queueMode, device.getId().getId().toString(), 44);
|
||||
awaitObserveReadAll(1, lwM2MTestClient.getDeviceIdStr());
|
||||
|
||||
LeshanClient leshanClient = lwM2MTestClient.getLeshanClient();
|
||||
Map<String, LwM2mServer> registeredServers = leshanClient.getRegisteredServers();
|
||||
List<String> paths = List.of("/3/0/9");
|
||||
int cntUpdate = 10;
|
||||
int cntLast = cntUpdate;
|
||||
for (final LwM2mServer server : registeredServers.values()) {
|
||||
log.info("Sending Data to {} using {}.", server, ContentFormat.SENML_CBOR);
|
||||
ResponseCallback<SendResponse> responseCallback = (response) -> {
|
||||
if (response.isSuccess())
|
||||
log.warn("Data sent successfully to {} [{}].", server, response.getCode());
|
||||
else
|
||||
log.warn("Send data to {} failed [{}] : {}.", server, response.getCode(),
|
||||
response.getErrorMessage() == null ? "" : response.getErrorMessage());
|
||||
};
|
||||
ErrorCallback errorCallback = (e) -> log.warn("Unable to send data to {}.", server, e);
|
||||
while(cntLast > 0) {
|
||||
leshanClient.getSendService().sendData(server, ContentFormat.SENML_CBOR, paths,
|
||||
2000, responseCallback, errorCallback);
|
||||
cntLast-- ;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
verify(defaultUplinkMsgHandlerTest, timeout(10000).atLeast(cntUpdate))
|
||||
.updateAttrTelemetry(Mockito.any(ResourceUpdateResult.class), eq(null));
|
||||
|
||||
String msg = getWsClient().waitForUpdate();
|
||||
EntityDataUpdate update = JacksonUtil.fromString(msg, EntityDataUpdate.class);
|
||||
Assert.assertEquals(1, update.getCmdId());
|
||||
List<EntityData> eData = update.getUpdate();
|
||||
Assert.assertNotNull(eData);
|
||||
Assert.assertEquals(1, eData.size());
|
||||
Assert.assertEquals(device.getId(), eData.get(0).getEntityId());
|
||||
Assert.assertNotNull(eData.get(0).getLatest().get(EntityKeyType.TIME_SERIES));
|
||||
var tsValue = eData.get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("batteryLevel");
|
||||
assertThat(Long.parseLong(tsValue.getValue()), instanceOf(Long.class));
|
||||
int expected = 44;
|
||||
assertEquals(expected, Long.parseLong(tsValue.getValue()));
|
||||
}
|
||||
|
||||
|
||||
private void clientDestroy(boolean isAfter) {
|
||||
try {
|
||||
if (lwM2MTestClient != null && lwM2MTestClient.getLeshanClient() != null) {
|
||||
|
||||
@ -144,7 +144,7 @@ public class LwM2MTestClient {
|
||||
public void init(Security security, Security securityBs, int port, boolean isRpc,
|
||||
LwM2mUplinkMsgHandler defaultLwM2mUplinkMsgHandler,
|
||||
LwM2mClientContext clientContext, Integer cIdLength, boolean queueMode,
|
||||
boolean supportFormatOnly_SenMLJSON_SenMLCBOR) throws InvalidDDFFileException, IOException {
|
||||
boolean supportFormatOnly_SenMLJSON_SenMLCBOR, Integer value3_0_9) throws InvalidDDFFileException, IOException {
|
||||
Assert.assertNull("client already initialized", leshanClient);
|
||||
this.defaultLwM2mUplinkMsgHandlerTest = defaultLwM2mUplinkMsgHandler;
|
||||
this.clientContext = clientContext;
|
||||
@ -197,7 +197,7 @@ public class LwM2MTestClient {
|
||||
initializer.setInstancesForObject(SERVER, lwm2mServer);
|
||||
}
|
||||
|
||||
initializer.setInstancesForObject(DEVICE, lwM2MDevice = new SimpleLwM2MDevice(executor));
|
||||
initializer.setInstancesForObject(DEVICE, lwM2MDevice = new SimpleLwM2MDevice(executor, value3_0_9));
|
||||
initializer.setInstancesForObject(FIRMWARE, fwLwM2MDevice = new FwLwM2MDevice());
|
||||
initializer.setInstancesForObject(SOFTWARE_MANAGEMENT, swLwM2MDevice = new SwLwM2MDevice());
|
||||
initializer.setClassForObject(ACCESS_CONTROL, DummyInstanceEnabler.class);
|
||||
|
||||
@ -85,18 +85,22 @@ public class SimpleLwM2MDevice extends BaseInstanceEnabler implements Destroyabl
|
||||
*/
|
||||
private static Map<Integer, Long> errorCode =
|
||||
Map.of(0, 0L); // 0-32
|
||||
private Integer value3_0_9;
|
||||
|
||||
public SimpleLwM2MDevice() {
|
||||
}
|
||||
|
||||
public SimpleLwM2MDevice(ScheduledExecutorService executorService) {
|
||||
public SimpleLwM2MDevice(ScheduledExecutorService executorService, Integer value3_0_9) {
|
||||
this.value3_0_9 = value3_0_9;
|
||||
try {
|
||||
executorService.scheduleWithFixedDelay(() -> {
|
||||
fireResourceChange(9);
|
||||
fireResourceChange(20);
|
||||
}
|
||||
, 1, 1, TimeUnit.SECONDS); // 2 sec
|
||||
if ( this.value3_0_9 == null) {
|
||||
executorService.scheduleWithFixedDelay(() -> {
|
||||
fireResourceChange(9);
|
||||
fireResourceChange(20);
|
||||
}
|
||||
, 1, 1, TimeUnit.SECONDS); // 2 sec
|
||||
// , 1800000, 1800000, TimeUnit.MILLISECONDS); // 30 MIN
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
log.error("[{}]Throwable", e.toString());
|
||||
e.printStackTrace();
|
||||
@ -211,8 +215,14 @@ public class SimpleLwM2MDevice extends BaseInstanceEnabler implements Destroyabl
|
||||
}
|
||||
|
||||
private int getBatteryLevel() {
|
||||
int valBattery = randomIterator.nextInt();
|
||||
log.trace("Send from client [3/0/9] val: [{}]", valBattery);
|
||||
int valBattery;
|
||||
if (this.value3_0_9 == null) {
|
||||
valBattery = randomIterator.nextInt();
|
||||
log.trace("Send from client [3/0/9] val: [{}]", valBattery);
|
||||
} else {
|
||||
valBattery = this.value3_0_9;
|
||||
log.warn("Send from client [3/0/9] constant value: [{}]", valBattery);
|
||||
}
|
||||
return valBattery;
|
||||
}
|
||||
|
||||
|
||||
@ -0,0 +1,16 @@
|
||||
package org.thingsboard.server.transport.lwm2m.rpc.sql;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MDeviceCredentials;
|
||||
import org.thingsboard.server.transport.lwm2m.security.AbstractSecurityLwM2MIntegrationTest;
|
||||
|
||||
public class RpcLwm2mIntegrationDataReceivedFromClientTest extends AbstractSecurityLwM2MIntegrationTest {
|
||||
|
||||
@Test
|
||||
public void testWithNoSecConnectLwm2mSuccessAndObserveTelemetry() throws Exception {
|
||||
String clientEndpoint = CLIENT_ENDPOINT_NO_SEC;
|
||||
LwM2MDeviceCredentials clientCredentials = getDeviceCredentialsNoSec(createNoSecClientCredentials(clientEndpoint));
|
||||
super.testConnectionWithoutObserveWithDataReceivedSingleTelemetry(SECURITY_NO_SEC, clientCredentials, clientEndpoint, false);
|
||||
}
|
||||
|
||||
}
|
||||
@ -383,7 +383,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
|
||||
LwM2mPath path = instant.getKey();
|
||||
LwM2mNode node = instant.getValue();
|
||||
LwM2mClient lwM2MClient = clientContext.getClientByEndpoint(registration.getEndpoint());
|
||||
ObjectModel objectModelVersion = lwM2MClient.getObjectModel(path.toString(), modelProvider);
|
||||
ObjectModel objectModelVersion = lwM2MClient.getObjectModel(convertObjectIdToVersionedId(path.toString(), lwM2MClient), modelProvider);
|
||||
if (objectModelVersion != null) {
|
||||
ResourceUpdateResult updateResource = new ResourceUpdateResult(lwM2MClient);
|
||||
if (node instanceof LwM2mObject) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user