lwm2m: add Redis

This commit is contained in:
nick 2024-01-08 16:00:05 +02:00
parent d59bab4f0f
commit 2b82768225
4 changed files with 124 additions and 191 deletions

View File

@ -255,7 +255,6 @@ public class LwM2MTestClient {
*/
boolean reconnectOnUpdate = false;
engineFactory.setReconnectOnUpdate(reconnectOnUpdate);
// engineFactory.setReconnectOnUpdate(false); // old
engineFactory.setResumeOnConnect(true);
// new
/**

View File

@ -128,26 +128,6 @@ public class LwM2MTransportBootstrapService {
/* Create DTLS Config */
this.setServerWithCredentials(builder);
// DtlsConnectorConfig dtlsConfig;
// try {
// dtlsConfig = dtlsConfigBuilder.build();
// } catch (IllegalStateException e) {
// log.warn("Unable to create DTLS config for endpont {}.", endpointUri.toString(), e);
// return null;
// }
//
// Connector dTLSConnector = new DTLSConnector(dtlsConfig);
// endpointsBuilder.setConnector(dTLSConnector);
// endpointsBuilder.setConfiguration(serverCoapConfig);
// endpointsBuilder.setLoggingTag(String.format("[%s]", "/" + "options.getUriPathString()"));
// endpointsBuilder.setEndpointContextMatcher(new Lwm2mEndpointContextMatcher());
/* Create credentials */
// Set Californium Configuration
endpointsBuilder.setConfiguration(serverCoapConfig);
@ -161,46 +141,6 @@ public class LwM2MTransportBootstrapService {
InetSocketAddress coapsAddr = new InetSocketAddress(bootstrapConfig.getSecureHost(), bootstrapConfig.getSecurePort());
endpointsBuilder.addEndpoint(coapsAddr, Protocol.COAPS);
// builder.setLocalAddress(config.getHost(), config.getPort());
// builder.setLocalSecureAddress(config.getSecureHost(), config.getSecurePort());
// builder.setDecoder(new DefaultLwM2mDecoder());
/* Use a magic converter to support bad type send by the UI. */
// builder.setEncoder(new DefaultLwM2mEncoder(LwM2mValueConverterImpl.getInstance()));
/* Create CoAP Config */
// builder.setCoapConfig(getCoapConfig(config.getPort(), config.getSecurePort(), config));
//
// /* Set securityStore with new registrationStore */
// builder.setSecurityStore(securityStore);
// builder.setRegistrationStore(registrationStore);
//
//
// // Create LWM2M server
// builder.setEndpointsProviders(endpointsBuilder.build());
//
//
//
// builder.setLocalAddress(bootstrapConfig.getHost(), bootstrapConfig.getPort());
// builder.setLocalSecureAddress(bootstrapConfig.getSecureHost(), bootstrapConfig.getSecurePort());
//
// /* Create CoAP Config */
// builder.setCoapConfig(getCoapConfig(bootstrapConfig.getPort(), bootstrapConfig.getSecurePort(), serverConfig));
//
//
// /* Create and Set DTLS Config */
// DtlsConnectorConfig.Builder dtlsConfig = new DtlsConnectorConfig.Builder(getCoapConfig(bootstrapConfig.getPort(), bootstrapConfig.getSecurePort(), serverConfig));
//
// dtlsConfig.set(DTLS_RECOMMENDED_CURVES_ONLY, serverConfig.isRecommendedSupportedGroups());
// dtlsConfig.set(DTLS_RECOMMENDED_CIPHER_SUITES_ONLY, serverConfig.isRecommendedCiphers());
// dtlsConfig.set(DTLS_RETRANSMISSION_TIMEOUT, serverConfig.getDtlsRetransmissionTimeout(), MILLISECONDS);
// dtlsConfig.set(DTLS_ROLE, SERVER_ONLY);
// setServerWithCredentials(builder, dtlsConfig);
//
// /* Set DTLS Config */
// builder.setDtlsConfig(dtlsConfig);
/* Set securityStore with new ConfigStore */
builder.setConfigStore(lwM2MInMemoryBootstrapConfigStore);

View File

@ -182,29 +182,6 @@ public class DefaultLwM2mTransportService implements LwM2MTransportService {
/* Create DTLS Config */
this.setServerWithCredentials(builder);
// DtlsConnectorConfig dtlsConfig;
// try {
// dtlsConfig = dtlsConfigBuilder.build();
// } catch (IllegalStateException e) {
// log.warn("Unable to create DTLS config for endpont {}.", endpointUri.toString(), e);
// return null;
// }
//
// Connector dTLSConnector = new DTLSConnector(dtlsConfig);
// endpointsBuilder.setConnector(dTLSConnector);
// endpointsBuilder.setConfiguration(serverCoapConfig);
// endpointsBuilder.setLoggingTag(String.format("[%s]", "/" + "options.getUriPathString()"));
// endpointsBuilder.setEndpointContextMatcher(new Lwm2mEndpointContextMatcher());
/* Create credentials */
// Set Californium Configuration
endpointsBuilder.setConfiguration(serverCoapConfig);
@ -220,13 +197,6 @@ public class DefaultLwM2mTransportService implements LwM2MTransportService {
builder.setDecoder(new DefaultLwM2mDecoder(true));
builder.setEncoder(new DefaultLwM2mEncoder(true));
// builder.setDecoder(new DefaultLwM2mDecoder());
// /* Use a magic converter to support bad type send by the UI. */
// builder.setEncoder(new DefaultLwM2mEncoder(LwM2mValueConverterImpl.getInstance()));
/* Create CoAP Config */
// builder.setCoapConfig(getCoapConfig(config.getPort(), config.getSecurePort(), config));
// Create LWM2M server
builder.setEndpointsProviders(endpointsBuilder.build());

View File

@ -15,11 +15,11 @@
*/
package org.thingsboard.server.transport.lwm2m.server.store;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.Token;
import org.eclipse.californium.core.network.serialization.UdpDataParser;
import org.eclipse.californium.core.network.serialization.UdpDataSerializer;
import org.eclipse.californium.core.observe.ObservationStoreException;
import org.eclipse.californium.elements.EndpointContext;
import org.eclipse.leshan.core.Destroyable;
import org.eclipse.leshan.core.Startable;
import org.eclipse.leshan.core.Stoppable;
@ -30,8 +30,8 @@ import org.eclipse.leshan.core.observation.SingleObservation;
import org.eclipse.leshan.core.peer.LwM2mIdentity;
import org.eclipse.leshan.core.util.NamedThreadFactory;
import org.eclipse.leshan.core.util.Validate;
import org.eclipse.leshan.server.californium.observation.ObservationSerDes;
import org.eclipse.leshan.server.redis.RedisRegistrationStore;
import org.eclipse.leshan.server.redis.serialization.ObservationSerDes;
import org.eclipse.leshan.server.redis.serialization.RegistrationSerDes;
import org.eclipse.leshan.server.registration.Deregistration;
import org.eclipse.leshan.server.registration.ExpirationListener;
@ -65,6 +65,7 @@ import java.util.concurrent.locks.Lock;
import static java.nio.charset.StandardCharsets.UTF_8;
@Slf4j
public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startable, Stoppable, Destroyable {
/** Default time in seconds between 2 cleaning tasks (used to remove expired registration). */
public static final long DEFAULT_CLEAN_PERIOD = 60;
@ -86,7 +87,9 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
// (expiration date, Endpoint)
private final RegistrationSerDes registrationSerDes = new RegistrationSerDes();
private final ObservationSerDes observationSerDes = new ObservationSerDes(new UdpDataParser(), new UdpDataSerializer());
private final ObservationSerDes observationSerDes = new ObservationSerDes();
private final org.eclipse.leshan.server.californium.observation.ObservationSerDes observationSerDesCoap =
new org.eclipse.leshan.server.californium.observation.ObservationSerDes(new UdpDataParser(), new UdpDataSerializer());
private final RedisConnectionFactory connectionFactory;
// Listener use to notify when a registration expires
@ -255,6 +258,18 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
return getRegistration(connection, registrationId);
}
}
private Registration getRegistration(RedisConnection connection, String registrationId) {
byte[] ep = connection.get(toRegIdKey(registrationId));
if (ep == null) {
return null;
}
byte[] data = connection.get(toEndpointKey(ep));
if (data == null) {
return null;
}
return deserializeReg(data);
}
@Override
public Registration getRegistrationByEndpoint(String endpoint) {
@ -331,63 +346,6 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
}
}
@Override
public Collection<Observation> addObservation(String registrationId, Observation observation, boolean addIfAbsent) {
List<Observation> removed = new ArrayList<>();
try (var connection = connectionFactory.getConnection()) {
// fetch the client ep by registration ID index
byte[] ep = connection.get(toRegIdKey(registrationId));
if (ep == null) {
return null;
}
Lock lock = null;
String lockKey = toLockKey(ep);
try {
lock = redisLock.obtain(lockKey);
lock.lock();
// cancel existing observations for the same path and registration id.
for (Observation obs : getObservations(connection, registrationId)) {
//TODO: should be able to use CompositeObservation
if (((SingleObservation)observation).getPath().equals(((SingleObservation)obs).getPath())
&& !observation.getId().equals(obs.getId())) {
removed.add(obs);
unsafeRemoveObservation(connection, registrationId, obs.getId().getBytes());
}
}
} finally {
if (lock != null) {
lock.unlock();
}
}
}
return removed;
}
@Override
public Collection<Observation> getObservations(String registrationId) {
try (var connection = connectionFactory.getConnection()) {
return getObservations(connection, registrationId);
}
}
@Override
public Observation getObservation(String registrationId, ObservationIdentifier observationId) {
return getObservations(registrationId).stream().filter(o -> o.getId()==observationId).findFirst().get();
}
@Override
public Observation getObservation(ObservationIdentifier observationId) {
return null;
}
@Override
public Observation removeObservation(String registrationId, ObservationIdentifier observationId) {
return removeObservation(registrationId, observationId.getBytes());
}
private Deregistration removeRegistration(RedisConnection connection, String registrationId, boolean removeOnlyIfNotAlive) {
// fetch the client ep by registration ID index
@ -499,6 +457,91 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
* org.eclipse.californium.core.observe.ObservationStore#add method)
*/
@Override
public Collection<Observation> addObservation(String registrationId, Observation observation, boolean addIfAbsent) {
List<Observation> removed = new ArrayList<>();
try (var connection = connectionFactory.getConnection()) {
// fetch the client ep by registration ID index
byte[] ep = connection.get(toRegIdKey(registrationId));
if (ep == null) {
throw new IllegalStateException(String.format(
"can not add observation %s there is no registration with id %s", observation, registrationId));
}
Lock lock = null;
String lockKey = toLockKey(ep);
try {
lock = redisLock.obtain(lockKey);
lock.lock();
// Add and Get previous observation
byte[] previousValue;
byte[] key = toKey(OBS_TKN, observation.getId().getBytes());
byte[] serializeObs = serializeObs(observation);
if (addIfAbsent) {
previousValue = connection.get(key);
if (previousValue == null || previousValue.length == 0) {
connection.set(key, serializeObs);
}
} else {
previousValue = connection.getSet(key, serializeObs);
}
// secondary index to get the list by registrationId
connection.lPush(toKey(OBS_TKNS_REGID_IDX, registrationId), observation.getId().getBytes());
// log any collisions
Observation previousObservation;
if (previousValue != null && previousValue.length != 0) {
previousObservation = deserializeObs(previousValue);
LOG.warn("Token collision ? observation [{}] will be replaced by observation [{}] ",
previousObservation, observation);
}
// cancel existing observations for the same path and registration id.
for (Observation obs : getObservations(connection, registrationId)) {
//TODO: should be able to use CompositeObservation
if (((SingleObservation)observation).getPath().equals(((SingleObservation)obs).getPath())
&& !observation.getId().equals(obs.getId())) {
removed.add(obs);
unsafeRemoveObservation(connection, registrationId, obs.getId().getBytes());
}
}
} finally {
if (lock != null) {
lock.unlock();
}
}
}
return removed;
}
@Override
public Collection<Observation> getObservations(String registrationId) {
try (var connection = connectionFactory.getConnection()) {
return getObservations(connection, registrationId);
}
}
@Override
public Observation getObservation(String registrationId, ObservationIdentifier observationId) {
return getObservations(registrationId).stream().filter(
o -> o.getId().getAsHexString().equals(observationId.getAsHexString())).findFirst().get();
}
@Override
public Observation getObservation(ObservationIdentifier observationId) {
try (var connection = connectionFactory.getConnection()) {
byte[] observationValue = connection.get(toKey(OBS_TKN, observationId.getBytes()));
return deserializeObs(observationValue);
}
}
@Override
public Observation removeObservation(String registrationId, ObservationIdentifier observationId) {
return removeObservation(registrationId, observationId.getBytes());
}
public Observation removeObservation(String registrationId, byte[] observationId) {
try (var connection = connectionFactory.getConnection()) {
@ -515,8 +558,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
lock = redisLock.obtain(lockKey);
lock.lock();
// Observation observation = build(get(new Token(observationId)));
Observation observation = build(null);
Observation observation = get(new Token(observationId));
if (observation != null && registrationId.equals(observation.getRegistrationId())) {
unsafeRemoveObservation(connection, registrationId, observationId);
return observation;
@ -536,7 +578,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
for (byte[] token : connection.lRange(toKey(OBS_TKNS_REGID_IDX, registrationId), 0, -1)) {
byte[] obs = connection.get(toKey(OBS_TKN, token));
if (obs != null) {
result.add(build(deserializeObs(obs)));
result.add(deserializeObs(obs));
}
}
return result;
@ -593,15 +635,16 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
if (!connection.exists(toRegIdKey(registrationId)))
throw new ObservationStoreException("no registration for this Id");
byte[] key = toKey(OBS_TKN, obs.getRequest().getToken().getBytes());
byte[] serializeObs = serializeObs(obs);
Observation obsLeshan = buildLwM2mObservationFromCfToLeshanCore(obs);
byte[] serializeObs = serializeObs(obsLeshan);
byte[] previousValue;
if (ifAbsent) {
previousValue = connection.get(key);
if (previousValue == null || previousValue.length == 0) {
connection.set(key, serializeObs);
previousValue = connection.getSet(key, serializeObs);
} else {
// return deserializeObs(previousValue);
return null;
return buildCoapObservationFromLeshanCoreToCfCore(deserializeObs(previousValue));
}
} else {
previousValue = connection.getSet(key, serializeObs);
@ -612,8 +655,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
// log any collisions
if (previousValue != null && previousValue.length != 0) {
// previousObservation = deserializeObs(previousValue);
previousObservation = null;
previousObservation = buildCoapObservationFromLeshanCoreToCfCore(deserializeObs(previousValue));
LOG.warn(
"Token collision ? observation from request [{}] will be replaced by observation from request [{}] ",
previousObservation.getRequest(), obs.getRequest());
@ -663,33 +705,19 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
}
public org.eclipse.californium.core.observe.Observation get(Token token) {
public Observation get(Token token) {
try (var connection = connectionFactory.getConnection()) {
byte[] obs = connection.get(toKey(OBS_TKN, token.getBytes()));
if (obs == null) {
return null;
} else {
// return deserializeObs(obs);
return null;
return deserializeObs(obs);
}
}
}
/* *************** Observation utility functions **************** */
private Registration getRegistration(RedisConnection connection, String registrationId) {
byte[] ep = connection.get(toRegIdKey(registrationId));
if (ep == null) {
return null;
}
byte[] data = connection.get(toEndpointKey(ep));
if (data == null) {
return null;
}
return deserializeReg(data);
}
private void unsafeRemoveObservation(RedisConnection connection, String registrationId, byte[] observationId) {
if (connection.del(toKey(OBS_TKN, observationId)) > 0L) {
connection.lRem(toKey(OBS_TKNS_REGID_IDX, registrationId), 0, observationId);
@ -704,7 +732,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
for (byte[] token : connection.lRange(regIdKey, 0, -1)) {
byte[] obs = connection.get(toKey(OBS_TKN, token));
if (obs != null) {
removed.add(build(deserializeObs(obs)));
removed.add(deserializeObs(obs));
}
connection.del(toKey(OBS_TKN, token));
}
@ -713,26 +741,22 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
return removed;
}
public void setContext(Token token, EndpointContext correlationContext) {
// In Leshan we always set context when we send the request, so this should not be needed to implement this.
private byte[] serializeObs(Observation obs) {
return observationSerDes.serialize(obs);
}
private byte[] serializeObs(org.eclipse.californium.core.observe.Observation obs) {
return null;
// return observationSerDes.serialize(obs);
private org.eclipse.leshan.core.observation.Observation buildLwM2mObservationFromCfToLeshanCore(
org.eclipse.californium.core.observe.Observation observation) {
String serializedObservation = observationSerDesCoap.serialize(observation);
return serializedObservation == null ? null : ObserveUtil.createLwM2mObservation(observation, serializedObservation);
}
private org.eclipse.californium.core.observe.Observation buildCoapObservationFromLeshanCoreToCfCore(Observation obs) {
String serializedObservation = ObserveUtil.extractSerializedObservation(obs);
return serializedObservation == null ? null : observationSerDesCoap.deserialize(serializedObservation);
}
private Observation deserializeObs(byte[] data) {
// return observationSerDes.deserialize(data);
return null;
}
private Observation build(Observation cfObs) {
if (cfObs == null)
return null;
// String serializedObservation = observationSerDes.serialize(cfObs);
// return ObserveUtil.createLwM2mObservation(cfObs, serializedObservation);
return null;
return data == null ? null : observationSerDes.deserialize(data);
}
/* *************** Expiration handling **************** */