diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStore.java index ff86e60490..23c7a57dd6 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStore.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStore.java @@ -29,7 +29,6 @@ import org.eclipse.leshan.core.util.NamedThreadFactory; import org.eclipse.leshan.core.util.Validate; import org.eclipse.leshan.server.californium.registration.CaliforniumRegistrationStore; import org.eclipse.leshan.server.redis.RedisRegistrationStore; -import org.eclipse.leshan.server.redis.serialization.IdentitySerDes; import org.eclipse.leshan.server.redis.serialization.ObservationSerDes; import org.eclipse.leshan.server.redis.serialization.RegistrationSerDes; import org.eclipse.leshan.server.registration.Deregistration; @@ -45,6 +44,7 @@ import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.Cursor; import org.springframework.data.redis.core.ScanOptions; import org.springframework.integration.redis.util.RedisLockRegistry; +import org.thingsboard.server.transport.lwm2m.server.store.util.LwM2MIdentitySerDes; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -110,12 +110,18 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto public TbLwM2mRedisRegistrationStore(RedisConnectionFactory connectionFactory, ScheduledExecutorService schedExecutor, long cleanPeriodInSec, long lifetimeGracePeriodInSec, int cleanLimit) { + this(connectionFactory, schedExecutor, cleanPeriodInSec, lifetimeGracePeriodInSec, cleanLimit, + new RedisLockRegistry(connectionFactory, "Registration")); + } + + public TbLwM2mRedisRegistrationStore(RedisConnectionFactory connectionFactory, ScheduledExecutorService schedExecutor, long cleanPeriodInSec, + long lifetimeGracePeriodInSec, int cleanLimit, RedisLockRegistry lockRegistry) { this.connectionFactory = connectionFactory; this.schedExecutor = schedExecutor; this.cleanPeriod = cleanPeriodInSec; this.cleanLimit = cleanLimit; this.gracePeriod = lifetimeGracePeriodInSec; - this.redisLock = new RedisLockRegistry(connectionFactory, "Registration"); + this.redisLock = lockRegistry; } /* *************** Redis Key utility function **************** */ @@ -173,7 +179,7 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto if (!oldRegistration.getSocketAddress().equals(registration.getSocketAddress())) { removeAddrIndex(connection, oldRegistration); } - if (!oldRegistration.getIdentity().equals(registration.getIdentity())) { + if (registrationsHaveDifferentIdentities(oldRegistration, registration)) { removeIdentityIndex(connection, oldRegistration); } // remove old observation @@ -231,7 +237,7 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto if (!r.getSocketAddress().equals(updatedRegistration.getSocketAddress())) { removeAddrIndex(connection, r); } - if (!r.getIdentity().equals(updatedRegistration.getIdentity())) { + if (registrationsHaveDifferentIdentities(r, updatedRegistration)) { removeIdentityIndex(connection, r); } @@ -402,6 +408,12 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto connection.zRem(EXP_EP, registration.getEndpoint().getBytes(UTF_8)); } + private boolean registrationsHaveDifferentIdentities(Registration first, Registration second){ + var first_identity_string = LwM2MIdentitySerDes.serialize(first.getIdentity()).toString(); + var second_identity_string = LwM2MIdentitySerDes.serialize(second.getIdentity()).toString(); + return !first_identity_string.equals(second_identity_string); + } + private byte[] toRegIdKey(String registrationId) { return toKey(REG_EP_REGID_IDX, registrationId); } @@ -411,7 +423,7 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto } private byte[] toRegIdentityKey(Identity identity) { - return toKey(REG_EP_IDENTITY, IdentitySerDes.serialize(identity).toString()); + return toKey(REG_EP_IDENTITY, LwM2MIdentitySerDes.serialize(identity).toString()); } private byte[] toEndpointKey(String endpoint) { diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/util/LwM2MIdentitySerDes.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/util/LwM2MIdentitySerDes.java new file mode 100644 index 0000000000..4ffbfd82bb --- /dev/null +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/util/LwM2MIdentitySerDes.java @@ -0,0 +1,63 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.lwm2m.server.store.util; + +import com.eclipsesource.json.Json; +import com.eclipsesource.json.JsonObject; +import org.apache.commons.lang3.NotImplementedException; +import org.eclipse.leshan.core.request.Identity; +import org.eclipse.leshan.core.util.Hex; + +import java.security.PublicKey; + +public class LwM2MIdentitySerDes { + + private static final String KEY_ADDRESS = "address"; + private static final String KEY_PORT = "port"; + private static final String KEY_ID = "id"; + private static final String KEY_CN = "cn"; + private static final String KEY_RPK = "rpk"; + protected static final String KEY_LWM2MIDENTITY_TYPE = "type"; + protected static final String LWM2MIDENTITY_TYPE_UNSECURE = "unsecure"; + protected static final String LWM2MIDENTITY_TYPE_PSK = "psk"; + protected static final String LWM2MIDENTITY_TYPE_X509 = "x509"; + protected static final String LWM2MIDENTITY_TYPE_RPK = "rpk"; + + public static JsonObject serialize(Identity identity) { + JsonObject o = Json.object(); + + if (identity.isPSK()) { + o.set(KEY_LWM2MIDENTITY_TYPE, LWM2MIDENTITY_TYPE_PSK); + o.set(KEY_ID, identity.getPskIdentity()); + } else if (identity.isRPK()) { + o.set(KEY_LWM2MIDENTITY_TYPE, LWM2MIDENTITY_TYPE_RPK); + PublicKey publicKey = identity.getRawPublicKey(); + o.set(KEY_RPK, Hex.encodeHexString(publicKey.getEncoded())); + } else if (identity.isX509()) { + o.set(KEY_LWM2MIDENTITY_TYPE, LWM2MIDENTITY_TYPE_X509); + o.set(KEY_CN, identity.getX509CommonName()); + } else { + o.set(KEY_LWM2MIDENTITY_TYPE, LWM2MIDENTITY_TYPE_UNSECURE); + o.set(KEY_ADDRESS, identity.getPeerAddress().getHostString()); + o.set(KEY_PORT, identity.getPeerAddress().getPort()); + } + return o; + } + + public static Identity deserialize(JsonObject peer) { + throw new NotImplementedException(); + } +} \ No newline at end of file diff --git a/common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStoreTest.java b/common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStoreTest.java new file mode 100644 index 0000000000..e09f2205b7 --- /dev/null +++ b/common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStoreTest.java @@ -0,0 +1,265 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.lwm2m.server.store; + +import org.eclipse.leshan.core.link.Link; +import org.eclipse.leshan.core.request.Identity; +import org.eclipse.leshan.core.util.NamedThreadFactory; +import org.eclipse.leshan.server.redis.serialization.RegistrationSerDes; +import org.eclipse.leshan.server.registration.Registration; +import org.eclipse.leshan.server.registration.RegistrationUpdate; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.integration.redis.util.RedisLockRegistry; +import org.springframework.test.util.ReflectionTestUtils; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.locks.Lock; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.thingsboard.server.transport.lwm2m.server.store.TbLwM2mRedisRegistrationStore.DEFAULT_CLEAN_LIMIT; +import static org.thingsboard.server.transport.lwm2m.server.store.TbLwM2mRedisRegistrationStore.DEFAULT_CLEAN_PERIOD; +import static org.thingsboard.server.transport.lwm2m.server.store.TbLwM2mRedisRegistrationStore.DEFAULT_GRACE_PERIOD; + + +@ExtendWith(MockitoExtension.class) +class TbLwM2mRedisRegistrationStoreTest { + + RedisConnectionFactory connectionFactory; + RedisConnection connection; + RedisLockRegistry lockRegistry; + + TbLwM2mRedisRegistrationStore registrationStore; + + @BeforeEach + void setUp() { + lockRegistry = mock(RedisLockRegistry.class); + lenient().when(lockRegistry.obtain(any())).thenReturn(mock(Lock.class)); + connection = mock(RedisConnection.class); + //when(connection.set(any(byte[].class), any(byte[].class))). + connectionFactory = mock(RedisConnectionFactory.class); + lenient().when(connectionFactory.getConnection()).thenReturn(connection); + ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, + new NamedThreadFactory(String.format("RedisRegistrationStore Cleaner (%ds)", DEFAULT_CLEAN_PERIOD))); + registrationStore = new TbLwM2mRedisRegistrationStore(connectionFactory, executorService, + DEFAULT_CLEAN_PERIOD, DEFAULT_GRACE_PERIOD, DEFAULT_CLEAN_LIMIT, lockRegistry); + } + + @Test + void testAddRegistrationWithNoOldRegistration() { + setOldRegistration(null); + Registration registration = buildRegistration(); + + assertThat(registrationStore.addRegistration(registration)).isNull(); + + byte[] endpoint = registration.getEndpoint().getBytes(UTF_8); + verify(connection, times(1)).set(getRegIdKey(registration), endpoint); + verify(connection, times(1)).set(getRegAddrKey(registration), endpoint); + verify(connection, times(1)).set(getRegIdentityKey(registration), endpoint); + verify(connection, times(3)).set(any(byte[].class), any(byte[].class)); + verify(connection, times(0)).del(any(byte[].class)); + } + + @Test + void testAddRegistrationWithOldRegistrationEqualToCurrent(){ + var oldRegistration = buildRegistration(); + setOldRegistration(oldRegistration); + Registration registration = buildRegistration(); + + var deregistration = registrationStore.addRegistration(registration); + + assertThat(deregistration.getRegistration()).isEqualTo(oldRegistration); + + byte[] endpoint = registration.getEndpoint().getBytes(UTF_8); + verify(connection, times(1)).set(getRegIdKey(registration), endpoint); + verify(connection, times(1)).set(getRegAddrKey(registration), endpoint); + verify(connection, times(1)).set(getRegIdentityKey(registration), endpoint); + verify(connection, times(3)).set(any(byte[].class), any(byte[].class)); + verify(connection, times(1)).del(getTknsRegIdKey(oldRegistration)); + verify(connection, times(1)).del(any(byte[].class)); + } + + @Test + void testAddRegistrationRemovesIndexes(){ + var oldRegistration = buildRegistration(Identity.unsecure(getTestAddress(1234))); + setOldRegistration(oldRegistration); + var registration = buildRegistration(Identity.unsecure(getTestAddress(2345))); + + var deregistration = registrationStore.addRegistration(registration); + + assertThat(deregistration.getRegistration()).isEqualTo(oldRegistration); + byte[] endpoint = registration.getEndpoint().getBytes(UTF_8); + verify(connection, times(1)).set(getRegIdKey(registration), endpoint); + verify(connection, times(1)).set(getRegAddrKey(registration), endpoint); + verify(connection, times(1)).set(getRegIdentityKey(registration), endpoint); + verify(connection, times(3)).set(any(byte[].class), any(byte[].class)); + verify(connection, times(1)).del(getRegAddrKey(oldRegistration)); + verify(connection, times(1)).del(getRegIdentityKey(oldRegistration)); + verify(connection, times(1)).del(getTknsRegIdKey(oldRegistration)); + verify(connection, times(3)).del(any(byte[].class)); + } + + @Test + void testUpdateRegistrationWhenNoRegistrationFound() { + setOldRegistration(null); + Registration registration = buildRegistration(); + RegistrationUpdate update = createUpdateFromRegistration(registration); + + assertThat(registrationStore.updateRegistration(update)).isNull(); + + verify(connection, times(1)).get(getRegIdKey(registration)); + verify(connection, times(1)).get(any(byte[].class)); + verify(connection, times(0)).del(any(byte[].class)); + } + + @Test + void testUpdateRegistrationWithSameRegistration() { + Registration registration = buildRegistration(); + setOldRegistration(registration); + RegistrationUpdate update = createUpdateFromRegistration(registration); + + assertThat(registrationStore.updateRegistration(update)).isNotNull(); + + var endpoint = registration.getEndpoint().getBytes(UTF_8); + // check registration and addressIndex here updated + verify(connection, times(1)).set(eq(getEndpointKey(endpoint)), any(byte[].class)); + verify(connection, times(1)).set(getRegAddrKey(registration), endpoint); + verify(connection, times(2)).set(any(byte[].class), any(byte[].class)); + verify(connection, times(0)).del(any(byte[].class)); + } + + @Test + void testUpdateRegistrationWithRegistrationFromSecureIdentitiesWithDifferentAddress() { + Registration oldRegistration = buildRegistration(Identity.psk(getTestAddress(1234), "my:psk")); + setOldRegistration(oldRegistration); + Registration newRegistration = buildRegistration(Identity.psk(getTestAddress(2345), "my:psk")); + RegistrationUpdate update = createUpdateFromRegistration(newRegistration); + assertThat(oldRegistration.getEndpoint()).isEqualTo(newRegistration.getEndpoint()); + + assertThat(registrationStore.updateRegistration(update)).isNotNull(); + + var endpoint = newRegistration.getEndpoint().getBytes(UTF_8); + // check registration and addressIndex here updated + verify(connection, times(1)).set(eq(getEndpointKey(endpoint)), any(byte[].class)); + verify(connection, times(1)).set(getRegAddrKey(newRegistration), endpoint); + // check old AddrIndex has been removed + verify(connection, times(1)).del(getRegAddrKey(oldRegistration)); + // check identityIndex has not been removed + verify(connection, times(0)).del(getRegIdentityKey(oldRegistration)); + // check only one key (AddrIndex) in total was removed + verify(connection, times(1)).del(any(byte[].class)); + } + + @Test + void testGetRegistrationByIdentityReturnsRegistrationForSecureIdentityWithDifferentAddress() { + Registration registration = buildRegistration(Identity.psk(getTestAddress(1234), "my:psk")); + setOldRegistration(registration); + Identity sameIdentityWithDifferentAddress = Identity.psk(getTestAddress(2345), "my:psk"); + + Registration retrievedRegistration = registrationStore.getRegistrationByIdentity(sameIdentityWithDifferentAddress); + + assertThat(retrievedRegistration).isEqualTo(registration); + } + + private void setOldRegistration(Registration oldRegistration){ + byte[] serializedRegistration = null; + if (oldRegistration != null){ + byte[] endpoint = oldRegistration.getEndpoint().getBytes(UTF_8); + // set the AddrIndex + byte[] regAddrKey = getRegAddrKey(oldRegistration); + lenient().when(connection.get(eq(regAddrKey))).thenReturn(endpoint); + // set the IdentityIndex + byte[] regIdentityKey = getRegIdentityKey(oldRegistration); + lenient().when(connection.get(eq(regIdentityKey))).thenReturn(endpoint); + // set the IdIndex + byte[] regIdKey = getRegIdKey(oldRegistration); + lenient().when(connection.get(eq(regIdKey))).thenReturn(endpoint); + // set the registration + serializedRegistration = RegistrationSerDes.bSerialize(oldRegistration); + lenient().when(connection.get(eq(getEndpointKey(endpoint)))).thenReturn(serializedRegistration); + } + lenient().when(connection.getSet(any(byte[].class), any(byte[].class))).thenReturn(serializedRegistration); + } + + private byte[] getRegAddrKey(Registration registration){ + return ReflectionTestUtils.invokeMethod(registrationStore, "toRegAddrKey", registration.getSocketAddress()); + } + + private byte[] getRegIdentityKey(Registration registration){ + return ReflectionTestUtils.invokeMethod(registrationStore, "toRegIdentityKey", registration.getIdentity()); + } + + private byte[] getRegIdKey(Registration registration){ + return ReflectionTestUtils.invokeMethod(registrationStore, "toRegIdKey", registration.getId()); + } + + private byte[] getEndpointKey(byte[] endpoint){ + return ReflectionTestUtils.invokeMethod(registrationStore, "toEndpointKey", endpoint); + } + + private byte[] getTknsRegIdKey(Registration registration){ + return ReflectionTestUtils.invokeMethod(registrationStore, "toKey", "TKNS:REGID:", registration.getId()); + } + + private static Registration buildRegistration() { + return buildRegistration(Identity.psk(getTestAddress(), "my:psk")); + } + + private static Registration buildRegistration(Identity identity){ + return new Registration.Builder("my_reg_id", "abcde", identity) + .objectLinks(new Link[]{}) + .build(); + } + + private static RegistrationUpdate createUpdateFromRegistration(Registration registration){ + return new RegistrationUpdate( + registration.getId(), + registration.getIdentity(), + registration.getLifeTimeInSec(), + registration.getSmsNumber(), + registration.getBindingMode(), + registration.getObjectLinks(), + registration.getAdditionalRegistrationAttributes() + ); + } + + private static InetSocketAddress getTestAddress() { + return getTestAddress(5684); + } + + private static InetSocketAddress getTestAddress(int port) { + try { + return new InetSocketAddress(InetAddress.getByName("1.2.3.4"), port); + } catch (UnknownHostException e) { + throw new AssertionError("Cannot create test address"); + } + } +} \ No newline at end of file diff --git a/common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/store/util/LwM2MIdentitySerDesTest.java b/common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/store/util/LwM2MIdentitySerDesTest.java new file mode 100644 index 0000000000..c75ed7a195 --- /dev/null +++ b/common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/store/util/LwM2MIdentitySerDesTest.java @@ -0,0 +1,77 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.lwm2m.server.store.util; + +import com.eclipsesource.json.JsonObject; +import org.apache.commons.lang3.NotImplementedException; +import org.eclipse.leshan.core.request.Identity; +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.security.PublicKey; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class LwM2MIdentitySerDesTest { + + @Test + void serializePskIdentity() { + assertThat(LwM2MIdentitySerDes.serialize(Identity.psk(getTestAddress(), "my:psk")).toString()) + .isEqualTo("{\"type\":\"psk\",\"id\":\"my:psk\"}"); + } + + + @Test + void serializeRpkIdentity() { + var public_key = mock(PublicKey.class); + when(public_key.getEncoded()).thenReturn(new byte[]{1,2,3,4,5,6,7,8,9}); + + assertThat(LwM2MIdentitySerDes.serialize(Identity.rpk(getTestAddress(), public_key)).toString()) + .isEqualTo("{\"type\":\"rpk\",\"rpk\":\"010203040506070809\"}"); + } + + @Test + void serializeX509Identity() { + assertThat(LwM2MIdentitySerDes.serialize(Identity.x509(getTestAddress(), "MyCommonName")).toString()) + .isEqualTo("{\"type\":\"x509\",\"cn\":\"MyCommonName\"}"); + } + + @Test + void serializeUnsecureIdentity() { + assertThat(LwM2MIdentitySerDes.serialize(Identity.unsecure(getTestAddress())).toString()) + .isEqualTo("{\"type\":\"unsecure\",\"address\":\"1.2.3.4\",\"port\":5684}"); + } + + + @Test + void deserialize() { + assertThatThrownBy(() -> LwM2MIdentitySerDes.deserialize(mock(JsonObject.class))) + .isInstanceOf(NotImplementedException.class); + } + + private static InetSocketAddress getTestAddress() { + try { + return new InetSocketAddress(InetAddress.getByName("1.2.3.4"), 5684); + } catch (UnknownHostException e) { + throw new AssertionError("Cannot create test address"); + } + } +} \ No newline at end of file