Lwm2m: backEnd: refactoring service

This commit is contained in:
nickAS21 2021-01-05 19:06:14 +02:00
parent bc4684c907
commit be4475c2cc
10 changed files with 113 additions and 127 deletions

View File

@ -32,7 +32,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.lwm2m.secure.LwM2MSecurityMode; import org.thingsboard.server.transport.lwm2m.secure.LwM2MSecurityMode;
import org.thingsboard.server.transport.lwm2m.secure.LwM2mValidateCredentialsSecurityInfo; import org.thingsboard.server.transport.lwm2m.secure.LwM2mCredentialsSecurityInfoValidator;
import org.thingsboard.server.transport.lwm2m.secure.ReadResultSecurityStore; import org.thingsboard.server.transport.lwm2m.secure.ReadResultSecurityStore;
import org.thingsboard.server.transport.lwm2m.server.LwM2MSessionMsgListener; import org.thingsboard.server.transport.lwm2m.server.LwM2MSessionMsgListener;
import org.thingsboard.server.transport.lwm2m.server.LwM2MTransportContextServer; import org.thingsboard.server.transport.lwm2m.server.LwM2MTransportContextServer;
@ -60,7 +60,7 @@ public class LwM2MBootstrapSecurityStore implements BootstrapSecurityStore {
private final EditableBootstrapConfigStore bootstrapConfigStore; private final EditableBootstrapConfigStore bootstrapConfigStore;
@Autowired @Autowired
LwM2mValidateCredentialsSecurityInfo lwM2MValidateCredentialsSecurityInfo; LwM2mCredentialsSecurityInfoValidator lwM2MCredentialsSecurityInfoValidator;
@Autowired @Autowired
public LwM2MTransportContextServer context; public LwM2MTransportContextServer context;
@ -72,7 +72,7 @@ public class LwM2MBootstrapSecurityStore implements BootstrapSecurityStore {
@Override @Override
public List<SecurityInfo> getAllByEndpoint(String endPoint) { public List<SecurityInfo> getAllByEndpoint(String endPoint) {
String endPointKey = endPoint; String endPointKey = endPoint;
ReadResultSecurityStore store = lwM2MValidateCredentialsSecurityInfo.validateCredentialsSecurityInfo(endPointKey, TypeServer.BOOTSTRAP); ReadResultSecurityStore store = lwM2MCredentialsSecurityInfoValidator.createAndValidateCredentialsSecurityInfo(endPointKey, TypeServer.BOOTSTRAP);
if (store.getBootstrapJsonCredential() != null && store.getSecurityMode() < LwM2MSecurityMode.DEFAULT_MODE.code) { if (store.getBootstrapJsonCredential() != null && store.getSecurityMode() < LwM2MSecurityMode.DEFAULT_MODE.code) {
/** add value to store from BootstrapJson */ /** add value to store from BootstrapJson */
this.setBootstrapConfigScurityInfo(store); this.setBootstrapConfigScurityInfo(store);
@ -96,7 +96,7 @@ public class LwM2MBootstrapSecurityStore implements BootstrapSecurityStore {
@Override @Override
public SecurityInfo getByIdentity(String identity) { public SecurityInfo getByIdentity(String identity) {
ReadResultSecurityStore store = lwM2MValidateCredentialsSecurityInfo.validateCredentialsSecurityInfo(identity, TypeServer.BOOTSTRAP); ReadResultSecurityStore store = lwM2MCredentialsSecurityInfoValidator.createAndValidateCredentialsSecurityInfo(identity, TypeServer.BOOTSTRAP);
if (store.getBootstrapJsonCredential() != null && store.getSecurityMode() < LwM2MSecurityMode.DEFAULT_MODE.code) { if (store.getBootstrapJsonCredential() != null && store.getSecurityMode() < LwM2MSecurityMode.DEFAULT_MODE.code) {
/** add value to store from BootstrapJson */ /** add value to store from BootstrapJson */
this.setBootstrapConfigScurityInfo(store); this.setBootstrapConfigScurityInfo(store);

View File

@ -47,7 +47,7 @@ import static org.thingsboard.server.transport.lwm2m.secure.LwM2MSecurityMode.X5
@Slf4j @Slf4j
@Component("LwM2MGetSecurityInfo") @Component("LwM2MGetSecurityInfo")
@ConditionalOnExpression("('${service.type:null}'=='tb-transport' && '${transport.lwm2m.enabled:false}'=='true' ) || ('${service.type:null}'=='monolith' && '${transport.lwm2m.enabled}'=='true')") @ConditionalOnExpression("('${service.type:null}'=='tb-transport' && '${transport.lwm2m.enabled:false}'=='true' ) || ('${service.type:null}'=='monolith' && '${transport.lwm2m.enabled}'=='true')")
public class LwM2mValidateCredentialsSecurityInfo { public class LwM2mCredentialsSecurityInfoValidator {
@Autowired @Autowired
public LwM2MTransportContextServer contextS; public LwM2MTransportContextServer contextS;
@ -62,7 +62,7 @@ public class LwM2mValidateCredentialsSecurityInfo {
* @param keyValue - * @param keyValue -
* @return ValidateDeviceCredentialsResponseMsg and SecurityInfo * @return ValidateDeviceCredentialsResponseMsg and SecurityInfo
*/ */
public ReadResultSecurityStore validateCredentialsSecurityInfo(String endPoint, TypeServer keyValue) { public ReadResultSecurityStore createAndValidateCredentialsSecurityInfo(String endPoint, TypeServer keyValue) {
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
final ReadResultSecurityStore[] resultSecurityStore = new ReadResultSecurityStore[1]; final ReadResultSecurityStore[] resultSecurityStore = new ReadResultSecurityStore[1];
contextS.getTransportService().process(ValidateDeviceLwM2MCredentialsRequestMsg.newBuilder().setCredentialsId(endPoint).build(), contextS.getTransportService().process(ValidateDeviceLwM2MCredentialsRequestMsg.newBuilder().setCredentialsId(endPoint).build(),
@ -79,7 +79,7 @@ public class LwM2mValidateCredentialsSecurityInfo {
@Override @Override
public void onError(Throwable e) { public void onError(Throwable e) {
log.trace("[{}] [{}] Failed to process credentials PSK ", endPoint, e.toString()); log.trace("[{}] [{}] Failed to process credentials ", endPoint, e);
resultSecurityStore[0] = createSecurityInfo(endPoint, null, null); resultSecurityStore[0] = createSecurityInfo(endPoint, null, null);
latch.countDown(); latch.countDown();
} }
@ -87,7 +87,7 @@ public class LwM2mValidateCredentialsSecurityInfo {
try { try {
latch.await(contextS.getCtxServer().getTimeout(), TimeUnit.MILLISECONDS); latch.await(contextS.getCtxServer().getTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("", e); log.error("Failed to await credentials!", e);
} }
return resultSecurityStore[0]; return resultSecurityStore[0];
} }

View File

@ -88,13 +88,12 @@ public class LwM2MTransportRequest {
@Autowired @Autowired
LwM2MTransportServiceImpl service; LwM2MTransportServiceImpl service;
@PostConstruct @PostConstruct
public void init() { public void init() {
this.converter = LwM2mValueConverterImpl.getInstance(); this.converter = LwM2mValueConverterImpl.getInstance();
executorResponse = Executors.newCachedThreadPool( executorResponse = Executors.newFixedThreadPool(10,
new NamedThreadFactory(String.format("LwM2M %s channel response", RESPONSE_CHANNEL))); new NamedThreadFactory(String.format("LwM2M %s channel response", RESPONSE_CHANNEL)));
executorResponseError = Executors.newCachedThreadPool( executorResponseError = Executors.newFixedThreadPool(10,
new NamedThreadFactory(String.format("LwM2M %s channel response Error", RESPONSE_CHANNEL))); new NamedThreadFactory(String.format("LwM2M %s channel response Error", RESPONSE_CHANNEL)));
} }
@ -220,7 +219,7 @@ public class LwM2MTransportRequest {
if (request != null) { if (request != null) {
this.sendRequest(lwServer, registration, request, lwM2MClient, timeoutInMs, isDelayedUpdate); this.sendRequest(lwServer, registration, request, lwM2MClient, timeoutInMs, isDelayedUpdate);
} else if (request == null && isDelayedUpdate) { } else if (isDelayedUpdate) {
String msg = String.format(LOG_LW2M_ERROR + ": sendRequest: Resource path - %s msg No SendRequest to Client", target); String msg = String.format(LOG_LW2M_ERROR + ": sendRequest: Resource path - %s msg No SendRequest to Client", target);
service.sentLogsToThingsboard(msg, registration); service.sentLogsToThingsboard(msg, registration);
log.error("[{}] - [{}] No SendRequest", target); log.error("[{}] - [{}] No SendRequest", target);

View File

@ -82,16 +82,6 @@ public class LwM2MTransportServerConfiguration {
@Autowired @Autowired
private LwM2mInMemorySecurityStore lwM2mInMemorySecurityStore; private LwM2mInMemorySecurityStore lwM2mInMemorySecurityStore;
@Bean
public LwM2mServerListener lwM2mServerListenerCert() {
return new LwM2mServerListener();
}
@Bean
public LwM2mServerListener lwM2mServerListenerNoSecPskRpk() {
return new LwM2mServerListener();
}
@Primary @Primary
@Bean(name = "LeshanServerCert") @Bean(name = "LeshanServerCert")
public LeshanServer getLeshanServerCert() { public LeshanServer getLeshanServerCert() {

View File

@ -32,6 +32,10 @@ import javax.annotation.PreDestroy;
@ConditionalOnExpression("('${service.type:null}'=='tb-transport' && '${transport.lwm2m.enabled:false}'=='true' ) || ('${service.type:null}'=='monolith' && '${transport.lwm2m.enabled}'=='true')") @ConditionalOnExpression("('${service.type:null}'=='tb-transport' && '${transport.lwm2m.enabled:false}'=='true' ) || ('${service.type:null}'=='monolith' && '${transport.lwm2m.enabled}'=='true')")
public class LwM2MTransportServerInitializer { public class LwM2MTransportServerInitializer {
@Autowired
private LwM2MTransportServiceImpl service;
@Autowired @Autowired
@Qualifier("LeshanServerCert") @Qualifier("LeshanServerCert")
private LeshanServer lhServerCert; private LeshanServer lhServerCert;
@ -39,16 +43,6 @@ public class LwM2MTransportServerInitializer {
@Autowired @Autowired
@Qualifier("LeshanServerNoSecPskRpk") @Qualifier("LeshanServerNoSecPskRpk")
private LeshanServer lhServerNoSecPskRpk; private LeshanServer lhServerNoSecPskRpk;
//
// @Autowired
// @Qualifier("LeshanServerListener")
// private LwM2mServerListener lwM2mServerListener;
@Autowired
private LwM2mServerListener lwM2mServerListenerNoSecPskRpk;
@Autowired
private LwM2mServerListener lwM2mServerListenerCert;
@Autowired @Autowired
private LwM2MTransportContextServer context; private LwM2MTransportContextServer context;
@ -70,28 +64,25 @@ public class LwM2MTransportServerInitializer {
private void startLhServerCert() { private void startLhServerCert() {
this.lhServerCert.start(); this.lhServerCert.start();
LwM2mServerListener serverListenerCert = this.lwM2mServerListenerCert.init(this.lhServerCert); LwM2mServerListener lhServerCertListener = new LwM2mServerListener(this.lhServerCert, service);
this.lhServerCert.getRegistrationService().addListener(serverListenerCert.registrationListener); this.lhServerCert.getRegistrationService().addListener(lhServerCertListener.registrationListener);
this.lhServerCert.getPresenceService().addListener(serverListenerCert.presenceListener); this.lhServerCert.getPresenceService().addListener(lhServerCertListener.presenceListener);
this.lhServerCert.getObservationService().addListener(serverListenerCert.observationListener); this.lhServerCert.getObservationService().addListener(lhServerCertListener.observationListener);
} }
private void startLhServerNoSecPskRpk() { private void startLhServerNoSecPskRpk() {
this.lhServerNoSecPskRpk.start(); this.lhServerNoSecPskRpk.start();
LwM2mServerListener serverListenerNoSecPskRpk = this.lwM2mServerListenerNoSecPskRpk.init(this.lhServerNoSecPskRpk); LwM2mServerListener lhServerNoSecPskRpkListener = new LwM2mServerListener(this.lhServerNoSecPskRpk, service);
this.lhServerNoSecPskRpk.getRegistrationService().addListener(serverListenerNoSecPskRpk.registrationListener); this.lhServerNoSecPskRpk.getRegistrationService().addListener(lhServerNoSecPskRpkListener.registrationListener);
this.lhServerNoSecPskRpk.getPresenceService().addListener(serverListenerNoSecPskRpk.presenceListener); this.lhServerNoSecPskRpk.getPresenceService().addListener(lhServerNoSecPskRpkListener.presenceListener);
this.lhServerNoSecPskRpk.getObservationService().addListener(serverListenerNoSecPskRpk.observationListener); this.lhServerNoSecPskRpk.getObservationService().addListener(lhServerNoSecPskRpkListener.observationListener);
} }
@PreDestroy @PreDestroy
public void shutdown() { public void shutdown() {
log.info("Stopping LwM2M transport Server!"); log.info("Stopping LwM2M transport Server!");
try {
lhServerCert.destroy(); lhServerCert.destroy();
lhServerNoSecPskRpk.destroy(); lhServerNoSecPskRpk.destroy();
} finally {
}
log.info("LwM2M transport Server stopped!"); log.info("LwM2M transport Server stopped!");
} }
} }

View File

@ -20,6 +20,7 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonElement; import com.google.gson.JsonElement;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.eclipse.leshan.core.Link;
import org.eclipse.leshan.core.model.ResourceModel; import org.eclipse.leshan.core.model.ResourceModel;
import org.eclipse.leshan.core.node.LwM2mMultipleResource; import org.eclipse.leshan.core.node.LwM2mMultipleResource;
import org.eclipse.leshan.core.node.LwM2mObject; import org.eclipse.leshan.core.node.LwM2mObject;
@ -56,7 +57,6 @@ import org.thingsboard.server.transport.lwm2m.utils.LwM2mValueConverterImpl;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
@ -68,7 +68,6 @@ import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -80,7 +79,6 @@ import java.util.stream.Collectors;
import static org.thingsboard.server.common.transport.util.JsonUtils.getJsonObject; import static org.thingsboard.server.common.transport.util.JsonUtils.getJsonObject;
import static org.thingsboard.server.transport.lwm2m.server.LwM2MTransportHandler.CLIENT_NOT_AUTHORIZED; import static org.thingsboard.server.transport.lwm2m.server.LwM2MTransportHandler.CLIENT_NOT_AUTHORIZED;
import static org.thingsboard.server.transport.lwm2m.server.LwM2MTransportHandler.DEFAULT_TIMEOUT;
import static org.thingsboard.server.transport.lwm2m.server.LwM2MTransportHandler.DEVICE_ATTRIBUTES_REQUEST; import static org.thingsboard.server.transport.lwm2m.server.LwM2MTransportHandler.DEVICE_ATTRIBUTES_REQUEST;
import static org.thingsboard.server.transport.lwm2m.server.LwM2MTransportHandler.DEVICE_ATTRIBUTES_TOPIC; import static org.thingsboard.server.transport.lwm2m.server.LwM2MTransportHandler.DEVICE_ATTRIBUTES_TOPIC;
import static org.thingsboard.server.transport.lwm2m.server.LwM2MTransportHandler.DEVICE_TELEMETRY_TOPIC; import static org.thingsboard.server.transport.lwm2m.server.LwM2MTransportHandler.DEVICE_TELEMETRY_TOPIC;
@ -122,11 +120,11 @@ public class LwM2MTransportServiceImpl implements LwM2MTransportService {
@PostConstruct @PostConstruct
public void init() { public void init() {
this.context.getScheduler().scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int) context.getCtxServer().getSessionReportTimeout()), context.getCtxServer().getSessionReportTimeout(), TimeUnit.MILLISECONDS); this.context.getScheduler().scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int) context.getCtxServer().getSessionReportTimeout()), context.getCtxServer().getSessionReportTimeout(), TimeUnit.MILLISECONDS);
this.executorRegistered = Executors.newCachedThreadPool( this.executorRegistered = Executors.newFixedThreadPool(10,
new NamedThreadFactory(String.format("LwM2M %s channel registered", SERVICE_CHANNEL))); new NamedThreadFactory(String.format("LwM2M %s channel registered", SERVICE_CHANNEL)));
this.executorUpdateRegistered = Executors.newCachedThreadPool( this.executorUpdateRegistered = Executors.newFixedThreadPool(10,
new NamedThreadFactory(String.format("LwM2M %s channel update registered", SERVICE_CHANNEL))); new NamedThreadFactory(String.format("LwM2M %s channel update registered", SERVICE_CHANNEL)));
this.executorUnRegistered = Executors.newCachedThreadPool( this.executorUnRegistered = Executors.newFixedThreadPool(10,
new NamedThreadFactory(String.format("LwM2M %s channel un registered", SERVICE_CHANNEL))); new NamedThreadFactory(String.format("LwM2M %s channel un registered", SERVICE_CHANNEL)));
this.converter = LwM2mValueConverterImpl.getInstance(); this.converter = LwM2mValueConverterImpl.getInstance();
} }
@ -149,7 +147,7 @@ public class LwM2MTransportServiceImpl implements LwM2MTransportService {
public void onRegistered(LeshanServer lwServer, Registration registration, Collection<Observation> previousObsersations) { public void onRegistered(LeshanServer lwServer, Registration registration, Collection<Observation> previousObsersations) {
executorRegistered.submit(() -> { executorRegistered.submit(() -> {
try { try {
log.info("[{}] [{{}] Client: create after Registration", registration.getEndpoint(), registration.getId()); log.warn("[{}] [{{}] Client: create after Registration", registration.getEndpoint(), registration.getId());
LwM2MClient lwM2MClient = lwM2mInMemorySecurityStore.updateInSessionsLwM2MClient(lwServer, registration); LwM2MClient lwM2MClient = lwM2mInMemorySecurityStore.updateInSessionsLwM2MClient(lwServer, registration);
if (lwM2MClient != null) { if (lwM2MClient != null) {
lwM2MClient.setLwM2MTransportServiceImpl(this); lwM2MClient.setLwM2MTransportServiceImpl(this);
@ -180,6 +178,7 @@ public class LwM2MTransportServiceImpl implements LwM2MTransportService {
/** /**
* if sessionInfo removed from sessions, then new registerAsyncSession * if sessionInfo removed from sessions, then new registerAsyncSession
*
* @param lwServer - LeshanServer * @param lwServer - LeshanServer
* @param registration - Registration LwM2M Client * @param registration - Registration LwM2M Client
*/ */
@ -280,20 +279,38 @@ public class LwM2MTransportServiceImpl implements LwM2MTransportService {
* @param lwM2MClient - object with All parameters off client * @param lwM2MClient - object with All parameters off client
*/ */
private void setLwM2MClient(LeshanServer lwServer, Registration registration, LwM2MClient lwM2MClient) { private void setLwM2MClient(LeshanServer lwServer, Registration registration, LwM2MClient lwM2MClient) {
Arrays.stream(registration.getObjectLinks()).forEach(url -> { // #1
for (Link url : registration.getObjectLinks()) {
LwM2mPath pathIds = new LwM2mPath(url.getUrl()); LwM2mPath pathIds = new LwM2mPath(url.getUrl());
if (pathIds.isObjectInstance() && !pathIds.isResource()) { if (pathIds.isObjectInstance() && !pathIds.isResource()) {
lwM2MClient.getPendingRequests().add(url.getUrl()); lwM2MClient.getPendingRequests().add(url.getUrl());
} }
}); }
// #2 // #2
Arrays.stream(registration.getObjectLinks()).forEach(url -> { for (Link url : registration.getObjectLinks()) {
LwM2mPath pathIds = new LwM2mPath(url.getUrl()); LwM2mPath pathIds = new LwM2mPath(url.getUrl());
if (pathIds.isObjectInstance() && !pathIds.isResource()) { if (pathIds.isObjectInstance() && !pathIds.isResource()) {
lwM2MTransportRequest.sendAllRequest(lwServer, registration, url.getUrl(), GET_TYPE_OPER_READ, ContentFormat.TLV.getName(), lwM2MTransportRequest.sendAllRequest(lwServer, registration, url.getUrl(), GET_TYPE_OPER_READ, ContentFormat.TLV.getName(),
lwM2MClient, null, null, this.context.getCtxServer().getTimeout(), false); lwM2MClient, null, null, this.context.getCtxServer().getTimeout(), false);
} }
}); }
// #1
// Arrays.stream(registration.getObjectLinks()).forEach(url -> {
// LwM2mPath pathIds = new LwM2mPath(url.getUrl());
// if (pathIds.isObjectInstance() && !pathIds.isResource()) {
// lwM2MClient.getPendingRequests().add(url.getUrl());
// }
// });
// #2
// Arrays.stream(registration.getObjectLinks()).forEach(url -> {
// LwM2mPath pathIds = new LwM2mPath(url.getUrl());
// if (pathIds.isObjectInstance() && !pathIds.isResource()) {
// lwM2MTransportRequest.sendAllRequest(lwServer, registration, url.getUrl(), GET_TYPE_OPER_READ, ContentFormat.TLV.getName(),
// lwM2MClient, null, null, this.context.getCtxServer().getTimeout(), false);
// }
// });
} }
/** /**
@ -307,7 +324,6 @@ public class LwM2MTransportServiceImpl implements LwM2MTransportService {
} }
/** /**
*
* @param registrationId - * @param registrationId -
* @return - * @return -
*/ */
@ -437,6 +453,7 @@ public class LwM2MTransportServiceImpl implements LwM2MTransportService {
/** /**
* Get names and keyNames from profile shared!!!! attr resources IsWritable * Get names and keyNames from profile shared!!!! attr resources IsWritable
*
* @param lwM2MClient - * @param lwM2MClient -
* @return ArrayList keyNames from profile attr resources shared!!!! && IsWritable * @return ArrayList keyNames from profile attr resources shared!!!! && IsWritable
*/ */
@ -479,13 +496,13 @@ public class LwM2MTransportServiceImpl implements LwM2MTransportService {
} }
} }
// #1.2 // #1.2
CountDownLatch cancelLatch = new CountDownLatch(1);
this.getParametersFromProfile(attributes, telemetries, registration, paths);
cancelLatch.countDown();
try { try {
cancelLatch.await(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS); writeLock.lock();
} catch (InterruptedException e) { this.getParametersFromProfile(attributes, telemetries, registration, paths);
log.error("[{}] updateAttrTelemetry", e.toString()); } catch (Exception e) {
log.error("UpdateAttrTelemetry", e);
} finally {
writeLock.unlock();
} }
if (attributes.getAsJsonObject().entrySet().size() > 0) if (attributes.getAsJsonObject().entrySet().size() > 0)
this.updateParametersOnThingsboard(attributes, DEVICE_ATTRIBUTES_TOPIC, registration); this.updateParametersOnThingsboard(attributes, DEVICE_ATTRIBUTES_TOPIC, registration);
@ -618,14 +635,7 @@ public class LwM2MTransportServiceImpl implements LwM2MTransportService {
* {@code ObservationService#cancelObservation()} * {@code ObservationService#cancelObservation()}
*/ */
public void setCancelObservationRecourse(LeshanServer lwServer, Registration registration, String path) { public void setCancelObservationRecourse(LeshanServer lwServer, Registration registration, String path) {
CountDownLatch cancelLatch = new CountDownLatch(1);
lwServer.getObservationService().cancelObservations(registration, path); lwServer.getObservationService().cancelObservations(registration, path);
cancelLatch.countDown();
try {
cancelLatch.await(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("", e);
}
} }
/** /**
@ -685,7 +695,6 @@ public class LwM2MTransportServiceImpl implements LwM2MTransportService {
boolean isChange = false; boolean isChange = false;
try { try {
writeLock.lock(); writeLock.lock();
try {
// #1 // #1
LwM2MClient lwM2MClient = lwM2mInMemorySecurityStore.getLwM2MClientWithReg(registration, null); LwM2MClient lwM2MClient = lwM2mInMemorySecurityStore.getLwM2MClientWithReg(registration, null);
LwM2mPath pathIds = new LwM2mPath(path); LwM2mPath pathIds = new LwM2mPath(path);
@ -702,13 +711,12 @@ public class LwM2MTransportServiceImpl implements LwM2MTransportService {
lwM2MClient.getResources().put(path, resourceValue); lwM2MClient.getResources().put(path, resourceValue);
isChange = true; isChange = true;
} }
} catch (Exception e) {
log.error("#1_1 Update ResourcesValue after Observation is unsuccessfully path: [{}] value: [{}] [{}]", path, value, e.toString());
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }
}
catch (Exception e) {
log.error("#1_1 Update ResourcesValue after Observation in CountDownLatch is unsuccessfully path: [{}] value: [{}] [{}]", path, value, e.toString());
}
if (isChange) { if (isChange) {
Set<String> paths = new HashSet<>(); Set<String> paths = new HashSet<>();
paths.add(path); paths.add(path);
@ -1103,6 +1111,7 @@ public class LwM2MTransportServiceImpl implements LwM2MTransportService {
/** /**
* if sessionInfo removed from sessions, then new registerAsyncSession * if sessionInfo removed from sessions, then new registerAsyncSession
*
* @param sessionInfo - * @param sessionInfo -
*/ */
private void checkInactivity(SessionInfoProto sessionInfo) { private void checkInactivity(SessionInfoProto sessionInfo) {

View File

@ -24,25 +24,18 @@ import org.eclipse.leshan.server.queue.PresenceListener;
import org.eclipse.leshan.server.registration.Registration; import org.eclipse.leshan.server.registration.Registration;
import org.eclipse.leshan.server.registration.RegistrationListener; import org.eclipse.leshan.server.registration.RegistrationListener;
import org.eclipse.leshan.server.registration.RegistrationUpdate; import org.eclipse.leshan.server.registration.RegistrationUpdate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import java.util.Collection; import java.util.Collection;
@Slf4j @Slf4j
@Component()
@ConditionalOnExpression("('${service.type:null}'=='tb-transport' && '${transport.lwm2m.enabled:false}'=='true' )|| ('${service.type:null}'=='monolith' && '${transport.lwm2m.enabled}'=='true')")
public class LwM2mServerListener { public class LwM2mServerListener {
private LeshanServer lhServer; private final LeshanServer lhServer;
private final LwM2MTransportServiceImpl service;
@Autowired public LwM2mServerListener(LeshanServer lhServer, LwM2MTransportServiceImpl service) {
private LwM2MTransportServiceImpl service;
public LwM2mServerListener init(LeshanServer lhServer) {
this.lhServer = lhServer; this.lhServer = lhServer;
return this; this.service = service;
} }
public final RegistrationListener registrationListener = new RegistrationListener() { public final RegistrationListener registrationListener = new RegistrationListener() {
@ -51,8 +44,8 @@ public class LwM2mServerListener {
*/ */
@Override @Override
public void registered(Registration registration, Registration previousReg, public void registered(Registration registration, Registration previousReg,
Collection<Observation> previousObsersations) { Collection<Observation> previousObservations) {
service.onRegistered(lhServer, registration, previousObsersations); service.onRegistered(lhServer, registration, previousObservations);
} }
/** /**
@ -120,4 +113,5 @@ public class LwM2mServerListener {
log.info("Received newObservation from [{}] endpoint [{}] ", observation.getPath(), registration.getEndpoint()); log.info("Received newObservation from [{}] endpoint [{}] ", observation.getPath(), registration.getEndpoint());
} }
}; };
} }

View File

@ -63,12 +63,12 @@ public class LwM2MClient implements Cloneable {
return super.clone(); return super.clone();
} }
public LwM2MClient(String endPoint, String identity, SecurityInfo info, ValidateDeviceCredentialsResponseMsg credentialsResponse, Map<String, String> attributes, UUID profileUuid) { public LwM2MClient(String endPoint, String identity, SecurityInfo info, ValidateDeviceCredentialsResponseMsg credentialsResponse, UUID profileUuid) {
this.endPoint = endPoint; this.endPoint = endPoint;
this.identity = identity; this.identity = identity;
this.info = info; this.info = info;
this.credentialsResponse = credentialsResponse; this.credentialsResponse = credentialsResponse;
this.attributes = (attributes != null && attributes.size() > 0) ? attributes : new ConcurrentHashMap<String, String>(); this.attributes = new ConcurrentHashMap<>();
this.pendingRequests = ConcurrentHashMap.newKeySet(); this.pendingRequests = ConcurrentHashMap.newKeySet();
this.delayedRequests = new ConcurrentHashMap<>(); this.delayedRequests = new ConcurrentHashMap<>();
this.resources = new ConcurrentHashMap<>(); this.resources = new ConcurrentHashMap<>();

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.transport.lwm2m.server.client; package org.thingsboard.server.transport.lwm2m.server.client;
import lombok.Data; import lombok.Data;
import java.util.Map; import java.util.Map;
@Data @Data
@ -29,6 +30,7 @@ public class ResourceValue {
this.value = value; this.value = value;
this.multiInstances = multiInstances; this.multiInstances = multiInstances;
} }
public Object getResourceValue() { public Object getResourceValue() {
return this.multiInstances ? this.values : this.value; return this.multiInstances ? this.values : this.value;
} }

View File

@ -29,7 +29,7 @@ import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.lwm2m.secure.LwM2MSecurityMode; import org.thingsboard.server.transport.lwm2m.secure.LwM2MSecurityMode;
import org.thingsboard.server.transport.lwm2m.secure.LwM2mValidateCredentialsSecurityInfo; import org.thingsboard.server.transport.lwm2m.secure.LwM2mCredentialsSecurityInfoValidator;
import org.thingsboard.server.transport.lwm2m.secure.ReadResultSecurityStore; import org.thingsboard.server.transport.lwm2m.secure.ReadResultSecurityStore;
import org.thingsboard.server.transport.lwm2m.server.LwM2MTransportHandler; import org.thingsboard.server.transport.lwm2m.server.LwM2MTransportHandler;
import org.thingsboard.server.transport.lwm2m.server.client.AttrTelemetryObserveValue; import org.thingsboard.server.transport.lwm2m.server.client.AttrTelemetryObserveValue;
@ -53,17 +53,18 @@ import static org.thingsboard.server.transport.lwm2m.secure.LwM2MSecurityMode.NO
@Service("LwM2mInMemorySecurityStore") @Service("LwM2mInMemorySecurityStore")
@ConditionalOnExpression("('${service.type:null}'=='tb-transport' && '${transport.lwm2m.enabled:false}'=='true' )|| ('${service.type:null}'=='monolith' && '${transport.lwm2m.enabled}'=='true')") @ConditionalOnExpression("('${service.type:null}'=='tb-transport' && '${transport.lwm2m.enabled:false}'=='true' )|| ('${service.type:null}'=='monolith' && '${transport.lwm2m.enabled}'=='true')")
public class LwM2mInMemorySecurityStore extends InMemorySecurityStore { public class LwM2mInMemorySecurityStore extends InMemorySecurityStore {
private static final boolean INFOS_ARE_COMPROMISED = false;
// lock for the two maps // lock for the two maps
protected final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
protected final Lock readLock = readWriteLock.readLock(); private final Lock readLock = readWriteLock.readLock();
protected final Lock writeLock = readWriteLock.writeLock(); private final Lock writeLock = readWriteLock.writeLock();
private final boolean infosAreCompromised = false; private final Map<String /** registrationId */, LwM2MClient> sessions = new ConcurrentHashMap<>();
protected Map<String /** registrationId */, LwM2MClient> sessions = new ConcurrentHashMap<>(); private Map<UUID /** profileUUid */, AttrTelemetryObserveValue> profiles = new ConcurrentHashMap<>();
protected Map<UUID /** profileUUid */, AttrTelemetryObserveValue> profiles = new ConcurrentHashMap<>();
private SecurityStoreListener listener; private SecurityStoreListener listener;
@Autowired @Autowired
LwM2mValidateCredentialsSecurityInfo lwM2MValidateCredentialsSecurityInfo; LwM2mCredentialsSecurityInfoValidator lwM2MCredentialsSecurityInfoValidator;
/** /**
* Start after DefaultAuthorizer or LwM2mPskStore * Start after DefaultAuthorizer or LwM2mPskStore
@ -75,8 +76,8 @@ public class LwM2mInMemorySecurityStore extends InMemorySecurityStore {
readLock.lock(); readLock.lock();
try { try {
String registrationId = this.getByRegistrationId(endPoint, null); String registrationId = this.getByRegistrationId(endPoint, null);
SecurityInfo info = (registrationId != null && sessions.size() > 0 && sessions.get(registrationId) != null) ? sessions.get(registrationId).getInfo() : this.addLwM2MClientToSession(endPoint); return (registrationId != null && sessions.size() > 0 && sessions.get(registrationId) != null) ?
return info; sessions.get(registrationId).getInfo() : this.addLwM2MClientToSession(endPoint);
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
@ -102,7 +103,7 @@ public class LwM2mInMemorySecurityStore extends InMemorySecurityStore {
public Collection<SecurityInfo> getAll() { public Collection<SecurityInfo> getAll() {
readLock.lock(); readLock.lock();
try { try {
return Collections.unmodifiableCollection(this.sessions.entrySet().stream().map(model -> model.getValue().getInfo()).collect(Collectors.toList())); return Collections.unmodifiableCollection(this.sessions.values().stream().map(LwM2MClient::getInfo).collect(Collectors.toList()));
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
@ -118,7 +119,7 @@ public class LwM2mInMemorySecurityStore extends InMemorySecurityStore {
LwM2MClient lwM2MClient = (sessions.get(registrationId) != null) ? sessions.get(registrationId) : null; LwM2MClient lwM2MClient = (sessions.get(registrationId) != null) ? sessions.get(registrationId) : null;
if (lwM2MClient != null) { if (lwM2MClient != null) {
if (listener != null) { if (listener != null) {
listener.securityInfoRemoved(infosAreCompromised, lwM2MClient.getInfo()); listener.securityInfoRemoved(INFOS_ARE_COMPROMISED, lwM2MClient.getInfo());
} }
sessions.remove(registrationId); sessions.remove(registrationId);
} }
@ -198,14 +199,14 @@ public class LwM2mInMemorySecurityStore extends InMemorySecurityStore {
* - device - if the thingsboard does not have a device with a name equal to the identity * - device - if the thingsboard does not have a device with a name equal to the identity
*/ */
private SecurityInfo addLwM2MClientToSession(String identity) { private SecurityInfo addLwM2MClientToSession(String identity) {
ReadResultSecurityStore store = lwM2MValidateCredentialsSecurityInfo.validateCredentialsSecurityInfo(identity, TypeServer.CLIENT); ReadResultSecurityStore store = lwM2MCredentialsSecurityInfoValidator.createAndValidateCredentialsSecurityInfo(identity, TypeServer.CLIENT);
if (store.getSecurityMode() < LwM2MSecurityMode.DEFAULT_MODE.code) { if (store.getSecurityMode() < LwM2MSecurityMode.DEFAULT_MODE.code) {
UUID profileUuid = (store.getDeviceProfile() != null && addUpdateProfileParameters(store.getDeviceProfile())) ? store.getDeviceProfile().getUuidId() : null; UUID profileUuid = (store.getDeviceProfile() != null && addUpdateProfileParameters(store.getDeviceProfile())) ? store.getDeviceProfile().getUuidId() : null;
if (store.getSecurityInfo() != null && profileUuid != null) { if (store.getSecurityInfo() != null && profileUuid != null) {
String endpoint = store.getSecurityInfo().getEndpoint(); String endpoint = store.getSecurityInfo().getEndpoint();
sessions.put(endpoint, new LwM2MClient(endpoint, store.getSecurityInfo().getIdentity(), store.getSecurityInfo(), store.getMsg(), null, profileUuid)); sessions.put(endpoint, new LwM2MClient(endpoint, store.getSecurityInfo().getIdentity(), store.getSecurityInfo(), store.getMsg(), profileUuid));
} else if (store.getSecurityMode() == NO_SEC.code && profileUuid != null) { } else if (store.getSecurityMode() == NO_SEC.code && profileUuid != null) {
sessions.put(identity, new LwM2MClient(identity, null, null, store.getMsg(), null, profileUuid)); sessions.put(identity, new LwM2MClient(identity, null, null, store.getMsg(), profileUuid));
} else { } else {
log.error("Registration failed: FORBIDDEN/profileUuid/device [{}] , endpointId: [{}]", profileUuid, identity); log.error("Registration failed: FORBIDDEN/profileUuid/device [{}] , endpointId: [{}]", profileUuid, identity);
/** /**