diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 2d5f107d61..4d41b258f3 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -269,7 +269,9 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId()); if (requestMd != null) { log.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId()); - systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.TIMEOUT, null); + if (requestMd.getMsg().getMsg().isPersisted()) { + systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.TIMEOUT, null); + } systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION)); } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportUtil.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportUtil.java index 923c9d7bad..38f3877cb7 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportUtil.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportUtil.java @@ -16,34 +16,24 @@ package org.thingsboard.server.transport.lwm2m.server; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.JsonSyntaxException; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.eclipse.leshan.core.attributes.Attribute; import org.eclipse.leshan.core.attributes.AttributeSet; import org.eclipse.leshan.core.model.ObjectModel; import org.eclipse.leshan.core.model.ResourceModel; -import org.eclipse.leshan.core.node.LwM2mMultipleResource; -import org.eclipse.leshan.core.node.LwM2mNode; -import org.eclipse.leshan.core.node.LwM2mObject; -import org.eclipse.leshan.core.node.LwM2mObjectInstance; import org.eclipse.leshan.core.node.LwM2mPath; import org.eclipse.leshan.core.node.LwM2mResource; -import org.eclipse.leshan.core.node.LwM2mSingleResource; import org.eclipse.leshan.core.node.codec.CodecException; import org.eclipse.leshan.core.request.SimpleDownlinkRequest; import org.eclipse.leshan.core.request.WriteAttributesRequest; import org.eclipse.leshan.core.util.Hex; import org.eclipse.leshan.server.registration.Registration; -import org.nustaq.serialization.FSTConfiguration; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.data.device.data.lwm2m.BootstrapConfiguration; import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.device.profile.Lwm2mDeviceProfileTransportConfiguration; -import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.transport.lwm2m.config.LwM2mVersion; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; import org.thingsboard.server.transport.lwm2m.server.client.ResourceValue; @@ -56,10 +46,8 @@ import org.thingsboard.server.transport.lwm2m.server.uplink.DefaultLwM2MUplinkMs import java.util.ArrayList; import java.util.Arrays; import java.util.Date; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import static org.eclipse.leshan.core.attributes.Attribute.DIMENSION; diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/attributes/DefaultLwM2MAttributesService.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/attributes/DefaultLwM2MAttributesService.java index 0f0d95610b..3bf6883e7b 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/attributes/DefaultLwM2MAttributesService.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/attributes/DefaultLwM2MAttributesService.java @@ -23,9 +23,6 @@ import org.eclipse.leshan.core.model.ResourceModel; import org.eclipse.leshan.core.node.LwM2mPath; import org.eclipse.leshan.core.node.LwM2mResource; import org.springframework.stereotype.Service; -import org.thingsboard.server.common.data.ota.OtaPackageKey; -import org.thingsboard.server.common.data.ota.OtaPackageType; -import org.thingsboard.server.common.data.ota.OtaPackageUtil; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.gen.transport.TransportProtos; diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClient.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClient.java index 8cf6f20f36..41e4b88f28 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClient.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClient.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.transport.lwm2m.server.client; +import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -60,6 +61,7 @@ import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.f import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.getVerFromPathIdVerOrId; @Slf4j +@EqualsAndHashCode(of = {"endpoint"}) public class LwM2mClient implements Serializable { private static final long serialVersionUID = 8793482946289222623L; diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClientContextImpl.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClientContextImpl.java index 3802ef8234..04c4cc6bcd 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClientContextImpl.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClientContextImpl.java @@ -32,6 +32,7 @@ import org.thingsboard.server.common.transport.TransportDeviceProfileCache; import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.util.AfterStartUp; import org.thingsboard.server.queue.util.TbLwM2mTransportComponent; import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig; import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MSecurityInfo; @@ -81,6 +82,17 @@ public class LwM2mClientContextImpl implements LwM2mClientContext { private final Map lwM2mClientsByRegistrationId = new ConcurrentHashMap<>(); private final Map profiles = new ConcurrentHashMap<>(); + @AfterStartUp + public void init() { + String nodeId = context.getNodeId(); + Set fetchedClients = clientStore.getAll(); + log.debug("Fetched clients from store: {}", fetchedClients); + fetchedClients.forEach(client -> { + lwM2mClientsByEndpoint.put(client.getEndpoint(), client); + updateFetchedClient(nodeId, client); + }); + } + @Override public LwM2mClient getClientByEndpoint(String endpoint) { return lwM2mClientsByEndpoint.computeIfAbsent(endpoint, ep -> { @@ -91,23 +103,27 @@ public class LwM2mClientContextImpl implements LwM2mClientContext { client = new LwM2mClient(nodeId, ep); } else { log.debug("[{}] fetched client from store: {}", endpoint, client); - boolean updated = false; - if (client.getRegistration() != null) { - lwM2mClientsByRegistrationId.put(client.getRegistration().getId(), client); - } - if (client.getSession() != null) { - client.refreshSessionId(nodeId); - sessionManager.register(client.getSession()); - updated = true; - } - if (updated) { - clientStore.put(client); - } + updateFetchedClient(nodeId, client); } return client; }); } + private void updateFetchedClient(String nodeId, LwM2mClient client) { + boolean updated = false; + if (client.getRegistration() != null) { + lwM2mClientsByRegistrationId.put(client.getRegistration().getId(), client); + } + if (client.getSession() != null) { + client.refreshSessionId(nodeId); + sessionManager.register(client.getSession()); + updated = true; + } + if (updated) { + clientStore.put(client); + } + } + @Override public Optional register(LwM2mClient client, Registration registration) throws LwM2MClientStateException { TransportProtos.SessionInfoProto oldSession = null; @@ -282,20 +298,21 @@ public class LwM2mClientContextImpl implements LwM2mClientContext { @Override public Lwm2mDeviceProfileTransportConfiguration getProfile(Registration registration) { UUID profileId = getClientByEndpoint(registration.getEndpoint()).getProfileId(); - Lwm2mDeviceProfileTransportConfiguration result = doGetAndCache(profileId); - if (result == null) { - log.debug("[{}] Fetching profile [{}]", registration.getEndpoint(), profileId); - DeviceProfile deviceProfile = deviceProfileCache.get(new DeviceProfileId(profileId)); - if (deviceProfile != null) { - profileUpdate(deviceProfile); - result = doGetAndCache(profileId); - } - } - return result; + return doGetAndCache(profileId); } private Lwm2mDeviceProfileTransportConfiguration doGetAndCache(UUID profileId) { - return profiles.get(profileId); + Lwm2mDeviceProfileTransportConfiguration result = profiles.get(profileId); + if (result == null) { + log.debug("Fetching profile [{}]", profileId); + DeviceProfile deviceProfile = deviceProfileCache.get(new DeviceProfileId(profileId)); + if (deviceProfile != null) { + result = profileUpdate(deviceProfile); + } else { + log.info("Device profile was not found! Most probably device profile [{}] has been removed from the database.", profileId); + } + } + return result; } @Override diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbDummyLwM2MClientStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbDummyLwM2MClientStore.java index 3708bf4b0e..767ab43493 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbDummyLwM2MClientStore.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbDummyLwM2MClientStore.java @@ -17,12 +17,20 @@ package org.thingsboard.server.transport.lwm2m.server.store; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; +import java.util.Collections; +import java.util.Set; + public class TbDummyLwM2MClientStore implements TbLwM2MClientStore { @Override public LwM2mClient get(String endpoint) { return null; } + @Override + public Set getAll() { + return Collections.emptySet(); + } + @Override public void put(LwM2mClient client) { diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2MClientStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2MClientStore.java index 16e4e9e90a..55b0423b05 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2MClientStore.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2MClientStore.java @@ -17,10 +17,14 @@ package org.thingsboard.server.transport.lwm2m.server.store; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; +import java.util.Set; + public interface TbLwM2MClientStore { LwM2mClient get(String endpoint); + Set getAll(); + void put(LwM2mClient client); void remove(String endpoint); diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mSecurityStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mSecurityStore.java index bf1f275f32..ac0d3d0e68 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mSecurityStore.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mSecurityStore.java @@ -23,12 +23,9 @@ import org.thingsboard.server.transport.lwm2m.secure.LwM2mCredentialsSecurityInf import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MSecurityInfo; import java.util.HashSet; -import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import static org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mTypeServer.CLIENT; diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStore.java index a408cc22c4..d735eed26e 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStore.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStore.java @@ -16,9 +16,17 @@ package org.thingsboard.server.transport.lwm2m.server.store; import org.nustaq.serialization.FSTConfiguration; +import org.springframework.data.redis.connection.RedisClusterConnection; import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.core.Cursor; +import org.springframework.data.redis.core.ScanOptions; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + public class TbRedisLwM2MClientStore implements TbLwM2MClientStore { private static final String CLIENT_EP = "CLIENT#EP#"; @@ -42,6 +50,30 @@ public class TbRedisLwM2MClientStore implements TbLwM2MClientStore { } } + @Override + public Set getAll() { + try (var connection = connectionFactory.getConnection()) { + Set clients = new HashSet<>(); + ScanOptions scanOptions = ScanOptions.scanOptions().count(100).match(CLIENT_EP + "*").build(); + List> scans = new ArrayList<>(); + if (connection instanceof RedisClusterConnection) { + ((RedisClusterConnection) connection).clusterGetNodes().forEach(node -> { + scans.add(((RedisClusterConnection) connection).scan(node, scanOptions)); + }); + } else { + scans.add(connection.scan(scanOptions)); + } + + scans.forEach(scan -> { + scan.forEachRemaining(key -> { + byte[] element = connection.get(key); + clients.add((LwM2mClient) serializer.asObject(element)); + }); + }); + return clients; + } + } + @Override public void put(LwM2mClient client) { byte[] clientSerialized = serializer.asByteArray(client);