LwM2M Client Store

This commit is contained in:
Andrii Shvaika 2021-06-24 17:54:46 +03:00
parent d190cba277
commit 78f9194669
10 changed files with 87 additions and 162 deletions

View File

@ -87,7 +87,7 @@ public class LwM2mTransportUtil {
public static final String LWM2M_VERSION_DEFAULT = "1.0";
public static final String LOG_LWM2M_TELEMETRY = "logLwm2m";
public static final String LOG_LWM2M_TELEMETRY = "transportLog";
public static final String LOG_LWM2M_INFO = "info";
public static final String LOG_LWM2M_ERROR = "error";
public static final String LOG_LWM2M_WARN = "warn";
@ -169,19 +169,6 @@ public class LwM2mTransportUtil {
return lwM2mOtaConvert;
}
public static LwM2mNode getLvM2mNodeToObject(LwM2mNode content) {
if (content instanceof LwM2mObject) {
return (LwM2mObject) content;
} else if (content instanceof LwM2mObjectInstance) {
return (LwM2mObjectInstance) content;
} else if (content instanceof LwM2mSingleResource) {
return (LwM2mSingleResource) content;
} else if (content instanceof LwM2mMultipleResource) {
return (LwM2mMultipleResource) content;
}
return null;
}
public static Lwm2mDeviceProfileTransportConfiguration toLwM2MClientProfile(DeviceProfile deviceProfile) {
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
if (transportConfiguration.getType().equals(DeviceTransportType.LWM2M)) {
@ -196,62 +183,6 @@ public class LwM2mTransportUtil {
return toLwM2MClientProfile(deviceProfile).getBootstrap();
}
public static JsonObject validateJson(String jsonStr) {
JsonObject object = null;
if (jsonStr != null && !jsonStr.isEmpty()) {
String jsonValidFlesh = jsonStr.replaceAll("\\\\", "");
jsonValidFlesh = jsonValidFlesh.replaceAll("\n", "");
jsonValidFlesh = jsonValidFlesh.replaceAll("\t", "");
jsonValidFlesh = jsonValidFlesh.replaceAll(" ", "");
String jsonValid = (jsonValidFlesh.charAt(0) == '"' && jsonValidFlesh.charAt(jsonValidFlesh.length() - 1) == '"') ? jsonValidFlesh.substring(1, jsonValidFlesh.length() - 1) : jsonValidFlesh;
try {
object = new JsonParser().parse(jsonValid).getAsJsonObject();
} catch (JsonSyntaxException e) {
log.error("[{}] Fail validateJson [{}]", jsonStr, e.getMessage());
}
}
return object;
}
@SuppressWarnings("unchecked")
public static <T> Optional<T> decode(byte[] byteArray) {
try {
FSTConfiguration config = FSTConfiguration.createDefaultConfiguration();
T msg = (T) config.asObject(byteArray);
return Optional.ofNullable(msg);
} catch (IllegalArgumentException e) {
log.error("Error during deserialization message, [{}]", e.getMessage());
return Optional.empty();
}
}
public static String splitCamelCaseString(String s) {
LinkedList<String> linkedListOut = new LinkedList<>();
LinkedList<String> linkedList = new LinkedList<String>((Arrays.asList(s.split(" "))));
linkedList.forEach(str -> {
String strOut = str.replaceAll("\\W", "").replaceAll("_", "").toUpperCase();
if (strOut.length() > 1) linkedListOut.add(strOut.charAt(0) + strOut.substring(1).toLowerCase());
else linkedListOut.add(strOut);
});
linkedListOut.set(0, (linkedListOut.get(0).substring(0, 1).toLowerCase() + linkedListOut.get(0).substring(1)));
return StringUtils.join(linkedListOut, "");
}
public static <T> TransportServiceCallback<Void> getAckCallback(LwM2mClient lwM2MClient,
int requestId, String typeTopic) {
return new TransportServiceCallback<Void>() {
@Override
public void onSuccess(Void dummy) {
log.trace("[{}] [{}] - requestId [{}] - EndPoint , Access AckCallback", typeTopic, requestId, lwM2MClient.getEndpoint());
}
@Override
public void onError(Throwable e) {
log.trace("[{}] Failed to publish msg", e.toString());
}
};
}
public static String fromVersionedIdToObjectId(String pathIdVer) {
try {
if (pathIdVer == null) {

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -38,16 +38,14 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
import org.thingsboard.server.transport.lwm2m.server.LwM2mQueuedRequest;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@ -60,48 +58,38 @@ import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.f
import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.getVerFromPathIdVerOrId;
@Slf4j
public class LwM2mClient implements Cloneable {
public class LwM2mClient implements Serializable {
private static final long serialVersionUID = 8793482946289222623L;
private final String nodeId;
@Getter
private final String endpoint;
private final Lock lock;
private transient final Lock lock = new ReentrantLock();
//TODO: define custom serialization of those fields.
@Getter
private transient final Map<String, ResourceValue> resources;
@Getter
private final Map<String, TsKvProto> sharedAttributes;
@Getter
private TenantId tenantId;
@Getter
private UUID profileId;
@Getter
private UUID deviceId;
@Getter
@Setter
private LwM2MClientState state;
@Getter
private final Map<String, ResourceValue> resources;
@Getter
private final Map<String, TsKvProto> sharedAttributes;
@Getter
private final Queue<LwM2mQueuedRequest> queuedRequests;
@Getter
private String deviceName;
@Getter
private String deviceProfileName;
@Getter
private PowerMode powerMode;
@Getter
private String identity;
@Getter
private SecurityInfo securityInfo;
@Getter
private TenantId tenantId;
@Getter
private UUID deviceId;
@Getter
private SessionInfoProto session;
@Getter
private UUID profileId;
private PowerMode powerMode;
@Getter
@Setter
private Registration registration;
private ValidateDeviceCredentialsResponse credentials;
public Object clone() throws CloneNotSupportedException {
return super.clone();
}
@ -109,23 +97,16 @@ public class LwM2mClient implements Cloneable {
public LwM2mClient(String nodeId, String endpoint) {
this.nodeId = nodeId;
this.endpoint = endpoint;
this.lock = new ReentrantLock();
this.sharedAttributes = new ConcurrentHashMap<>();
this.resources = new ConcurrentHashMap<>();
this.queuedRequests = new ConcurrentLinkedQueue<>();
this.state = LwM2MClientState.CREATED;
}
public void init(String identity, SecurityInfo securityInfo, ValidateDeviceCredentialsResponse credentials, UUID sessionId) {
this.identity = identity;
this.securityInfo = securityInfo;
this.credentials = credentials;
public void init(ValidateDeviceCredentialsResponse credentials, UUID sessionId) {
this.session = createSession(nodeId, sessionId, credentials);
this.tenantId = new TenantId(new UUID(session.getTenantIdMSB(), session.getTenantIdLSB()));
this.deviceId = new UUID(session.getDeviceIdMSB(), session.getDeviceIdLSB());
this.profileId = new UUID(session.getDeviceProfileIdMSB(), session.getDeviceProfileIdLSB());
this.deviceName = session.getDeviceName();
this.deviceProfileName = session.getDeviceType();
this.powerMode = credentials.getDeviceInfo().getPowerMode();
}
@ -140,10 +121,9 @@ public class LwM2mClient implements Cloneable {
public void onDeviceUpdate(Device device, Optional<DeviceProfile> deviceProfileOpt) {
SessionInfoProto.Builder builder = SessionInfoProto.newBuilder().mergeFrom(session);
this.deviceId = device.getUuidId();
this.deviceName = device.getName();
builder.setDeviceIdMSB(deviceId.getMostSignificantBits());
builder.setDeviceIdLSB(deviceId.getLeastSignificantBits());
builder.setDeviceName(deviceName);
builder.setDeviceName(device.getName());
deviceProfileOpt.ifPresent(deviceProfile -> updateSession(deviceProfile, builder));
this.session = builder.build();
this.powerMode = ((Lwm2mDeviceTransportConfiguration) device.getDeviceData().getTransportConfiguration()).getPowerMode();
@ -156,11 +136,10 @@ public class LwM2mClient implements Cloneable {
}
private void updateSession(DeviceProfile deviceProfile, SessionInfoProto.Builder builder) {
this.deviceProfileName = deviceProfile.getName();
this.profileId = deviceProfile.getUuidId();
builder.setDeviceProfileIdMSB(profileId.getMostSignificantBits());
builder.setDeviceProfileIdLSB(profileId.getLeastSignificantBits());
builder.setDeviceType(this.deviceProfileName);
builder.setDeviceType(deviceProfile.getName());
}
private SessionInfoProto createSession(String nodeId, UUID sessionId, ValidateDeviceCredentialsResponse msg) {

View File

@ -28,8 +28,6 @@ import java.util.UUID;
public interface LwM2mClientContext {
LwM2mClient getClientByRegistrationId(String registrationId);
LwM2mClient getClientByEndpoint(String endpoint);
LwM2mClient getClientBySessionInfo(TransportProtos.SessionInfoProto sessionInfo);
@ -53,8 +51,6 @@ public interface LwM2mClientContext {
LwM2mClient getClientByDeviceId(UUID deviceId);
String getObjectIdByKeyNameFromProfile(TransportProtos.SessionInfoProto sessionInfo, String keyName);
String getObjectIdByKeyNameFromProfile(LwM2mClient lwM2mClient, String keyName);
void registerClient(Registration registration, ValidateDeviceCredentialsResponse credentials);

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -31,6 +31,7 @@ import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig;
import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MSecurityInfo;
import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportContext;
import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil;
import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MClientStore;
import org.thingsboard.server.transport.lwm2m.server.store.TbMainSecurityStore;
import java.util.Arrays;
@ -56,13 +57,27 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
private final LwM2mTransportContext context;
private final LwM2MTransportServerConfig config;
private final TbMainSecurityStore securityStore;
private final TbLwM2MClientStore clientStore;
private final Map<String, LwM2mClient> lwM2mClientsByEndpoint = new ConcurrentHashMap<>();
private final Map<String, LwM2mClient> lwM2mClientsByRegistrationId = new ConcurrentHashMap<>();
private final Map<UUID, Lwm2mDeviceProfileTransportConfiguration> profiles = new ConcurrentHashMap<>();
@Override
public LwM2mClient getClientByEndpoint(String endpoint) {
return lwM2mClientsByEndpoint.computeIfAbsent(endpoint, ep -> new LwM2mClient(context.getNodeId(), ep));
return lwM2mClientsByEndpoint.computeIfAbsent(endpoint, ep -> {
LwM2mClient client = clientStore.get(ep);
if (client == null) {
log.info("[{}] initialized new client.", endpoint);
client = new LwM2mClient(context.getNodeId(), ep);
} else {
log.debug("[{}] fetched client from store: {}", endpoint, client);
if (client.getRegistration() != null) {
lwM2mClientsByRegistrationId.put(client.getRegistration().getId(), client);
//TODO: create ThingsBoard session.
}
}
return client;
});
}
@Override
@ -82,9 +97,9 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
if (securityInfo.getDeviceProfile() != null) {
profileUpdate(securityInfo.getDeviceProfile());
if (securityInfo.getSecurityInfo() != null) {
lwM2MClient.init(securityInfo.getSecurityInfo().getIdentity(), securityInfo.getSecurityInfo(), securityInfo.getMsg(), UUID.randomUUID());
lwM2MClient.init(securityInfo.getMsg(), UUID.randomUUID());
} else if (NO_SEC.equals(securityInfo.getSecurityMode())) {
lwM2MClient.init(null, null, securityInfo.getMsg(), UUID.randomUUID());
lwM2MClient.init(securityInfo.getMsg(), UUID.randomUUID());
} else {
throw new RuntimeException(String.format("Registration failed: device %s not found.", lwM2MClient.getEndpoint()));
}
@ -97,6 +112,7 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
lwM2MClient.setRegistration(registration);
this.lwM2mClientsByRegistrationId.put(registration.getId(), lwM2MClient);
lwM2MClient.setState(LwM2MClientState.REGISTERED);
clientStore.put(lwM2MClient);
} finally {
lwM2MClient.unlock();
}
@ -111,6 +127,7 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
throw new LwM2MClientStateException(lwM2MClient.getState(), "Client is in invalid state.");
}
lwM2MClient.setRegistration(registration);
clientStore.put(lwM2MClient);
} finally {
lwM2MClient.unlock();
}
@ -129,6 +146,7 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
lwM2MClient.setState(LwM2MClientState.UNREGISTERED);
lwM2mClientsByEndpoint.remove(lwM2MClient.getEndpoint());
this.securityStore.remove(lwM2MClient.getEndpoint(), registration.getId());
clientStore.remove(lwM2MClient.getEndpoint());
UUID profileId = lwM2MClient.getProfileId();
if (profileId != null) {
Optional<LwM2mClient> otherClients = lwM2mClientsByRegistrationId.values().stream().filter(e -> e.getProfileId().equals(profileId)).findFirst();
@ -144,11 +162,6 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
}
}
@Override
public LwM2mClient getClientByRegistrationId(String registrationId) {
return lwM2mClientsByRegistrationId.get(registrationId);
}
@Override
public LwM2mClient getClientBySessionInfo(TransportProtos.SessionInfoProto sessionInfo) {
LwM2mClient lwM2mClient = null;
@ -169,18 +182,6 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
return lwM2mClient;
}
/**
* Get path to resource from profile equal keyName
*
* @param sessionInfo -
* @param keyName -
* @return -
*/
@Override
public String getObjectIdByKeyNameFromProfile(TransportProtos.SessionInfoProto sessionInfo, String keyName) {
return getObjectIdByKeyNameFromProfile(getClientBySessionInfo(sessionInfo), keyName);
}
@Override
public String getObjectIdByKeyNameFromProfile(LwM2mClient lwM2mClient, String keyName) {
Lwm2mDeviceProfileTransportConfiguration profile = getProfile(lwM2mClient.getProfileId());
@ -198,7 +199,7 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
@Override
public void registerClient(Registration registration, ValidateDeviceCredentialsResponse credentials) {
LwM2mClient client = getClientByEndpoint(registration.getEndpoint());
client.init(null, null, credentials, UUID.randomUUID());
client.init(credentials, UUID.randomUUID());
lwM2mClientsByRegistrationId.put(registration.getId(), client);
profileUpdate(credentials.getDeviceProfile());
}

View File

@ -31,18 +31,8 @@ import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.L
@RequiredArgsConstructor
public class DefaultLwM2MTelemetryLogService implements LwM2MTelemetryLogService {
private final LwM2mClientContext clientContext;
private final LwM2mTransportServerHelper helper;
/**
* @param logMsg - text msg
* @param registrationId - Id of Registration LwM2M Client
*/
@Override
public void log(String registrationId, String logMsg) {
log(clientContext.getClientByRegistrationId(registrationId), logMsg);
}
@Override
public void log(LwM2mClient client, String logMsg) {
if (logMsg != null && client != null && client.getSession() != null) {

View File

@ -21,6 +21,4 @@ public interface LwM2MTelemetryLogService {
void log(LwM2mClient client, String msg);
void log(String registrationId, String msg);
}

View File

@ -0,0 +1,20 @@
package org.thingsboard.server.transport.lwm2m.server.store;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
public class TbDummyLwM2MClientStore implements TbLwM2MClientStore {
@Override
public LwM2mClient get(String endpoint) {
return null;
}
@Override
public void put(LwM2mClient client) {
}
@Override
public void remove(String endpoint) {
}
}

View File

@ -0,0 +1,12 @@
package org.thingsboard.server.transport.lwm2m.server.store;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
public interface TbLwM2MClientStore {
LwM2mClient get(String endpoint);
void put(LwM2mClient client);
void remove(String endpoint);
}

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -56,6 +56,11 @@ public class TbLwM2mStoreFactory {
new TbLwM2mRedisSecurityStore(redisConfiguration.get().redisConnectionFactory()) : new TbInMemorySecurityStore(), validator);
}
@Bean
private TbLwM2MClientStore clientStore() {
return new TbDummyLwM2MClientStore();
}
@Bean
private TbLwM2MDtlsSessionStore sessionStore() {
return redisConfiguration.isPresent() && useRedis ?

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -247,14 +247,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
log.warn("[{}] [{{}] Client: update after Registration", registration.getEndpoint(), registration.getId());
logService.log(lwM2MClient, String.format("[%s][%s] Updated registration.", registration.getId(), registration.getSocketAddress()));
clientContext.updateRegistration(lwM2MClient, registration);
TransportProtos.SessionInfoProto sessionInfo = lwM2MClient.getSession();
this.reportActivityAndRegister(sessionInfo);
if (registration.usesQueueMode()) {
LwM2mQueuedRequest request;
while ((request = lwM2MClient.getQueuedRequests().poll()) != null) {
request.send();
}
}
this.reportActivityAndRegister(lwM2MClient.getSession());
} catch (LwM2MClientStateException stateException) {
if (LwM2MClientState.REGISTERED.equals(stateException.getState())) {
log.info("[{}] update registration failed because client has different registration id: [{}] {}.", registration.getEndpoint(), stateException.getState(), stateException.getMessage());