lwm2m: add exception to ObserveCompositeCancel

This commit is contained in:
nick 2024-01-17 18:28:21 +02:00
parent 4fe63cee70
commit 3d495572c5
5 changed files with 195 additions and 262 deletions

View File

@ -188,8 +188,8 @@ public class ObservationServiceImpl implements ObservationService, LwM2mNotifica
result.add(obs); result.add(obs);
} else if (!lwPath.equals(lwPathObs) && lwPath.startWith(lwPathObs)) { // nodePath = "3/0/9", lwPathObs = "3": error... } else if (!lwPath.equals(lwPathObs) && lwPath.startWith(lwPathObs)) { // nodePath = "3/0/9", lwPathObs = "3": error...
String errorMsg = String.format( String errorMsg = String.format(
"Unexpected error: There is registration with id %s for observation path %s, that includes this observation path %s", "Unexpected error <cancelObservation>: There is registration with id [%s] existing observation [%s] includes input observation [%s]!",
registrationId, lwPath, lwPathObs); registrationId, lwPathObs, lwPath);
throw new IllegalStateException(errorMsg); throw new IllegalStateException(errorMsg);
} }
} }

View File

@ -230,22 +230,22 @@ public class RpcLwm2mIntegrationObserveTest extends AbstractRpcLwM2MIntegrationO
/** /**
* Observe {"id":["3"]} - Ok * Observe {"id":["3"]} - Ok
* PreviousObservation contains "3/0/0" * PreviousObservation contains "3/0/9"
* @throws Exception * @throws Exception
*/ */
@Test @Test
public void testObserve_Result_CONTENT_ONE_PATH_PreviousObservation_CONTAINCE_OTHER_CurrentObservation() throws Exception { public void testObserve_Result_CONTENT_ONE_PATH_PreviousObservation_CONTAINCE_OTHER_CurrentObservation() throws Exception {
sendCancelObserveAllWithAwait(deviceId); sendCancelObserveAllWithAwait(deviceId);
// "3/0/9" // "3/0/9"
String idVer_3_0_0 = objectInstanceIdVer_3 + "/" + RESOURCE_ID_0; String idVer_3_0_9 = objectInstanceIdVer_3 + "/" + RESOURCE_ID_9;
String actualResult3_0_0 = sendRpcObserve("Observe", idVer_3_0_0); String actualResult3_0_9 = sendRpcObserve("Observe", idVer_3_0_9);
ObjectNode rpcActualResult = JacksonUtil.fromString(actualResult3_0_0, ObjectNode.class); ObjectNode rpcActualResult = JacksonUtil.fromString(actualResult3_0_9, ObjectNode.class);
assertEquals(ResponseCode.CONTENT.getName(), rpcActualResult.get("result").asText()); assertEquals(ResponseCode.CONTENT.getName(), rpcActualResult.get("result").asText());
// "3" // "3"
String actualResult3 = sendRpcObserve("Observe", objectIdVer_3); String actualResult3 = sendRpcObserve("Observe", objectIdVer_3);
rpcActualResult = JacksonUtil.fromString(actualResult3, ObjectNode.class); rpcActualResult = JacksonUtil.fromString(actualResult3, ObjectNode.class);
assertEquals(ResponseCode.CONTENT.getName(), rpcActualResult.get("result").asText()); assertEquals(ResponseCode.CONTENT.getName(), rpcActualResult.get("result").asText());
// PreviousObservation "3/0/0" change to CurrentObservation "3" // PreviousObservation "3/0/9" change to CurrentObservation "3"
String actualResultReadAll = sendRpcObserve("ObserveReadAll", null); String actualResultReadAll = sendRpcObserve("ObserveReadAll", null);
rpcActualResult = JacksonUtil.fromString(actualResultReadAll, ObjectNode.class); rpcActualResult = JacksonUtil.fromString(actualResultReadAll, ObjectNode.class);
assertEquals(ResponseCode.CONTENT.getName(), rpcActualResult.get("result").asText()); assertEquals(ResponseCode.CONTENT.getName(), rpcActualResult.get("result").asText());
@ -263,16 +263,18 @@ public class RpcLwm2mIntegrationObserveTest extends AbstractRpcLwM2MIntegrationO
@Test @Test
public void testObserve_Result_CONTENT_ONE_PATH_CurrentObservation_CONTAINCE_OTHER_PreviousObservation() throws Exception { public void testObserve_Result_CONTENT_ONE_PATH_CurrentObservation_CONTAINCE_OTHER_PreviousObservation() throws Exception {
sendCancelObserveAllWithAwait(deviceId); sendCancelObserveAllWithAwait(deviceId);
// "3" // "3"
String actualResult3 = sendRpcObserve("Observe", objectIdVer_3); String actualResult3 = sendRpcObserve("Observe", objectIdVer_3);
ObjectNode rpcActualResult = JacksonUtil.fromString(actualResult3, ObjectNode.class); ObjectNode rpcActualResult = JacksonUtil.fromString(actualResult3, ObjectNode.class);
assertEquals(ResponseCode.CONTENT.getName(), rpcActualResult.get("result").asText()); assertEquals(ResponseCode.CONTENT.getName(), rpcActualResult.get("result").asText());
// "3/0/0"
// "3/0/0"; WARN: - Token collision ? existing observation [/3] includes input observation [/3/0/0]
String idVer_3_0_0 = objectInstanceIdVer_3 + "/" + RESOURCE_ID_0; String idVer_3_0_0 = objectInstanceIdVer_3 + "/" + RESOURCE_ID_0;
String actualResult3_0_0 = sendRpcObserve("Observe", idVer_3_0_0); String actualResult3_0_0 = sendRpcObserve("Observe", idVer_3_0_0);
rpcActualResult = JacksonUtil.fromString(actualResult3_0_0, ObjectNode.class); rpcActualResult = JacksonUtil.fromString(actualResult3_0_0, ObjectNode.class);
assertEquals(ResponseCode.CONTENT.getName(), rpcActualResult.get("result").asText()); assertEquals(ResponseCode.CONTENT.getName(), rpcActualResult.get("result").asText());
// PreviousObservation "3" contains CurrentObservation "3/0/0"
String actualResultReadAll = sendRpcObserve("ObserveReadAll", null); String actualResultReadAll = sendRpcObserve("ObserveReadAll", null);
rpcActualResult = JacksonUtil.fromString(actualResultReadAll, ObjectNode.class); rpcActualResult = JacksonUtil.fromString(actualResultReadAll, ObjectNode.class);
assertEquals(ResponseCode.CONTENT.getName(), rpcActualResult.get("result").asText()); assertEquals(ResponseCode.CONTENT.getName(), rpcActualResult.get("result").asText());
@ -331,7 +333,7 @@ public class RpcLwm2mIntegrationObserveTest extends AbstractRpcLwM2MIntegrationO
// cancel observe "/3_1.2/0/9" // cancel observe "/3_1.2/0/9"
String expectedId_3_0_9 = objectInstanceIdVer_3 + "/" + RESOURCE_ID_9; String expectedId_3_0_9 = objectInstanceIdVer_3 + "/" + RESOURCE_ID_9;
String actualResult = sendRpcObserve("ObserveCancel", expectedId_3_0_9); String actualResult = sendRpcObserve("ObserveCancel", expectedId_3_0_9);
String expectedValue = "for observation path " + fromVersionedIdToObjectId(expectedId_3_0_9) + ", that includes this observation path " + fromVersionedIdToObjectId(objectIdVer_3); String expectedValue = "existing observation [" + fromVersionedIdToObjectId(objectIdVer_3) + "] includes input observation [" + fromVersionedIdToObjectId(expectedId_3_0_9);
rpcActualResult = JacksonUtil.fromString(actualResult, ObjectNode.class); rpcActualResult = JacksonUtil.fromString(actualResult, ObjectNode.class);
assertEquals(ResponseCode.INTERNAL_SERVER_ERROR.getName(), rpcActualResult.get("result").asText()); assertEquals(ResponseCode.INTERNAL_SERVER_ERROR.getName(), rpcActualResult.get("result").asText());
assertTrue(rpcActualResult.get("error").asText().contains(expectedValue)); assertTrue(rpcActualResult.get("error").asText().contains(expectedValue));

View File

@ -59,6 +59,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -258,7 +259,7 @@ public class TbInMemoryRegistrationStore implements RegistrationStore, Startable
if (observation instanceof SingleObservation) { if (observation instanceof SingleObservation) {
if (validateObserveResource(((SingleObservation)observation).getPath(), registrationId)) { if (validateObserveResource(((SingleObservation)observation).getPath(), registrationId)) {
updateSingleObservation(registrationId, observation, addIfAbsent, removed); updateSingleObservation(registrationId, (SingleObservation) observation, addIfAbsent, removed);
// cancel existing observations for the same path and registration id. // cancel existing observations for the same path and registration id.
cancelObservation (observation, registrationId, removed); cancelObservation (observation, registrationId, removed);
} }
@ -270,8 +271,8 @@ public class TbInMemoryRegistrationStore implements RegistrationStore, Startable
((CompositeObservation)observation).getPaths().forEach(path -> { ((CompositeObservation)observation).getPaths().forEach(path -> {
if (validateObserveResource(path, registrationId)) { if (validateObserveResource(path, registrationId)) {
String serializedObs = createSerializedSingleObservation(nodeSerObs, path.toString()); String serializedObs = createSerializedSingleObservation(nodeSerObs, path.toString());
Observation singleObservation = createSingleObservation(registrationId, path, ct, ctx, serializedObs); Observation singleObservation = createSingleObservation(registrationId, path, ct, ctx, serializedObs, tokenGenerator);
updateSingleObservation(registrationId, singleObservation, addIfAbsent, removed); updateSingleObservation(registrationId, (SingleObservation) singleObservation, addIfAbsent, removed);
// cancel existing observations for the same path and registration id. // cancel existing observations for the same path and registration id.
cancelObservation (singleObservation, registrationId, removed); cancelObservation (singleObservation, registrationId, removed);
} }
@ -301,35 +302,65 @@ public class TbInMemoryRegistrationStore implements RegistrationStore, Startable
return true; return true;
} }
private void updateSingleObservation (String registrationId, Observation observation, boolean addIfAbsent, List<Observation> removed) { private void updateSingleObservation (String registrationId, SingleObservation observation, boolean addIfAbsent, List<Observation> removed) {
Observation previousObservation; // Absorption by existing Observations
Observation previousObservation = null;
SingleObservation existingObservation = null;
ObservationIdentifier id = observation.getId(); ObservationIdentifier id = observation.getId();
if (addIfAbsent) { if (addIfAbsent) {
if (!obsByToken.containsKey(id)) if (!obsByToken.containsKey(id)) {
previousObservation = obsByToken.put(id, observation); existingObservation = validateByAbsorptionExistingObservations(observation);
else if (existingObservation == null) {
previousObservation = obsByToken.get(id); obsByToken.put(id, observation);
} else if (!existingObservation.getPath().equals(observation.getPath())){
obsByToken.put(id, observation);
previousObservation = obsByToken.get(existingObservation.getId());
}
}
} else { } else {
previousObservation = obsByToken.put(id, observation); previousObservation = obsByToken.put(id, observation);
} }
if (!tokensByRegId.containsKey(registrationId)) { if (!tokensByRegId.containsKey(registrationId)) {
tokensByRegId.put(registrationId, new HashSet<ObservationIdentifier>()); tokensByRegId.put(registrationId, new HashSet<ObservationIdentifier>());
} }
if (existingObservation == null || !existingObservation.getPath().equals(observation.getPath())) {
tokensByRegId.get(registrationId).add(id); tokensByRegId.get(registrationId).add(id);
}
// log any collisions // log any collisions
if (previousObservation != null) { if (addIfAbsent && previousObservation != null) {
if (!existingObservation.getPath().equals(observation.getPath())) {
removed.add(previousObservation); removed.add(previousObservation);
LOG.warn("Token collision ? observation [{}] will be replaced by observation [{}] ", LOG.warn("Token collision ? observation [{}] will be replaced by observation [{}], that this observation includes input observation [{}]!",
previousObservation, observation); previousObservation, observation, observation);
} else {
LOG.warn("Token collision ? existing observation [{}] includes input observation [{}]",
existingObservation, observation);
}
} }
} }
private Observation createSingleObservation(String registrationId, LwM2mPath target, ContentFormat ct, private SingleObservation validateByAbsorptionExistingObservations (SingleObservation observation) {
Map<String, String> ctx, String serializedObservation) { LwM2mPath pathObservation = observation.getPath();
AtomicReference<SingleObservation> result = new AtomicReference<>();
obsByToken.values().stream().forEach(obs -> {
LwM2mPath pathObs = ((SingleObservation)obs).getPath();
if ((!pathObservation.equals(pathObs) && pathObs.startWith(pathObservation)) || // pathObs = "3/0/9"-> pathObservation = "3"
(pathObservation.equals(pathObs) && !observation.getId().equals(obs.getId()))) {
result.set((SingleObservation)obs);
} else if (!pathObservation.equals(pathObs) && pathObservation.startWith(pathObs)) { // pathObs = "3" -> pathObservation = "3/0/9"
result.set(observation);
}
});
return result.get();
}
public static SingleObservation createSingleObservation(String registrationId, LwM2mPath target, ContentFormat ct,
Map<String, String> ctx, String serializedObservation, TokenGenerator tokenGenerator) {
Token token = tokenGenerator.createToken(Scope.SHORT_TERM); Token token = tokenGenerator.createToken(Scope.SHORT_TERM);
Map<String, String> protocolData = Collections.emptyMap(); Map<String, String> protocolData = Collections.emptyMap();
if (serializedObservation != null) { if (serializedObservation != null) {
@ -339,7 +370,7 @@ public class TbInMemoryRegistrationStore implements RegistrationStore, Startable
return new SingleObservation(new ObservationIdentifier(token.getBytes()), registrationId, target, ct, ctx, protocolData); return new SingleObservation(new ObservationIdentifier(token.getBytes()), registrationId, target, ct, ctx, protocolData);
} }
private String createSerializedSingleObservation(JsonNode nodeSerObs, String path){ public static String createSerializedSingleObservation(JsonNode nodeSerObs, String path){
if (nodeSerObs.has("context")){ if (nodeSerObs.has("context")){
((ObjectNode) nodeSerObs.get("context")).put("leshan-path", path + "\n"); ((ObjectNode) nodeSerObs.get("context")).put("leshan-path", path + "\n");
return JacksonUtil.toString(nodeSerObs); return JacksonUtil.toString(nodeSerObs);

View File

@ -15,19 +15,24 @@
*/ */
package org.thingsboard.server.transport.lwm2m.server.store; package org.thingsboard.server.transport.lwm2m.server.store;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.Token; import org.eclipse.californium.core.coap.Token;
import org.eclipse.californium.core.network.TokenGenerator;
import org.eclipse.californium.core.network.serialization.UdpDataParser; import org.eclipse.californium.core.network.serialization.UdpDataParser;
import org.eclipse.californium.core.network.serialization.UdpDataSerializer; import org.eclipse.californium.core.network.serialization.UdpDataSerializer;
import org.eclipse.leshan.core.Destroyable; import org.eclipse.leshan.core.Destroyable;
import org.eclipse.leshan.core.Startable; import org.eclipse.leshan.core.Startable;
import org.eclipse.leshan.core.Stoppable; import org.eclipse.leshan.core.Stoppable;
import org.eclipse.leshan.core.model.ObjectModel;
import org.eclipse.leshan.core.model.ResourceModel;
import org.eclipse.leshan.core.node.LwM2mPath; import org.eclipse.leshan.core.node.LwM2mPath;
import org.eclipse.leshan.core.observation.CompositeObservation; import org.eclipse.leshan.core.observation.CompositeObservation;
import org.eclipse.leshan.core.observation.Observation; import org.eclipse.leshan.core.observation.Observation;
import org.eclipse.leshan.core.observation.ObservationIdentifier; import org.eclipse.leshan.core.observation.ObservationIdentifier;
import org.eclipse.leshan.core.observation.SingleObservation; import org.eclipse.leshan.core.observation.SingleObservation;
import org.eclipse.leshan.core.peer.LwM2mIdentity; import org.eclipse.leshan.core.peer.LwM2mIdentity;
import org.eclipse.leshan.core.request.ContentFormat;
import org.eclipse.leshan.core.util.NamedThreadFactory; import org.eclipse.leshan.core.util.NamedThreadFactory;
import org.eclipse.leshan.core.util.Validate; import org.eclipse.leshan.core.util.Validate;
import org.eclipse.leshan.server.redis.RedisRegistrationStore; import org.eclipse.leshan.server.redis.RedisRegistrationStore;
@ -47,6 +52,8 @@ import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.Cursor; import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions; import org.springframework.data.redis.core.ScanOptions;
import org.springframework.integration.redis.util.RedisLockRegistry; import org.springframework.integration.redis.util.RedisLockRegistry;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.transport.lwm2m.server.LwM2mVersionedModelProvider;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
@ -56,14 +63,19 @@ import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import static java.nio.charset.StandardCharsets.UTF_8; import static java.nio.charset.StandardCharsets.UTF_8;
import static org.eclipse.leshan.core.californium.ObserveUtil.extractSerializedObservation;
import static org.thingsboard.server.transport.lwm2m.server.store.TbInMemoryRegistrationStore.createSerializedSingleObservation;
import static org.thingsboard.server.transport.lwm2m.server.store.TbInMemoryRegistrationStore.createSingleObservation;
@Slf4j @Slf4j
public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startable, Stoppable, Destroyable { public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startable, Stoppable, Destroyable {
@ -106,24 +118,30 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
private final RedisLockRegistry redisLock; private final RedisLockRegistry redisLock;
public TbLwM2mRedisRegistrationStore(RedisConnectionFactory connectionFactory) { private final TokenGenerator tokenGenerator;
this(connectionFactory, DEFAULT_CLEAN_PERIOD, DEFAULT_GRACE_PERIOD, DEFAULT_CLEAN_LIMIT); // default clean period 60s
private final LwM2mVersionedModelProvider modelProvider;
public TbLwM2mRedisRegistrationStore(TokenGenerator tokenGenerator,RedisConnectionFactory connectionFactory, LwM2mVersionedModelProvider modelProvider) {
this(tokenGenerator, connectionFactory, DEFAULT_CLEAN_PERIOD, DEFAULT_GRACE_PERIOD, DEFAULT_CLEAN_LIMIT, modelProvider); // default clean period 60s
} }
public TbLwM2mRedisRegistrationStore(RedisConnectionFactory connectionFactory, long cleanPeriodInSec, long lifetimeGracePeriodInSec, int cleanLimit) { public TbLwM2mRedisRegistrationStore(TokenGenerator tokenGenerator, RedisConnectionFactory connectionFactory, long cleanPeriodInSec, long lifetimeGracePeriodInSec, int cleanLimit, LwM2mVersionedModelProvider modelProvider) {
this(connectionFactory, Executors.newScheduledThreadPool(1, this(tokenGenerator, connectionFactory, Executors.newScheduledThreadPool(1,
new NamedThreadFactory(String.format("RedisRegistrationStore Cleaner (%ds)", cleanPeriodInSec))), new NamedThreadFactory(String.format("RedisRegistrationStore Cleaner (%ds)", cleanPeriodInSec))),
cleanPeriodInSec, lifetimeGracePeriodInSec, cleanLimit); cleanPeriodInSec, lifetimeGracePeriodInSec, cleanLimit, modelProvider);
} }
public TbLwM2mRedisRegistrationStore(RedisConnectionFactory connectionFactory, ScheduledExecutorService schedExecutor, long cleanPeriodInSec, public TbLwM2mRedisRegistrationStore(TokenGenerator tokenGenerator,RedisConnectionFactory connectionFactory, ScheduledExecutorService schedExecutor, long cleanPeriodInSec,
long lifetimeGracePeriodInSec, int cleanLimit) { long lifetimeGracePeriodInSec, int cleanLimit, LwM2mVersionedModelProvider modelProvider) {
this.connectionFactory = connectionFactory; this.connectionFactory = connectionFactory;
this.schedExecutor = schedExecutor; this.schedExecutor = schedExecutor;
this.cleanPeriod = cleanPeriodInSec; this.cleanPeriod = cleanPeriodInSec;
this.cleanLimit = cleanLimit; this.cleanLimit = cleanLimit;
this.gracePeriod = lifetimeGracePeriodInSec; this.gracePeriod = lifetimeGracePeriodInSec;
this.redisLock = new RedisLockRegistry(connectionFactory, "Registration"); this.redisLock = new RedisLockRegistry(connectionFactory, "Registration");
this.tokenGenerator = tokenGenerator;
this.modelProvider = modelProvider;
} }
/* *************** Redis Key utility function **************** */ /* *************** Redis Key utility function **************** */
@ -464,7 +482,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
try (var connection = connectionFactory.getConnection()) { try (var connection = connectionFactory.getConnection()) {
// fetch the client ep by registration ID index // fetch the client ep by registration ID index
byte[] ep = connection.get(toRegIdKey(registrationId)); byte[] ep = connection.commands().get(toRegIdKey(registrationId));
if (ep == null) { if (ep == null) {
throw new IllegalStateException(String.format( throw new IllegalStateException(String.format(
"can not add observation %s there is no registration with id %s", observation, registrationId)); "can not add observation %s there is no registration with id %s", observation, registrationId));
@ -476,103 +494,26 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
try { try {
lock = redisLock.obtain(lockKey); lock = redisLock.obtain(lockKey);
lock.lock(); lock.lock();
// Add and Get previous observation
byte[] previousValue = null;
// byte[] key;
// byte[] serializeObs;
byte[] key = toKey(OBS_TKN, observation.getId().getBytes());
byte[] serializeObs = serializeObs(observation);
// we analyze the present previous value
Collection<LwM2mPath> previousPaths = getPreviousPaths(observation, connection);
if (observation instanceof SingleObservation) { if (observation instanceof SingleObservation) {
if (previousPaths == null) { if (validateObserveResource(((SingleObservation)observation).getPath(), registrationId)) {
connection.stringCommands().set(key, serializeObs); updateSingleObservation(registrationId, (SingleObservation)observation, addIfAbsent, removed, connection);
} else if (!addIfAbsent) {
}
previousValue = connection.get(key);
if (previousValue == null || previousValue.length == 0) {
if (observation instanceof CompositeObservation) {
List<LwM2mPath> paths = ((CompositeObservation) observation).getPaths();
if (paths.size()==1) {
key = toKey(OBS_TKN, observation.getId().getBytes());
serializeObs = serializeObs(observation);
previousValue = connection.stringCommands().get(key);
if (previousValue == null || previousValue.length == 0) {
// connection.set(key, serializeObs);
connection.stringCommands().set(key, serializeObs);
}
paths.forEach(pObs -> {
});
}
else {
paths.forEach(pObs -> {
});
}
} else if (observation instanceof SingleObservation) {
// previousValue = connection.get(key);
previousValue = connection.stringCommands().get(key);
if (previousValue == null || previousValue.length == 0) {
connection.stringCommands().set(key, serializeObs);
// connection.set(key, serializeObs);
}
}
}
} else {
// previousValue = connection.getSet(key, serializeObs);
previousValue = connection.stringCommands().getSet(key, serializeObs);
}
// secondary index to get the list by registrationId
connection.lPush(toKey(OBS_TKNS_REGID_IDX, registrationId), observation.getId().getBytes());
// log any collisions
Observation previousObservation;
if (previousValue != null && previousValue.length != 0) {
previousObservation = deserializeObs(previousValue);
LOG.warn("Token collision ? observation [{}] will be replaced by observation [{}] ",
previousObservation, observation);
}
// cancel existing observations for the same path and registration id. // cancel existing observations for the same path and registration id.
for (Observation obs : getObservations(connection, registrationId)) { cancelObservation(observation, registrationId, removed, connection);
if (obs instanceof CompositeObservation){ }
((CompositeObservation)obs).getPaths().forEach(pObs -> { } else {
if (observation instanceof CompositeObservation) { ContentFormat ct = ((CompositeObservation) observation).getResponseContentFormat();
((CompositeObservation)observation).getPaths().forEach(pObservation -> { Map<String, String> ctx = observation.getContext();
//TODO include obs every (SingleObservation) in observation Composite String serializedObservation = extractSerializedObservation(observation);
JsonNode nodeSerObs = JacksonUtil.toJsonNode(serializedObservation);
((CompositeObservation)observation).getPaths().forEach(path -> {
if (validateObserveResource(path, registrationId)) {
String serializedObs = createSerializedSingleObservation(nodeSerObs, path.toString());
SingleObservation singleObservation = createSingleObservation(registrationId, path, ct, ctx, serializedObs, tokenGenerator);
updateSingleObservation(registrationId, singleObservation, addIfAbsent, removed, connection);
// cancel existing observations for the same path and registration id.
cancelObservation (singleObservation, registrationId, removed, connection);
}
}); });
log.info("observation obs -> CompositeObservation");
} else if (observation instanceof SingleObservation) {
if (((SingleObservation) observation).getPath().equals(((SingleObservation)obs).getPath())
&& !observation.getId().equals(obs.getId())){
removed.add(obs);
unsafeRemoveObservation(connection, registrationId, obs.getId().getBytes());
}
}
});
} else if (obs instanceof SingleObservation) {
if (observation instanceof CompositeObservation) {
((CompositeObservation)observation).getPaths().forEach(pObservation -> {
//TODO include obs every (SingleObservation) in observation Composite
});
log.info("observation -> CompositeObservation, obs -> SingleObservation");
} else if (observation instanceof SingleObservation) {
if (((SingleObservation) observation).getPath().equals(((SingleObservation)obs).getPath())
&& !observation.getId().equals(obs.getId())){
removed.add(obs);
unsafeRemoveObservation(connection, registrationId, obs.getId().getBytes());
}
}
}
} }
} finally { } finally {
if (lock != null) { if (lock != null) {
@ -583,6 +524,64 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
return removed; return removed;
} }
private boolean validateObserveResource(LwM2mPath path, String registrationId){
// check if the resource is readable.
if (path.isResource() || path.isResourceInstance()) {
ObjectModel objectModel = modelProvider.getObjectModel(getRegistration(registrationId)).getObjectModel(path.getObjectId());
ResourceModel resourceModel = objectModel == null ? null : objectModel.resources.get(path.getResourceId());
if (resourceModel == null) {
return false;
} else if (!resourceModel.operations.isReadable()) {
return false;
} else if (path.isResourceInstance() && !resourceModel.multiple) {
return false;
}
}
return true;
}
private void updateSingleObservation (String registrationId, SingleObservation observation, boolean addIfAbsent,
List<Observation> removed, RedisConnection connection) {
// Add and Get previous observation
byte[] previousValue;
byte[] key = toKey(OBS_TKN, observation.getId().getBytes());
byte[] serializeObs = serializeObs(observation);
// we analyze the present previous value
SingleObservation existingObservation = null;
if (addIfAbsent){
previousValue = connection.stringCommands().get(key);
if (previousValue == null) {
existingObservation = validateByAbsorptionExistingObservations(observation, connection);
if (existingObservation == null){
connection.stringCommands().set(key, serializeObs);
} else if(!existingObservation.getPath().equals(observation.getPath())) {
connection.stringCommands().set(key, serializeObs);
previousValue = serializeObs(existingObservation);
}
}
} else {
previousValue = connection.stringCommands().getSet(key, serializeObs);
}
// secondary index to get the list by registrationId
connection.listCommands().lPush(toKey(OBS_TKNS_REGID_IDX, registrationId), observation.getId().getBytes());
// log any collisions
if (addIfAbsent && previousValue != null) {
if (!existingObservation.getPath().equals(observation.getPath())) {
Observation previousObservation = deserializeObs(previousValue);
removed.add(previousObservation);
LOG.warn("Token collision ? observation [{}] will be replaced by observation [{}], that this observation includes input observation [{}]!",
previousObservation, observation, observation);
} else {
LOG.warn("Token collision ? existing observation [{}] includes input observation [{}]",
existingObservation, observation);
}
}
}
@Override @Override
public Collection<Observation> getObservations(String registrationId) { public Collection<Observation> getObservations(String registrationId) {
try (var connection = connectionFactory.getConnection()) { try (var connection = connectionFactory.getConnection()) {
@ -651,34 +650,24 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
return result; return result;
} }
private Collection<LwM2mPath> getPreviousPaths(Observation observation, RedisConnection connection) { private SingleObservation validateByAbsorptionExistingObservations(SingleObservation observation, RedisConnection connection) {
Collection<LwM2mPath> result = new ArrayList<>(); LwM2mPath pathObservation = observation.getPath();
AtomicReference<SingleObservation> result = new AtomicReference<>();
Collection<Observation> observations = getObservations(connection, observation.getRegistrationId()); Collection<Observation> observations = getObservations(connection, observation.getRegistrationId());
observations.forEach(obs -> { observations.stream().forEach(obs -> {
LwM2mPath pathObs = ((SingleObservation)obs).getPath();
Collection<LwM2mPath> prevPath = getPreviousPaths( observation, obs); if ((!pathObservation.equals(pathObs) && pathObs.startWith(pathObservation)) || // pathObs = "3/0/9"-> pathObservation = "3"
if (prevPath != null){ (pathObservation.equals(pathObs) && !observation.getId().equals(obs.getId()))) {
result.addAll(prevPath); result.set((SingleObservation)obs);
} else if (!pathObservation.equals(pathObs) && pathObservation.startWith(pathObs)) { // pathObs = "3" -> pathObservation = "3/0/9"
result.set(observation);
} }
}); });
return result.size() > 0 ? result : null; return result.get();
}
private Collection<LwM2mPath> getPreviousPaths(Observation observation, Observation prevObs) {
Collection<LwM2mPath> prevPath = new ArrayList<>();
if (observation instanceof SingleObservation && prevObs instanceof SingleObservation) {
} else if (observation instanceof CompositeObservation) {
if (prevObs instanceof CompositeObservation) { // observation instanceof CompositeObservation && prevObs instanceof CompositeObservation
} else { // observation instanceof CompositeObservation && prevObs instanceof SingleObservation
} }
} else { // observation instanceof SingleObservation prevObs instanceof CompositeObservation
}
return prevPath.size() > 0 ? prevPath : null;
}
@Override @Override
public Collection<Observation> removeObservations(String registrationId) { public Collection<Observation> removeObservations(String registrationId) {
@ -704,103 +693,6 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
} }
} }
/* *************** Californium ObservationStore API **************** */
// public org.eclipse.californium.core.observe.Observation putIfAbsent(Token token,
// org.eclipse.californium.core.observe.Observation obs) throws ObservationStoreException {
// return add(obs, true);
// }
// public org.eclipse.californium.core.observe.Observation put(Token token,
// org.eclipse.californium.core.observe.Observation obs) throws ObservationStoreException {
// return add(obs, false);
// }
// private org.eclipse.californium.core.observe.Observation add(org.eclipse.californium.core.observe.Observation obs, boolean ifAbsent) throws ObservationStoreException {
// String endpoint = ObserveUtil.validateCoapObservation(obs);
// org.eclipse.californium.core.observe.Observation previousObservation = null;
//
// try (var connection = connectionFactory.getConnection()) {
// Lock lock = null;
// String lockKey = toLockKey(endpoint);
// try {
// lock = redisLock.obtain(lockKey);
// lock.lock();
//
// String registrationId = ObserveUtil.extractRegistrationId(obs);
// if (!connection.exists(toRegIdKey(registrationId)))
// throw new ObservationStoreException("no registration for this Id");
// byte[] key = toKey(OBS_TKN, obs.getRequest().getToken().getBytes());
// Observation obsLeshan = buildLwM2mObservationFromCfToLeshanCore(obs);
// byte[] serializeObs = serializeObs(obsLeshan);
// byte[] previousValue;
// if (ifAbsent) {
// previousValue = connection.get(key);
// if (previousValue == null || previousValue.length == 0) {
// connection.set(key, serializeObs);
// previousValue = connection.getSet(key, serializeObs);
// } else {
// return buildCoapObservationFromLeshanCoreToCfCore(deserializeObs(previousValue));
// }
// } else {
// previousValue = connection.getSet(key, serializeObs);
// }
//
// // secondary index to get the list by registrationId
// connection.lPush(toKey(OBS_TKNS_REGID_IDX, registrationId), obs.getRequest().getToken().getBytes());
//
// // log any collisions
// if (previousValue != null && previousValue.length != 0) {
// previousObservation = buildCoapObservationFromLeshanCoreToCfCore(deserializeObs(previousValue));
// LOG.warn(
// "Token collision ? observation from request [{}] will be replaced by observation from request [{}] ",
// previousObservation.getRequest(), obs.getRequest());
// }
// } finally {
// if (lock != null) {
// lock.unlock();
// }
// }
// }
// return previousObservation;
// }
//
// public void remove(Token token) {
// try (var connection = connectionFactory.getConnection()) {
// byte[] tokenKey = toKey(OBS_TKN, token.getBytes());
//
// // fetch the observation by token
// byte[] serializedObs = connection.get(tokenKey);
// if (serializedObs == null)
// return;
//
//// org.eclipse.californium.core.observe.Observation obs = deserializeObs(serializedObs);
// org.eclipse.californium.core.observe.Observation obs = null;
// String registrationId = ObserveUtil.extractRegistrationId(obs);
// Registration registration = getRegistration(connection, registrationId);
// if (registration == null) {
// LOG.warn("Unable to remove observation {}, registration {} does not exist anymore", obs.getRequest(),
// registrationId);
// return;
// }
//
// String endpoint = registration.getEndpoint();
// Lock lock = null;
// String lockKey = toLockKey(endpoint);
// try {
// lock = redisLock.obtain(lockKey);
// lock.lock();
//
// unsafeRemoveObservation(connection, registrationId, token.getBytes());
// } finally {
// if (lock != null) {
// lock.unlock();
// }
// }
// }
//
// }
public Observation get(Token token) { public Observation get(Token token) {
try (var connection = connectionFactory.getConnection()) { try (var connection = connectionFactory.getConnection()) {
byte[] obs = connection.get(toKey(OBS_TKN, token.getBytes())); byte[] obs = connection.get(toKey(OBS_TKN, token.getBytes()));
@ -815,8 +707,8 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
/* *************** Observation utility functions **************** */ /* *************** Observation utility functions **************** */
private void unsafeRemoveObservation(RedisConnection connection, String registrationId, byte[] observationId) { private void unsafeRemoveObservation(RedisConnection connection, String registrationId, byte[] observationId) {
if (connection.del(toKey(OBS_TKN, observationId)) > 0L) { if (connection.commands().del(toKey(OBS_TKN, observationId)) > 0L) {
connection.lRem(toKey(OBS_TKNS_REGID_IDX, registrationId), 0, observationId); connection.listCommands().lRem(toKey(OBS_TKNS_REGID_IDX, registrationId), 0, observationId);
} }
} }
@ -841,15 +733,23 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
return observationSerDes.serialize(obs); return observationSerDes.serialize(obs);
} }
// private Observation buildLwM2mObservationFromCfToLeshanCore( private void cancelObservation(Observation observation, String registrationId, List<Observation> removed, RedisConnection connection) {
// org.eclipse.californium.core.observe.Observation observation) { for (Observation obs : getObservations(connection, registrationId)) {
// String serializedObservation = observationSerDesCoap.serialize(observation); cancelExistingObservation(connection, observation, obs, removed);
// return serializedObservation == null ? null : ObserveUtil.createLwM2mObservation(observation, serializedObservation); }
// } }
// private org.eclipse.californium.core.observe.Observation buildCoapObservationFromLeshanCoreToCfCore(Observation obs) {
// String serializedObservation = ObserveUtil.extractSerializedObservation(obs); private void cancelExistingObservation(RedisConnection connection, Observation observation, Observation obs, List<Observation> removed) {
// return serializedObservation == null ? null : observationSerDesCoap.deserialize(serializedObservation); LwM2mPath pathObservation = ((SingleObservation)observation).getPath();
// } LwM2mPath pathObs = ((SingleObservation)obs).getPath();
if ((!pathObservation.equals(pathObs) && pathObs.startWith(pathObservation)) || // pathObservation = "3", pathObs = "3/0/9"
(pathObservation.equals(pathObs) && !observation.getId().equals(obs.getId()))) {
unsafeRemoveObservation(connection, obs.getRegistrationId(), obs.getId().getBytes());
removed.add(obs);
} else if (!pathObservation.equals(pathObs) && pathObservation.startWith(pathObs)) { // pathObservation = "3/0/9", pathObs = "3"
unsafeRemoveObservation(connection, obs.getRegistrationId(), observation.getId().getBytes());
}
}
private Observation deserializeObs(byte[] data) { private Observation deserializeObs(byte[] data) {
return data == null ? null : observationSerDes.deserialize(data); return data == null ? null : observationSerDes.deserialize(data);

View File

@ -43,7 +43,7 @@ public class TbLwM2mStoreFactory {
@Bean @Bean
private RegistrationStore registrationStore() { private RegistrationStore registrationStore() {
return redisConfiguration.isPresent() ? return redisConfiguration.isPresent() ?
new TbLwM2mRedisRegistrationStore(getConnectionFactory()) : new TbInMemoryRegistrationStore(new RandomTokenGenerator(config.getCoapConfig()), config.getCleanPeriodInSec(), modelProvider); new TbLwM2mRedisRegistrationStore(new RandomTokenGenerator(config.getCoapConfig()), getConnectionFactory(), modelProvider) : new TbInMemoryRegistrationStore(new RandomTokenGenerator(config.getCoapConfig()), config.getCleanPeriodInSec(), modelProvider);
} }
@Bean @Bean