tbel: fix bug send From Client CollectedValue

This commit is contained in:
nick 2024-08-28 15:20:55 +03:00
parent 02d3382731
commit f598e2fe89
6 changed files with 174 additions and 31 deletions

View File

@ -189,7 +189,9 @@ public class LwM2MTestClient {
locationParams = new LwM2MLocationParams();
locationParams.getPos();
initializer.setInstancesForObject(LOCATION, new LwM2mLocation(locationParams.getLatitude(), locationParams.getLongitude(), locationParams.getScaleFactor(), executor, OBJECT_INSTANCE_ID_0));
initializer.setInstancesForObject(TEMPERATURE_SENSOR, lwM2MTemperatureSensor = new LwM2mTemperatureSensor(executor, OBJECT_INSTANCE_ID_0), new LwM2mTemperatureSensor(executor, OBJECT_INSTANCE_ID_12));
LwM2mTemperatureSensor lwM2mTemperatureSensor0 = new LwM2mTemperatureSensor(executor, OBJECT_INSTANCE_ID_0);
LwM2mTemperatureSensor lwM2mTemperatureSensor12 = new LwM2mTemperatureSensor(executor, OBJECT_INSTANCE_ID_12);
initializer.setInstancesForObject(TEMPERATURE_SENSOR, lwM2mTemperatureSensor0, lwM2mTemperatureSensor12);
List<LwM2mObjectEnabler> enablers = initializer.createAll();
@ -314,6 +316,7 @@ public class LwM2MTestClient {
clientDtlsCid = new HashMap<>();
clientStates.add(ON_INIT);
leshanClient = builder.build();
lwM2mTemperatureSensor12.setLeshanClient(leshanClient);
LwM2mClientObserver observer = new LwM2mClientObserver() {
@Override

View File

@ -16,9 +16,13 @@
package org.thingsboard.server.transport.lwm2m.client;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.leshan.client.LeshanClient;
import org.eclipse.leshan.client.resource.BaseInstanceEnabler;
import org.eclipse.leshan.client.send.ManualDataSender;
import org.eclipse.leshan.client.servers.LwM2mServer;
import org.eclipse.leshan.core.model.ObjectModel;
import org.eclipse.leshan.core.node.LwM2mPath;
import org.eclipse.leshan.core.request.ContentFormat;
import org.eclipse.leshan.core.request.argument.Arguments;
import org.eclipse.leshan.core.response.ExecuteResponse;
import org.eclipse.leshan.core.response.ReadResponse;
@ -26,6 +30,7 @@ import org.eclipse.leshan.core.response.ReadResponse;
import javax.security.auth.Destroyable;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
@ -39,6 +44,9 @@ public class LwM2mTemperatureSensor extends BaseInstanceEnabler implements Destr
private double currentTemp = 20d;
private double minMeasuredValue = currentTemp;
private double maxMeasuredValue = currentTemp;
private LeshanClient leshanClient;
private List<Double> containingValues;
protected static final Random RANDOM = new Random();
private static final List<Integer> supportedResources = Arrays.asList(5601, 5602, 5700, 5701);
@ -65,7 +73,17 @@ public class LwM2mTemperatureSensor extends BaseInstanceEnabler implements Destr
case 5602:
return ReadResponse.success(resourceId, getTwoDigitValue(maxMeasuredValue));
case 5700:
return ReadResponse.success(resourceId, getTwoDigitValue(currentTemp));
if (identity == LwM2mServer.SYSTEM) {
setTemperature();
setData();
return ReadResponse.success(resourceId, getTwoDigitValue(currentTemp));
} else if (this.getId() == 12 && this.leshanClient != null) {
containingValues = new ArrayList<>();
sendCollected(5700);
return ReadResponse.success(resourceId, getData());
} else {
return ReadResponse.success(resourceId, getTwoDigitValue(currentTemp));
}
case 5701:
return ReadResponse.success(resourceId, UNIT_CELSIUS);
default:
@ -91,8 +109,7 @@ public class LwM2mTemperatureSensor extends BaseInstanceEnabler implements Destr
}
private void adjustTemperature() {
float delta = (RANDOM.nextInt(20) - 10) / 10f;
currentTemp += delta;
setTemperature();
Integer changedResource = adjustMinMaxMeasuredValue(currentTemp);
fireResourceChange(5700);
if (changedResource != null) {
@ -100,6 +117,10 @@ public class LwM2mTemperatureSensor extends BaseInstanceEnabler implements Destr
}
}
private void setTemperature(){
float delta = (RANDOM.nextInt(20) - 10) / 10f;
currentTemp += delta;
}
private synchronized Integer adjustMinMaxMeasuredValue(double newTemperature) {
if (newTemperature > maxMeasuredValue) {
maxMeasuredValue = newTemperature;
@ -122,9 +143,48 @@ public class LwM2mTemperatureSensor extends BaseInstanceEnabler implements Destr
return supportedResources;
}
protected void setLeshanClient(LeshanClient leshanClient){
this.leshanClient = leshanClient;
}
@Override
public void destroy() {
}
private void sendCollected(int resourceId) {
try {
LwM2mServer registeredServer = this.leshanClient.getRegisteredServers().values().iterator().next();
ManualDataSender sender = this.leshanClient.getSendService().getDataSender(ManualDataSender.DEFAULT_NAME,
ManualDataSender.class);
sender.collectData(Arrays.asList(getPathForCollectedValue(resourceId)));
Thread.sleep(1000);
sender.collectData(Arrays.asList(getPathForCollectedValue(resourceId)));
sender.sendCollectedData(registeredServer, ContentFormat.SENML_JSON, 1000, false);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private LwM2mPath getPathForCollectedValue(int resourceId) {
return new LwM2mPath(3303, this.getId(), resourceId);
}
private double getData() {
if (containingValues.size() > 1) {
Integer t0 = Math.toIntExact(Math.round(containingValues.get(0) * 100));
Integer t1 = Math.toIntExact(Math.round(containingValues.get(1) * 100));
long to_t1 = (((long) t0) << 32) | (t1 & 0xffffffffL);
return Double.longBitsToDouble(to_t1);
} else {
return currentTemp;
}
}
private void setData() {
if (containingValues == null){
containingValues = new ArrayList<>();
}
containingValues.add(getTwoDigitValue(currentTemp));
}
}

View File

@ -16,19 +16,34 @@
package org.thingsboard.server.transport.lwm2m.rpc.sql;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.leshan.core.ResponseCode;
import org.eclipse.leshan.core.node.LwM2mNode;
import org.eclipse.leshan.core.node.LwM2mPath;
import org.eclipse.leshan.core.node.LwM2mResource;
import org.eclipse.leshan.core.node.TimestampedLwM2mNodes;
import org.eclipse.leshan.server.registration.Registration;
import org.junit.Test;
import org.mockito.Mockito;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.transport.lwm2m.rpc.AbstractRpcLwM2MIntegrationTest;
import org.thingsboard.server.transport.lwm2m.server.uplink.DefaultLwM2mUplinkMsgHandler;
import java.time.Instant;
import java.util.Map;
import static org.eclipse.leshan.core.LwM2mId.SERVER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.BINARY_APP_DATA_CONTAINER;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.OBJECT_INSTANCE_ID_0;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.OBJECT_INSTANCE_ID_1;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.OBJECT_INSTANCE_ID_12;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_0;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_1;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_11;
@ -41,9 +56,13 @@ import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_3_14;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_3_9;
@Slf4j
public class RpcLwm2mIntegrationReadTest extends AbstractRpcLwM2MIntegrationTest {
@SpyBean
DefaultLwM2mUplinkMsgHandler defaultUplinkMsgHandlerTest;
/**
* Read {"id":"/3"}
* Read {"id":"/6"}...
@ -57,7 +76,7 @@ public class RpcLwm2mIntegrationReadTest extends AbstractRpcLwM2MIntegrationTest
String expectedObjectId = pathIdVerToObjectId((String) expected);
LwM2mPath expectedPath = new LwM2mPath(expectedObjectId);
ObjectNode rpcActualResult = JacksonUtil.fromString(actualResult, ObjectNode.class);
assertEquals(ResponseCode.CONTENT.getName(), rpcActualResult.get("result").asText());
assertEquals(ResponseCode.CONTENT.getName(), rpcActualResult.get("result").asText());
String expectedObjectInstances = "LwM2mObject [id=" + expectedPath.getObjectId() + ", instances={0=LwM2mObjectInstance [id=0, resources=";
if (expectedPath.getObjectId() == 1) {
expectedObjectInstances = "LwM2mObject [id=1, instances={1=";
@ -211,6 +230,60 @@ public class RpcLwm2mIntegrationReadTest extends AbstractRpcLwM2MIntegrationTest
assertTrue(actualValues.contains(expected19_1_0));
}
/**
* /3303/0/5700
* Read {"id":"/3303/0/5700"}
* Trigger a Send operation from the client with multiple values for the same resource as a payload
* acked "[{"bn":"/3303/12/5700","bt":1724".. 116 bytes]
* 2 values for the resource /3303/12/5700 should be stored with timestamps1 = Instance.now(), timestamps2 = Instance.now()
*
* @throws Exception
*/
@Test
public void testReadSingleResource_sendFromClient_CollectedValue() throws Exception {
TimestampedLwM2mNodes[] tsNodesHolder = new TimestampedLwM2mNodes[1];
doAnswer(inv -> {
tsNodesHolder[0] = inv.getArgument(1);
return null;
}).when(defaultUplinkMsgHandlerTest).onUpdateValueWithSendRequest(
Mockito.any(Registration.class),
Mockito.any(TimestampedLwM2mNodes.class)
);
int resourceId = 5700;
String expectedIdVer = objectIdVer_3303 + "/" + OBJECT_INSTANCE_ID_12 + "/" + resourceId;
String actualResult = sendRPCById(expectedIdVer);
verify(defaultUplinkMsgHandlerTest, timeout(10000).times(1))
.onUpdateValueWithSendRequest(Mockito.any(Registration.class), Mockito.any(TimestampedLwM2mNodes.class));
ObjectNode rpcActualResult = JacksonUtil.fromString(actualResult, ObjectNode.class);
assertEquals(ResponseCode.CONTENT.getName(), rpcActualResult.get("result").asText());
String expected = "LwM2mSingleResource [id=" + resourceId + ", value=";
String actual = rpcActualResult.get("value").asText();
assertTrue(actual.contains(expected));
int indStart = actual.indexOf(expected) + expected.length();
int indEnd = actual.indexOf(",", indStart);
String valStr = actual.substring(indStart, indEnd);
double dd = Double.parseDouble(valStr);
long combined = Double.doubleToRawLongBits(dd);
int t0 = (int) (combined >> 32);
int t1 = (int) combined;
double[] expectedValues ={(double)t0/100, (double)t1/100};
int ind = 0;
LwM2mPath expectedPath = new LwM2mPath("/3303/12/5700");
for (Instant ts : tsNodesHolder[0].getTimestamps()) {
Map<LwM2mPath, LwM2mNode> nodesAt = tsNodesHolder[0].getNodesAt(ts);
for (var instant : nodesAt.entrySet()) {
LwM2mPath actualPath = instant.getKey();
LwM2mNode node = instant.getValue();
LwM2mResource lwM2mResource = (LwM2mResource) node;
assertEquals(expectedPath, actualPath);
assertEquals(expectedValues[ind], lwM2mResource.getValue());
ind++;
}
}
}
/**
* ReadComposite {"keys":["batteryLevel", "UtfOffset", "dataDescription"]}
*/

View File

@ -137,8 +137,9 @@ public class LwM2mServerListener {
@Override
public void dataReceived(Registration registration, TimestampedLwM2mNodes data, SendRequest request) {
log.trace("Received Send request from [{}] containing value: [{}], coapRequest: [{}]", registration.getEndpoint(), data.toString(), request.getCoapRequest().toString());
if (registration != null) {
service.onUpdateValueWithSendRequest(registration, request);
service.onUpdateValueWithSendRequest(registration, data);
}
}

View File

@ -36,6 +36,7 @@ import org.eclipse.leshan.core.node.LwM2mPath;
import org.eclipse.leshan.core.node.LwM2mResource;
import org.eclipse.leshan.core.node.LwM2mResourceInstance;
import org.eclipse.leshan.core.node.LwM2mSingleResource;
import org.eclipse.leshan.core.node.TimestampedLwM2mNodes;
import org.eclipse.leshan.core.node.codec.LwM2mValueConverter;
import org.eclipse.leshan.core.observation.Observation;
import org.eclipse.leshan.core.request.CreateRequest;
@ -102,6 +103,7 @@ import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2mSecurityStore;
import org.thingsboard.server.transport.lwm2m.utils.LwM2MTransportUtil;
import org.thingsboard.server.transport.lwm2m.utils.LwM2mValueConverterImpl;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -145,7 +147,7 @@ import static org.thingsboard.server.transport.lwm2m.utils.LwM2MTransportUtil.fr
public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService implements LwM2mUplinkMsgHandler {
@Getter
private final LwM2mValueConverter converter = LwM2mValueConverterImpl.getInstance();;
private final LwM2mValueConverter converter = LwM2mValueConverterImpl.getInstance();
private final TransportService transportService;
private final LwM2mTransportContext context;
@ -360,29 +362,33 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
* Sending updated value to thingsboard from SendListener.dataReceived: object, instance, SingleResource or MultipleResource
*
* @param registration - Registration LwM2M Client
* @param sendRequest - sendRequest
* @param data - TimestampedLwM2mNodes (send From Client CollectedValue)
*/
@Override
public void onUpdateValueWithSendRequest(Registration registration, SendRequest sendRequest) {
for(var entry : sendRequest.getTimestampedNodes().getNodes().entrySet()) {
LwM2mPath path = entry.getKey();
LwM2mNode node = entry.getValue();
LwM2mClient lwM2MClient = clientContext.getClientByEndpoint(registration.getEndpoint());
String stringPath = convertObjectIdToVersionedId(path.toString(), lwM2MClient);
ObjectModel objectModelVersion = lwM2MClient.getObjectModel(stringPath, modelProvider);
if (objectModelVersion != null) {
if (node instanceof LwM2mObject) {
LwM2mObject lwM2mObject = (LwM2mObject) node;
this.updateObjectResourceValue(lwM2MClient, lwM2mObject, stringPath, 0);
} else if (node instanceof LwM2mObjectInstance) {
LwM2mObjectInstance lwM2mObjectInstance = (LwM2mObjectInstance) node;
this.updateObjectInstanceResourceValue(lwM2MClient, lwM2mObjectInstance, stringPath, 0);
} else if (node instanceof LwM2mResource) {
LwM2mResource lwM2mResource = (LwM2mResource) node;
this.updateResourcesValue(lwM2MClient, lwM2mResource, stringPath, Mode.UPDATE, 0);
public void onUpdateValueWithSendRequest(Registration registration, TimestampedLwM2mNodes data) {
for (Instant ts : data.getTimestamps()) {
Map<LwM2mPath, LwM2mNode> nodesAt = data.getNodesAt(ts);
for (var instant : nodesAt.entrySet()) {
LwM2mPath path = instant.getKey();
LwM2mNode node = instant.getValue();
LwM2mClient lwM2MClient = clientContext.getClientByEndpoint(registration.getEndpoint());
String stringPath = convertObjectIdToVersionedId(path.toString(), lwM2MClient);
ObjectModel objectModelVersion = lwM2MClient.getObjectModel(stringPath, modelProvider);
if (objectModelVersion != null) {
if (node instanceof LwM2mObject) {
LwM2mObject lwM2mObject = (LwM2mObject) node;
this.updateObjectResourceValue(lwM2MClient, lwM2mObject, stringPath, 0);
} else if (node instanceof LwM2mObjectInstance) {
LwM2mObjectInstance lwM2mObjectInstance = (LwM2mObjectInstance) node;
this.updateObjectInstanceResourceValue(lwM2MClient, lwM2mObjectInstance, stringPath, 0);
} else if (node instanceof LwM2mResource) {
LwM2mResource lwM2mResource = (LwM2mResource) node;
this.updateResourcesValue(lwM2MClient, lwM2mResource, stringPath, Mode.UPDATE, 0);
}
}
tryAwake(lwM2MClient);
}
tryAwake(lwM2MClient);
}
}
@ -969,9 +975,9 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
}
}
public LwM2mClientContext getClientContext(){
public LwM2mClientContext getClientContext() {
return this.clientContext;
};
}
private Map<String, String> getNamesFromProfileForSharedAttributes(LwM2mClient lwM2MClient) {
Lwm2mDeviceProfileTransportConfiguration profile = clientContext.getProfile(lwM2MClient.getProfileId());

View File

@ -15,10 +15,10 @@
*/
package org.thingsboard.server.transport.lwm2m.server.uplink;
import org.eclipse.leshan.core.node.TimestampedLwM2mNodes;
import org.eclipse.leshan.core.node.codec.LwM2mValueConverter;
import org.eclipse.leshan.core.observation.Observation;
import org.eclipse.leshan.core.request.CreateRequest;
import org.eclipse.leshan.core.request.SendRequest;
import org.eclipse.leshan.core.request.WriteCompositeRequest;
import org.eclipse.leshan.core.request.WriteRequest;
import org.eclipse.leshan.core.response.ReadCompositeResponse;
@ -50,7 +50,7 @@ public interface LwM2mUplinkMsgHandler {
void onUpdateValueAfterReadCompositeResponse(Registration registration, ReadCompositeResponse response);
void onErrorObservation(Registration registration, String errorMsg);
void onUpdateValueWithSendRequest(Registration registration, SendRequest sendRequest);
void onUpdateValueWithSendRequest(Registration registration, TimestampedLwM2mNodes data);
void onDeviceProfileUpdate(TransportProtos.SessionInfoProto sessionInfo, DeviceProfile deviceProfile);