Merge pull request #4824 from YevhenBondarenko/feature/lwm2m-client-store

fetch all clients after startUp
This commit is contained in:
Andrew Shvayka 2021-06-30 13:46:20 +03:00 committed by GitHub
commit 7ec6923451
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 89 additions and 42 deletions

View File

@ -269,7 +269,9 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId());
if (requestMd != null) {
log.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.TIMEOUT, null);
if (requestMd.getMsg().getMsg().isPersisted()) {
systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.TIMEOUT, null);
}
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
}

View File

@ -16,34 +16,24 @@
package org.thingsboard.server.transport.lwm2m.server;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.leshan.core.attributes.Attribute;
import org.eclipse.leshan.core.attributes.AttributeSet;
import org.eclipse.leshan.core.model.ObjectModel;
import org.eclipse.leshan.core.model.ResourceModel;
import org.eclipse.leshan.core.node.LwM2mMultipleResource;
import org.eclipse.leshan.core.node.LwM2mNode;
import org.eclipse.leshan.core.node.LwM2mObject;
import org.eclipse.leshan.core.node.LwM2mObjectInstance;
import org.eclipse.leshan.core.node.LwM2mPath;
import org.eclipse.leshan.core.node.LwM2mResource;
import org.eclipse.leshan.core.node.LwM2mSingleResource;
import org.eclipse.leshan.core.node.codec.CodecException;
import org.eclipse.leshan.core.request.SimpleDownlinkRequest;
import org.eclipse.leshan.core.request.WriteAttributesRequest;
import org.eclipse.leshan.core.util.Hex;
import org.eclipse.leshan.server.registration.Registration;
import org.nustaq.serialization.FSTConfiguration;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.device.data.lwm2m.BootstrapConfiguration;
import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.Lwm2mDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.transport.lwm2m.config.LwM2mVersion;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
import org.thingsboard.server.transport.lwm2m.server.client.ResourceValue;
@ -56,10 +46,8 @@ import org.thingsboard.server.transport.lwm2m.server.uplink.DefaultLwM2MUplinkMs
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import static org.eclipse.leshan.core.attributes.Attribute.DIMENSION;

View File

