diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/config/LwM2MTransportServerConfig.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/config/LwM2MTransportServerConfig.java index 730effaf3c..54a9312eda 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/config/LwM2MTransportServerConfig.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/config/LwM2MTransportServerConfig.java @@ -78,7 +78,7 @@ public class LwM2MTransportServerConfig implements LwM2MSecureServerConfig { @Getter @Value("${transport.lwm2m.security.key_store:}") - private String keyStorePathFile; + private String keyStoreFilePath; @Getter @Setter @@ -141,14 +141,27 @@ public class LwM2MTransportServerConfig implements LwM2MSecureServerConfig { public void init() { URI uri = null; try { - uri = Resources.getResource(keyStorePathFile).toURI(); - log.info("URI: {}", uri); - File keyStoreFile = new File(uri); - InputStream inKeyStore = new FileInputStream(keyStoreFile); + InputStream keyStoreInputStream; + File keyStoreFile = new File(keyStoreFilePath); + if (keyStoreFile.exists()) { + log.info("Reading key store from file {}", keyStoreFilePath); + keyStoreInputStream = new FileInputStream(keyStoreFile); + } else { + InputStream classPathStream = this.getClass().getClassLoader().getResourceAsStream(keyStoreFilePath); + if (classPathStream != null) { + log.info("Reading key store from class path {}", keyStoreFilePath); + keyStoreInputStream = classPathStream; + } else { + uri = Resources.getResource(keyStoreFilePath).toURI(); + log.info("Reading key store from URI {}", keyStoreFilePath); + keyStoreInputStream = new FileInputStream(new File(uri)); + } + } keyStoreValue = KeyStore.getInstance(keyStoreType); - keyStoreValue.load(inKeyStore, keyStorePassword == null ? null : keyStorePassword.toCharArray()); + keyStoreValue.load(keyStoreInputStream, keyStorePassword == null ? null : keyStorePassword.toCharArray()); } catch (Exception e) { - log.info("Unable to lookup LwM2M keystore. Reason: {}, {}" , uri, e.getMessage()); + log.info("Unable to lookup LwM2M keystore. Reason: {}, {}", uri, e.getMessage()); } } + } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClientContextImpl.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClientContextImpl.java index 56ab29901c..b4b73a0625 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClientContextImpl.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClientContextImpl.java @@ -24,6 +24,8 @@ import org.eclipse.leshan.server.registration.Registration; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.device.profile.Lwm2mDeviceProfileTransportConfiguration; +import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.common.transport.TransportDeviceProfileCache; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; import org.thingsboard.server.gen.transport.TransportProtos; @@ -61,6 +63,7 @@ public class LwM2mClientContextImpl implements LwM2mClientContext { private final TbMainSecurityStore securityStore; private final TbLwM2MClientStore clientStore; private final LwM2MSessionManager sessionManager; + private final TransportDeviceProfileCache deviceProfileCache; private final Map lwM2mClientsByEndpoint = new ConcurrentHashMap<>(); private final Map lwM2mClientsByRegistrationId = new ConcurrentHashMap<>(); private final Map profiles = new ConcurrentHashMap<>(); @@ -231,12 +234,26 @@ public class LwM2mClientContextImpl implements LwM2mClientContext { @Override public Lwm2mDeviceProfileTransportConfiguration getProfile(UUID profileId) { - return profiles.get(profileId); + return doGetAndCache(profileId); } @Override public Lwm2mDeviceProfileTransportConfiguration getProfile(Registration registration) { - return profiles.get(getClientByEndpoint(registration.getEndpoint()).getProfileId()); + UUID profileId = getClientByEndpoint(registration.getEndpoint()).getProfileId(); + Lwm2mDeviceProfileTransportConfiguration result = doGetAndCache(profileId); + if (result == null) { + log.debug("[{}] Fetching profile [{}]", registration.getEndpoint(), profileId); + DeviceProfile deviceProfile = deviceProfileCache.get(new DeviceProfileId(profileId)); + if (deviceProfile != null) { + profileUpdate(deviceProfile); + result = doGetAndCache(profileId); + } + } + return result; + } + + private Lwm2mDeviceProfileTransportConfiguration doGetAndCache(UUID profileId) { + return profiles.get(profileId); } @Override diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisSecurityStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisSecurityStore.java index b108286afd..54200be434 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisSecurityStore.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisSecurityStore.java @@ -46,7 +46,7 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore { lock = redisLock.obtain(toLockKey(endpoint)); lock.lock(); byte[] data = connection.get((SEC_EP + endpoint).getBytes()); - if (data == null) { + if (data == null || data.length == 0) { return null; } else { return ((TbLwM2MSecurityInfo) serializer.asObject(data)).getSecurityInfo(); @@ -69,7 +69,7 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore { return null; } else { byte[] data = connection.get((SEC_EP + new String(ep)).getBytes()); - if (data == null) { + if (data == null || data.length == 0) { return null; } else { return ((TbLwM2MSecurityInfo) serializer.asObject(data)).getSecurityInfo(); @@ -122,7 +122,11 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore { lock = redisLock.obtain(endpoint); lock.lock(); byte[] data = connection.get((SEC_EP + endpoint).getBytes()); - return (TbLwM2MSecurityInfo) serializer.asObject(data); + if (data != null && data.length > 0) { + return (TbLwM2MSecurityInfo) serializer.asObject(data); + } else { + return null; + } } finally { if (lock != null) { lock.unlock(); @@ -137,7 +141,7 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore { lock = redisLock.obtain(endpoint); lock.lock(); byte[] data = connection.get((SEC_EP + endpoint).getBytes()); - if (data != null) { + if (data != null && data.length > 0) { SecurityInfo info = ((TbLwM2MSecurityInfo) serializer.asObject(data)).getSecurityInfo(); if (info != null && info.getIdentity() != null) { connection.hDel(PSKID_SEC.getBytes(), info.getIdentity().getBytes()); diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java index 2908d7813d..5f8ccad17e 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java @@ -321,20 +321,28 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl */ @Override public void onDeviceProfileUpdate(SessionInfoProto sessionInfo, DeviceProfile deviceProfile) { - List clients = clientContext.getLwM2mClients() - .stream().filter(e -> e.getProfileId().equals(deviceProfile.getUuidId())).collect(Collectors.toList()); - clients.forEach(client -> client.onDeviceProfileUpdate(deviceProfile)); - if (clients.size() > 0) { - this.onDeviceProfileUpdate(clients, deviceProfile); + try { + List clients = clientContext.getLwM2mClients() + .stream().filter(e -> e.getProfileId() != null) + .filter(e -> e.getProfileId().equals(deviceProfile.getUuidId())).collect(Collectors.toList()); + clients.forEach(client -> client.onDeviceProfileUpdate(deviceProfile)); + if (clients.size() > 0) { + this.onDeviceProfileUpdate(clients, deviceProfile); + } + } catch (Exception e) { + log.warn("[{}] failed to update profile: {}", deviceProfile.getId(), deviceProfile); } } @Override public void onDeviceUpdate(SessionInfoProto sessionInfo, Device device, Optional deviceProfileOpt) { - //TODO: check, maybe device has multiple sessions/registrations? Is this possible according to the standard. - LwM2mClient client = clientContext.getClientByDeviceId(device.getUuidId()); - if (client != null) { - this.onDeviceUpdate(client, device, deviceProfileOpt); + try { + LwM2mClient client = clientContext.getClientByDeviceId(device.getUuidId()); + if (client != null) { + this.onDeviceUpdate(client, device, deviceProfileOpt); + } + } catch (Exception e) { + log.warn("[{}] failed to update device: {}", device.getId(), device); } } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 5330021747..dba34957b9 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -210,7 +210,6 @@ public class DefaultTransportService implements TransportService { } records.forEach(record -> { try { - log.info("[{}] SessionIdMSB, [{}] SessionIdLSB, records", record.getValue().getSessionIdMSB(), record.getValue().getSessionIdLSB()); processToTransportMsg(record.getValue()); } catch (Throwable e) { log.warn("Failed to process the notification.", e); @@ -771,6 +770,7 @@ public class DefaultTransportService implements TransportService { UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB()); SessionMetaData md = sessions.get(sessionId); if (md != null) { + log.trace("[{}] Processing notification: {}", sessionId, toSessionMsg); SessionMsgListener listener = md.getListener(); transportCallbackExecutor.submit(() -> { if (toSessionMsg.hasGetAttributesResponse()) { @@ -798,12 +798,14 @@ public class DefaultTransportService implements TransportService { deregisterSession(md.getSessionInfo()); } } else { + log.trace("Processing broadcast notification: {}", toSessionMsg); if (toSessionMsg.hasEntityUpdateMsg()) { TransportProtos.EntityUpdateMsg msg = toSessionMsg.getEntityUpdateMsg(); EntityType entityType = EntityType.valueOf(msg.getEntityType()); if (EntityType.DEVICE_PROFILE.equals(entityType)) { DeviceProfile deviceProfile = deviceProfileCache.put(msg.getData()); if (deviceProfile != null) { + log.info("On device profile update: {}", deviceProfile); onProfileUpdate(deviceProfile); } } else if (EntityType.TENANT_PROFILE.equals(entityType)) {