lwm2m: add observeComposite ti init Client

This commit is contained in:
nickAS21 2025-04-29 19:06:29 +03:00
parent b9706295ae
commit cff17821c5
6 changed files with 143 additions and 37 deletions

View File

@ -34,6 +34,7 @@ import org.springframework.http.HttpStatus;
import org.springframework.test.context.TestPropertySource;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.script.api.tbel.TbDate;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceProfileProvisionType;
@ -196,6 +197,28 @@ public abstract class AbstractLwM2MIntegrationTest extends AbstractTransportInte
" \"attributeLwm2m\": {}\n" +
" }";
public static String TELEMETRY_WITH_COMPOSITE_OBSERVE =
" {\n" +
" \"keyName\": {\n" +
" \"/3_1.2/0/9\": \"batteryLevel\",\n" +
" \"/3_1.2/0/20\": \"batteryStatus\",\n" +
" \"/19_1.1/0/2\": \"dataCreationTime\"\n" +
" },\n" +
" \"observe\": [\n" +
" \"/3_1.2/0/9\",\n" +
" \"/3_1.2/0/20\",\n" +
" \"/19_1.1/0/2\"\n" +
" ],\n" +
" \"attribute\": [],\n" +
" \"telemetry\": [\n" +
" \"/3_1.2/0/9\",\n" +
" \"/3_1.2/0/20\",\n" +
" \"/19_1.1/0/2\"\n" +
" ],\n" +
" \"attributeLwm2m\": {},\n" +
" \"observeStrategy\": 1\n" +
" }";
public static final String CLIENT_LWM2M_SETTINGS =
" {\n" +
" \"edrxCycle\": null,\n" +
@ -208,20 +231,7 @@ public abstract class AbstractLwM2MIntegrationTest extends AbstractTransportInte
" \"pagingTransmissionWindow\": null,\n" +
" \"clientOnlyObserveAfterConnect\": 1\n" +
" }";
public static String TELEMETRY_WITH_STRATEGY_COMPOSITE_ALL =
" {\n" +
" \"keyName\": {\n" +
" \"/3_1.2/0/9\": \"batteryLevel\"\n" +
" },\n" +
" \"observe\": [],\n" +
" \"attribute\": [\n" +
" ],\n" +
" \"telemetry\": [\n" +
" \"/3_1.2/0/9\"\n" +
" ],\n" +
" \"attributeLwm2m\": {},\n" +
" \"observeStrategy\": 1\n" +
" }";
protected final Set<Lwm2mTestHelper.LwM2MClientState> expectedStatusesRegistrationLwm2mSuccess = new HashSet<>(Arrays.asList(ON_INIT, ON_REGISTRATION_STARTED, ON_REGISTRATION_SUCCESS));
protected final Set<Lwm2mTestHelper.LwM2MClientState> expectedStatusesRegistrationLwm2mSuccessUpdate = new HashSet<>(Arrays.asList(ON_INIT, ON_REGISTRATION_STARTED, ON_REGISTRATION_SUCCESS, ON_UPDATE_STARTED, ON_UPDATE_SUCCESS));
protected final Set<Lwm2mTestHelper.LwM2MClientState> expectedStatusesRegistrationBsSuccess = new HashSet<>(Arrays.asList(ON_BOOTSTRAP_STARTED, ON_BOOTSTRAP_SUCCESS, ON_REGISTRATION_STARTED, ON_REGISTRATION_SUCCESS));
@ -261,10 +271,10 @@ public abstract class AbstractLwM2MIntegrationTest extends AbstractTransportInte
}
}
public void basicTestConnectionObserveTelemetry(Security security,
LwM2MDeviceCredentials deviceCredentials,
String endpoint,
boolean queueMode) throws Exception {
public void basicTestConnectionObserveSingleTelemetry(Security security,
LwM2MDeviceCredentials deviceCredentials,
String endpoint,
boolean queueMode) throws Exception {
Lwm2mDeviceProfileTransportConfiguration transportConfiguration = getTransportConfiguration(TELEMETRY_WITHOUT_OBSERVE, getBootstrapServerCredentialsNoSec(NONE));
DeviceProfile deviceProfile = createLwm2mDeviceProfile("profileFor" + endpoint, transportConfiguration);
Device device = createLwm2mDevice(deviceCredentials, endpoint, deviceProfile.getId());
@ -298,8 +308,56 @@ public abstract class AbstractLwM2MIntegrationTest extends AbstractTransportInte
int expectedMin = 5;
Assert.assertTrue(expectedMax >= Long.parseLong(tsValue.getValue()));
Assert.assertTrue(expectedMin <= Long.parseLong(tsValue.getValue()));
}
public void basicTestConnectionObserveCompositeTelemetry(Security security,
LwM2MDeviceCredentials deviceCredentials,
String endpoint,
Lwm2mDeviceProfileTransportConfiguration transportConfiguration,
int cntObserve) throws Exception {
DeviceProfile deviceProfile = createLwm2mDeviceProfile("profileFor" + endpoint, transportConfiguration);
Device device = createLwm2mDevice(deviceCredentials, endpoint, deviceProfile.getId());
SingleEntityFilter sef = new SingleEntityFilter();
sef.setSingleEntity(device.getId());
LatestValueCmd latestCmd = new LatestValueCmd();
String key1 = "batteryLevel";
String key2 = "dataCreationTime";
latestCmd.setKeys(Collections.singletonList(new EntityKey(EntityKeyType.TIME_SERIES, key1)));
latestCmd.setKeys(Collections.singletonList(new EntityKey(EntityKeyType.TIME_SERIES, key2)));
EntityDataQuery edq = new EntityDataQuery(sef, new EntityDataPageLink(1, 0, null, null),
Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
EntityDataCmd cmd = new EntityDataCmd(2, edq, null, latestCmd, null);
getWsClient().send(cmd);
getWsClient().waitForReply();
getWsClient().registerWaitForUpdate();
this.createNewClient(security, null, false, endpoint, null, true, device.getId().getId().toString());
awaitObserveReadAll(cntObserve, lwM2MTestClient.getDeviceIdStr());
String msg = getWsClient().waitForUpdate();
EntityDataUpdate update = JacksonUtil.fromString(msg, EntityDataUpdate.class);
Assert.assertEquals(2, update.getCmdId());
List<EntityData> eData = update.getUpdate();
Assert.assertNotNull(eData);
Assert.assertEquals(1, eData.size());
Assert.assertEquals(device.getId(), eData.get(0).getEntityId());
Assert.assertNotNull(eData.get(0).getLatest().get(EntityKeyType.TIME_SERIES));
var tsValue1 = eData.get(0).getLatest().get(EntityKeyType.TIME_SERIES).get(key1);
var tsValue2 = eData.get(0).getLatest().get(EntityKeyType.TIME_SERIES).get(key2);
if (tsValue1 != null) {
Assert.assertThat(Long.parseLong(tsValue1.getValue()), instanceOf(Long.class));
int expectedMax = 50;
int expectedMin = 5;
Assert.assertTrue(expectedMax >= Long.parseLong(tsValue1.getValue()));
Assert.assertTrue(expectedMin <= Long.parseLong(tsValue1.getValue()));
} else {
String pattern = "MMM dd, yyyy HH:mm a";
TbDate d = new TbDate(tsValue2.getValue(), pattern, "en-US");
Assert.assertNotNull(d);
}
}
protected DeviceProfile createLwm2mDeviceProfile(String name, Lwm2mDeviceProfileTransportConfiguration transportConfiguration) throws Exception {

View File

@ -27,9 +27,6 @@ import org.eclipse.leshan.core.response.WriteResponse;
import javax.security.auth.Destroyable;
import java.sql.Time;
import java.time.Instant;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@ -187,8 +184,7 @@ public class LwM2mBinaryAppDataContainer extends BaseInstanceEnabler implements
}
private Time getTimestamp() {
LocalTime localTime = LocalTime.ofInstant(Instant.now(), ZoneId.systemDefault());
this.timestamp = Time.valueOf(localTime);
setTimestamp();
return this.timestamp;
}

View File

@ -119,7 +119,7 @@ public abstract class AbstractSecurityLwM2MIntegrationTest extends AbstractLwM2M
protected final PrivateKey clientPrivateKeyFromCertTrust; // client private key used for X509 and RPK
protected final X509Certificate clientX509CertTrustNo; // client certificate signed by intermediate, rootCA with a good CN ("host name")
protected final PrivateKey clientPrivateKeyFromCertTrustNo; // client private key used for X509 and RPK
private final String[] RESOURCES_SECURITY = new String[]{"1.xml", "2.xml", "3.xml", "5.xml", "9.xml"};
private final String[] RESOURCES_SECURITY = new String[]{"1.xml", "2.xml", "3.xml", "5.xml", "9.xml", "19.xml"};
private final LwM2MBootstrapClientCredentials defaultBootstrapCredentials;

View File

@ -17,8 +17,10 @@ package org.thingsboard.server.transport.lwm2m.security.sql;
import org.junit.Test;
import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MDeviceCredentials;
import org.thingsboard.server.common.data.device.profile.Lwm2mDeviceProfileTransportConfiguration;
import org.thingsboard.server.transport.lwm2m.security.AbstractSecurityLwM2MIntegrationTest;
import static org.thingsboard.server.common.data.device.profile.lwm2m.TelemetryObserveStrategy.COMPOSITE_BY_OBJECT;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.LwM2MClientState.ON_REGISTRATION_SUCCESS;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.LwM2MProfileBootstrapConfigType.BOOTSTRAP_ONLY;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.LwM2MProfileBootstrapConfigType.BOTH;
@ -32,13 +34,31 @@ public class NoSecLwM2MIntegrationTest extends AbstractSecurityLwM2MIntegrationT
public void testWithNoSecConnectLwm2mSuccessAndObserveTelemetry() throws Exception {
String clientEndpoint = CLIENT_ENDPOINT_NO_SEC;
LwM2MDeviceCredentials clientCredentials = getDeviceCredentialsNoSec(createNoSecClientCredentials(clientEndpoint));
super.basicTestConnectionObserveTelemetry(SECURITY_NO_SEC, clientCredentials, clientEndpoint, false);
super.basicTestConnectionObserveSingleTelemetry(SECURITY_NO_SEC, clientCredentials, clientEndpoint, false);
}
@Test
public void testWithNoSecQueueModeConnectLwm2mSuccessAndObserveTelemetry() throws Exception {
public void testWithNoSecQueueModeConnectLwm2mSuccessAndObserveSingleTelemetry() throws Exception {
String clientEndpoint = CLIENT_ENDPOINT_NO_SEC + "_QueueMode";
LwM2MDeviceCredentials clientCredentials = getDeviceCredentialsNoSec(createNoSecClientCredentials(clientEndpoint));
super.basicTestConnectionObserveTelemetry(SECURITY_NO_SEC, clientCredentials, clientEndpoint, true);
super.basicTestConnectionObserveSingleTelemetry(SECURITY_NO_SEC, clientCredentials, clientEndpoint, true);
}
@Test
public void testWithNoSecQueueModeConnectLwm2mSuccessAndObserveCompositeAllTelemetry() throws Exception {
String clientEndpoint = CLIENT_ENDPOINT_NO_SEC + "_QueueMode";
LwM2MDeviceCredentials clientCredentials = getDeviceCredentialsNoSec(createNoSecClientCredentials(clientEndpoint));
Lwm2mDeviceProfileTransportConfiguration transportConfiguration = super.getTransportConfiguration(TELEMETRY_WITH_COMPOSITE_OBSERVE, getBootstrapServerCredentialsNoSec(NONE));
super.basicTestConnectionObserveCompositeTelemetry(SECURITY_NO_SEC, clientCredentials, clientEndpoint, transportConfiguration, 1);
}
@Test
public void testWithNoSecQueueModeConnectLwm2mSuccessAndObserveCompositeByObjectTelemetry() throws Exception {
String clientEndpoint = CLIENT_ENDPOINT_NO_SEC + "_QueueMode";
LwM2MDeviceCredentials clientCredentials = getDeviceCredentialsNoSec(createNoSecClientCredentials(clientEndpoint));
Lwm2mDeviceProfileTransportConfiguration transportConfiguration = super.getTransportConfiguration(TELEMETRY_WITH_COMPOSITE_OBSERVE, getBootstrapServerCredentialsNoSec(NONE));
transportConfiguration.getObserveAttr().setObserveStrategy(COMPOSITE_BY_OBJECT);
super.basicTestConnectionObserveCompositeTelemetry(SECURITY_NO_SEC, clientCredentials, clientEndpoint, transportConfiguration, 2);
}
// Bootstrap + Lwm2m

View File

@ -35,7 +35,7 @@ public class TransportConfigurationTest extends AbstractSecurityLwM2MIntegration
@Test
public void testTransportConfigurationObserveStrategyBeforeParseNotNullAfterParseNotNull_STRATEGY_COMPOSITE_ALL() throws Exception {
Lwm2mDeviceProfileTransportConfiguration transportConfiguration = getTransportConfiguration(TELEMETRY_WITH_STRATEGY_COMPOSITE_ALL, getBootstrapServerCredentialsNoSec(NONE));
Lwm2mDeviceProfileTransportConfiguration transportConfiguration = getTransportConfiguration(TELEMETRY_WITH_COMPOSITE_OBSERVE, getBootstrapServerCredentialsNoSec(NONE));
Assert.assertNotNull(transportConfiguration.getObserveAttr().getObserveStrategy());
Assert.assertEquals(COMPOSITE_ALL, transportConfiguration.getObserveAttr().getObserveStrategy());
}

View File

@ -60,6 +60,7 @@ import org.thingsboard.server.common.data.device.profile.Lwm2mDeviceProfileTrans
import org.thingsboard.server.common.data.device.profile.lwm2m.ObjectAttributes;
import org.thingsboard.server.common.data.device.profile.lwm2m.OtherConfiguration;
import org.thingsboard.server.common.data.device.profile.lwm2m.TelemetryMappingConfiguration;
import org.thingsboard.server.common.data.device.profile.lwm2m.TelemetryObserveStrategy;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.ota.OtaPackageUtil;
@ -92,6 +93,8 @@ import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MReadCallbac
import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MReadRequest;
import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteAttributesCallback;
import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteAttributesRequest;
import org.thingsboard.server.transport.lwm2m.server.downlink.composite.TbLwM2MObserveCompositeCallback;
import org.thingsboard.server.transport.lwm2m.server.downlink.composite.TbLwM2MObserveCompositeRequest;
import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService;
import org.thingsboard.server.transport.lwm2m.server.model.LwM2MModelConfig;
import org.thingsboard.server.transport.lwm2m.server.model.LwM2MModelConfigService;
@ -116,9 +119,11 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.device.profile.lwm2m.TelemetryObserveStrategy.COMPOSITE_ALL;
import static org.thingsboard.server.common.data.device.profile.lwm2m.TelemetryObserveStrategy.COMPOSITE_BY_OBJECT;
import static org.thingsboard.server.common.data.device.profile.lwm2m.TelemetryObserveStrategy.SINGLE;
import static org.thingsboard.server.common.data.lwm2m.LwM2mConstants.LWM2M_SEPARATOR_PATH;
import static org.thingsboard.server.common.data.util.CollectionsUtil.diffSets;
import static org.thingsboard.server.transport.lwm2m.server.ota.DefaultLwM2MOtaUpdateService.FW_3_VER_ID;
@ -475,7 +480,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
Set<String> supportedObjects = clientContext.getSupportedIdVerInClient(lwM2MClient);
if (supportedObjects != null && supportedObjects.size() > 0) {
this.sendReadRequests(lwM2MClient, profile, supportedObjects);
this.sendObserveRequests(lwM2MClient, profile, supportedObjects);
this.sendInitObserveRequests(lwM2MClient, profile, supportedObjects);
this.sendWriteAttributeRequests(lwM2MClient, profile, supportedObjects);
// Removed. Used only for debug.
// this.sendDiscoverRequests(lwM2MClient, profile, supportedObjects);
@ -501,16 +506,27 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
}
}
private void sendObserveRequests(LwM2mClient lwM2MClient, Lwm2mDeviceProfileTransportConfiguration profile, Set<String> supportedObjects) {
private void sendInitObserveRequests(LwM2mClient lwM2MClient, Lwm2mDeviceProfileTransportConfiguration profile, Set<String> supportedObjects) {
try {
Set<String> targetIds = profile.getObserveAttr().getObserve();
targetIds = targetIds.stream().filter(target -> isSupportedTargetId(supportedObjects, target)).collect(Collectors.toSet());
CountDownLatch latch = new CountDownLatch(targetIds.size());
targetIds.forEach(targetId -> sendObserveRequest(lwM2MClient, targetId,
new TbLwM2MLatchCallback<>(latch, new TbLwM2MObserveCallback(this, logService, lwM2MClient, targetId))));
latch.await(config.getTimeout(), TimeUnit.MILLISECONDS);
if (!targetIds.isEmpty()) {
TelemetryObserveStrategy observeStrategy = profile.getObserveAttr().getObserveStrategy();
if (SINGLE.equals(observeStrategy)) {
CountDownLatch latch = new CountDownLatch(targetIds.size());
targetIds.forEach(targetId -> sendObserveRequest(lwM2MClient, targetId,
new TbLwM2MLatchCallback<>(latch, new TbLwM2MObserveCallback(this, logService, lwM2MClient, targetId))));
latch.await(config.getTimeout(), TimeUnit.MILLISECONDS);
} else if (COMPOSITE_ALL.equals(observeStrategy)) {
String[] versionedIds = targetIds.toArray(new String[0]);
sendObserveCompositeRequest(lwM2MClient, versionedIds);
} else if (COMPOSITE_BY_OBJECT.equals(observeStrategy)) {
Map<Integer, String[]> versionedObjectIds = groupByObjectIdVersionedIds(targetIds);
CountDownLatch latch = new CountDownLatch(versionedObjectIds.size());
versionedObjectIds.forEach((k, v)-> sendObserveCompositeRequest(lwM2MClient, v));
latch.await(config.getTimeout(), TimeUnit.MILLISECONDS);
}
}
} catch (InterruptedException e) {
log.error("[{}] Failed to await Observe requests!", lwM2MClient.getEndpoint(), e);
} catch (Exception e) {
@ -547,6 +563,12 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
TbLwM2MObserveRequest request = TbLwM2MObserveRequest.builder().versionedId(versionedId).timeout(clientContext.getRequestTimeout(lwM2MClient)).build();
defaultLwM2MDownlinkMsgHandler.sendObserveRequest(lwM2MClient, request, callback);
}
private void sendObserveCompositeRequest(LwM2mClient lwM2MClient, String[] versionedIds) {
TbLwM2MObserveCompositeRequest request = TbLwM2MObserveCompositeRequest.builder().versionedIds(versionedIds).timeout(clientContext.getRequestTimeout(lwM2MClient)).build();
var mainCallback = new TbLwM2MObserveCompositeCallback(this, logService, lwM2MClient, versionedIds);
defaultLwM2MDownlinkMsgHandler.sendObserveCompositeRequest(lwM2MClient, request, mainCallback);
}
private void sendWriteAttributesRequest(LwM2mClient lwM2MClient, String targetId, ObjectAttributes params) {
TbLwM2MWriteAttributesRequest request = TbLwM2MWriteAttributesRequest.builder().versionedId(targetId).attributes(params).timeout(clientContext.getRequestTimeout(lwM2MClient)).build();
@ -1024,4 +1046,14 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
}
}
private Map<Integer, String[]> groupByObjectIdVersionedIds(Set<String> targetIds){
return targetIds.stream()
.collect(Collectors.groupingBy(
id -> new LwM2mPath(fromVersionedIdToObjectId(id)).getObjectId(),
Collectors.collectingAndThen(
Collectors.toList(),
list -> list.toArray(new String[0])
)
));
}
}