@ -23,9 +23,6 @@ import org.eclipse.leshan.core.model.ResourceModel;
import org.eclipse.leshan.core.node.LwM2mPath;
import org.eclipse.leshan.core.node.LwM2mResource;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.ota.OtaPackageKey;
import org.thingsboard.server.common.data.ota.OtaPackageType;
import org.thingsboard.server.common.data.ota.OtaPackageUtil;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.gen.transport.TransportProtos;

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.transport.lwm2m.server.client;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@ -60,6 +61,7 @@ import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.f
import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.getVerFromPathIdVerOrId;
@Slf4j
@EqualsAndHashCode(of = {"endpoint"})
public class LwM2mClient implements Serializable {
private static final long serialVersionUID = 8793482946289222623L;

View File

@ -32,6 +32,7 @@ import org.thingsboard.server.common.transport.TransportDeviceProfileCache;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbLwM2mTransportComponent;
import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig;
import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MSecurityInfo;
@ -81,6 +82,17 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
private final Map<String, LwM2mClient> lwM2mClientsByRegistrationId = new ConcurrentHashMap<>();
private final Map<UUID, Lwm2mDeviceProfileTransportConfiguration> profiles = new ConcurrentHashMap<>();
@AfterStartUp
public void init() {
String nodeId = context.getNodeId();
Set<LwM2mClient> fetchedClients = clientStore.getAll();
log.debug("Fetched clients from store: {}", fetchedClients);
fetchedClients.forEach(client -> {
lwM2mClientsByEndpoint.put(client.getEndpoint(), client);
updateFetchedClient(nodeId, client);
});
}
@Override
public LwM2mClient getClientByEndpoint(String endpoint) {
return lwM2mClientsByEndpoint.computeIfAbsent(endpoint, ep -> {
@ -91,23 +103,27 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
client = new LwM2mClient(nodeId, ep);
} else {
log.debug("[{}] fetched client from store: {}", endpoint, client);
boolean updated = false;
if (client.getRegistration() != null) {
lwM2mClientsByRegistrationId.put(client.getRegistration().getId(), client);
}
if (client.getSession() != null) {
client.refreshSessionId(nodeId);
sessionManager.register(client.getSession());
updated = true;
}
if (updated) {
clientStore.put(client);
}
updateFetchedClient(nodeId, client);
}
return client;
});
}
private void updateFetchedClient(String nodeId, LwM2mClient client) {
boolean updated = false;
if (client.getRegistration() != null) {
lwM2mClientsByRegistrationId.put(client.getRegistration().getId(), client);
}
if (client.getSession() != null) {
client.refreshSessionId(nodeId);
sessionManager.register(client.getSession());
updated = true;
}
if (updated) {
clientStore.put(client);
}
}
@Override
public Optional<TransportProtos.SessionInfoProto> register(LwM2mClient client, Registration registration) throws LwM2MClientStateException {
TransportProtos.SessionInfoProto oldSession = null;
@ -282,20 +298,21 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
@Override
public Lwm2mDeviceProfileTransportConfiguration getProfile(Registration registration) {
UUID profileId = getClientByEndpoint(registration.getEndpoint()).getProfileId();
Lwm2mDeviceProfileTransportConfiguration result = doGetAndCache(profileId);
if (result == null) {
log.debug("[{}] Fetching profile [{}]", registration.getEndpoint(), profileId);
DeviceProfile deviceProfile = deviceProfileCache.get(new DeviceProfileId(profileId));
if (deviceProfile != null) {
profileUpdate(deviceProfile);
result = doGetAndCache(profileId);
}
}
return result;
return doGetAndCache(profileId);
}
private Lwm2mDeviceProfileTransportConfiguration doGetAndCache(UUID profileId) {
return profiles.get(profileId);
Lwm2mDeviceProfileTransportConfiguration result = profiles.get(profileId);
if (result == null) {
log.debug("Fetching profile [{}]", profileId);
DeviceProfile deviceProfile = deviceProfileCache.get(new DeviceProfileId(profileId));
if (deviceProfile != null) {
result = profileUpdate(deviceProfile);
} else {
log.info("Device profile was not found! Most probably device profile [{}] has been removed from the database.", profileId);
}
}
return result;
}
@Override

View File

@ -17,12 +17,20 @@ package org.thingsboard.server.transport.lwm2m.server.store;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
import java.util.Collections;
import java.util.Set;
public class TbDummyLwM2MClientStore implements TbLwM2MClientStore {
@Override
public LwM2mClient get(String endpoint) {
return null;
}
@Override
public Set<LwM2mClient> getAll() {
return Collections.emptySet();
}
@Override
public void put(LwM2mClient client) {

View File

@ -17,10 +17,14 @@ package org.thingsboard.server.transport.lwm2m.server.store;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
import java.util.Set;
public interface TbLwM2MClientStore {
LwM2mClient get(String endpoint);
Set<LwM2mClient> getAll();
void put(LwM2mClient client);
void remove(String endpoint);

View File

@ -23,12 +23,9 @@ import org.thingsboard.server.transport.lwm2m.secure.LwM2mCredentialsSecurityInf
import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MSecurityInfo;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mTypeServer.CLIENT;

View File

@ -16,9 +16,17 @@
package org.thingsboard.server.transport.lwm2m.server.store;
import org.nustaq.serialization.FSTConfiguration;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class TbRedisLwM2MClientStore implements TbLwM2MClientStore {
private static final String CLIENT_EP = "CLIENT#EP#";
@ -42,6 +50,30 @@ public class TbRedisLwM2MClientStore implements TbLwM2MClientStore {
}
}
@Override
public Set<LwM2mClient> getAll() {
try (var connection = connectionFactory.getConnection()) {
Set<LwM2mClient> clients = new HashSet<>();
ScanOptions scanOptions = ScanOptions.scanOptions().count(100).match(CLIENT_EP + "*").build();
List<Cursor<byte[]>> scans = new ArrayList<>();
if (connection instanceof RedisClusterConnection) {
((RedisClusterConnection) connection).clusterGetNodes().forEach(node -> {
scans.add(((RedisClusterConnection) connection).scan(node, scanOptions));
});
} else {
scans.add(connection.scan(scanOptions));
}
scans.forEach(scan -> {
scan.forEachRemaining(key -> {
byte[] element = connection.get(key);
clients.add((LwM2mClient) serializer.asObject(element));
});
});
return clients;
}
}
@Override
public void put(LwM2mClient client) {
byte[] clientSerialized = serializer.asByteArray(client);