Lwm2m: backEnd: add ExecutorService to LwM2MTransportService

This commit is contained in:
nickAS21 2020-12-25 21:35:13 +02:00
parent 5eeb12ea13
commit 9df5be790a
6 changed files with 126 additions and 122 deletions

View File

@ -107,6 +107,8 @@ public class LwM2MTransportHandler{
public static final String PUT_TYPE_OPER_WRITE_ATTRIBUTES = "wright-attributes";
public static final String EVENT_AWAKE = "AWAKE";
public static final String SERVICE_CHANNEL = "SERVICE";
public static final String RESPONSE_CHANNEL = "RESP";
@Autowired
@Qualifier("LeshanServerCert")

View File

@ -74,26 +74,27 @@ import static org.thingsboard.server.transport.lwm2m.server.LwM2MTransportHandle
import static org.thingsboard.server.transport.lwm2m.server.LwM2MTransportHandler.POST_TYPE_OPER_WRITE_REPLACE;
import static org.thingsboard.server.transport.lwm2m.server.LwM2MTransportHandler.PUT_TYPE_OPER_WRITE_ATTRIBUTES;
import static org.thingsboard.server.transport.lwm2m.server.LwM2MTransportHandler.PUT_TYPE_OPER_WRITE_UPDATE;
import static org.thingsboard.server.transport.lwm2m.server.LwM2MTransportHandler.RESPONSE_CHANNEL;
@Slf4j
@Service("LwM2MTransportRequest")
@ConditionalOnExpression("('${service.type:null}'=='tb-transport' && '${transport.lwm2m.enabled:false}'=='true' ) || ('${service.type:null}'=='monolith' && '${transport.lwm2m.enabled}'=='true')")
public class LwM2MTransportRequest {
private final ExecutorService executorService;
private static final String RESPONSE_CHANNEL = "THINGSBOARD_RESP";
private final LwM2mValueConverterImpl converter;
private ExecutorService executorResponse;
private ExecutorService executorResponseError;
private LwM2mValueConverterImpl converter;
@Autowired
LwM2MTransportService service;
public LwM2MTransportRequest() {
this.converter = new LwM2mValueConverterImpl();
executorService = Executors.newCachedThreadPool(
new NamedThreadFactory(String.format("LwM2M %s channel response", RESPONSE_CHANNEL)));
}
@PostConstruct
public void init() {
this.converter = new LwM2mValueConverterImpl();
executorResponse = Executors.newCachedThreadPool(
new NamedThreadFactory(String.format("LwM2M %s channel response", RESPONSE_CHANNEL)));
executorResponseError = Executors.newCachedThreadPool(
new NamedThreadFactory(String.format("LwM2M %s channel response Error", RESPONSE_CHANNEL)));
}
public Collection<Registration> doGetRegistrations(LeshanServer lwServer) {
@ -306,27 +307,21 @@ public class LwM2MTransportRequest {
}
private void handleResponse(Registration registration, final String path, LwM2mResponse response, DownlinkRequest request, LwM2MClient lwM2MClient, boolean isDelayedUpdate) {
executorService.submit(new Runnable() {
@Override
public void run() {
try {
sendResponse(registration, path, response, request, lwM2MClient, isDelayedUpdate);
} catch (RuntimeException t) {
log.error("[{}] endpoint [{}] path [{}] error Unable to after send response.", registration.getEndpoint(), path, t.toString());
}
executorResponse.submit(() -> {
try {
sendResponse(registration, path, response, request, lwM2MClient, isDelayedUpdate);
} catch (Exception e) {
log.error("[{}] endpoint [{}] path [{}] error Unable to after send response.", registration.getEndpoint(), path, e);
}
});
}
private void handleResponseError(Registration registration, final String path, LwM2MClient lwM2MClient, boolean isDelayedUpdate) {
executorService.submit(new Runnable() {
@Override
public void run() {
try {
if (isDelayedUpdate) lwM2MClient.onSuccessOrErrorDelayedRequests(path);
} catch (RuntimeException t) {
log.error("[{}] endpoint [{}] path [{}] error Unable to after send response.", registration.getEndpoint(), path, t.toString());
}
executorResponseError.submit(() -> {
try {
if (isDelayedUpdate) lwM2MClient.onSuccessOrErrorDelayedRequests(path);
} catch (RuntimeException t) {
log.error("[{}] endpoint [{}] path [{}] error Unable to after send response.", registration.getEndpoint(), path, t.toString());
}
});
}

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -32,6 +32,7 @@ import org.eclipse.leshan.core.observation.Observation;
import org.eclipse.leshan.core.request.ContentFormat;
import org.eclipse.leshan.core.request.WriteRequest;
import org.eclipse.leshan.core.response.ReadResponse;
import org.eclipse.leshan.core.util.NamedThreadFactory;
import org.eclipse.leshan.server.californium.LeshanServer;
import org.eclipse.leshan.server.registration.Registration;
import org.springframework.beans.factory.annotation.Autowired;
@ -71,6 +72,8 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
@ -89,6 +92,7 @@ import static org.thingsboard.server.transport.lwm2m.server.LwM2MTransportHandle
import static org.thingsboard.server.transport.lwm2m.server.LwM2MTransportHandler.LOG_LW2M_TELEMETRY;
import static org.thingsboard.server.transport.lwm2m.server.LwM2MTransportHandler.POST_TYPE_OPER_EXECUTE;
import static org.thingsboard.server.transport.lwm2m.server.LwM2MTransportHandler.POST_TYPE_OPER_WRITE_REPLACE;
import static org.thingsboard.server.transport.lwm2m.server.LwM2MTransportHandler.SERVICE_CHANNEL;
import static org.thingsboard.server.transport.lwm2m.server.LwM2MTransportHandler.getAckCallback;
@Slf4j
@ -96,6 +100,10 @@ import static org.thingsboard.server.transport.lwm2m.server.LwM2MTransportHandle
@ConditionalOnExpression("('${service.type:null}'=='tb-transport' && '${transport.lwm2m.enabled:false}'=='true' ) || ('${service.type:null}'=='monolith' && '${transport.lwm2m.enabled}'=='true')")
public class LwM2MTransportService {
private ExecutorService executorRegistered;
private ExecutorService executorUpdateRegistered;
private ExecutorService executorUnRegistered;
@Autowired
private TransportService transportService;
@ -111,6 +119,12 @@ public class LwM2MTransportService {
@PostConstruct
public void init() {
context.getScheduler().scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int) context.getCtxServer().getSessionReportTimeout()), context.getCtxServer().getSessionReportTimeout(), TimeUnit.MILLISECONDS);
executorRegistered = Executors.newCachedThreadPool(
new NamedThreadFactory(String.format("LwM2M %s channel registered", SERVICE_CHANNEL)));
executorUpdateRegistered = Executors.newCachedThreadPool(
new NamedThreadFactory(String.format("LwM2M %s channel update registered", SERVICE_CHANNEL)));
executorUnRegistered = Executors.newCachedThreadPool(
new NamedThreadFactory(String.format("LwM2M %s channel un registered", SERVICE_CHANNEL)));
}
/**
@ -129,28 +143,35 @@ public class LwM2MTransportService {
* @param previousObsersations - may be null
*/
public void onRegistered(LeshanServer lwServer, Registration registration, Collection<Observation> previousObsersations) {
LwM2MClient lwM2MClient = lwM2mInMemorySecurityStore.updateInSessionsLwM2MClient(lwServer, registration);
if (lwM2MClient != null) {
lwM2MClient.setLwM2MTransportService(this);
lwM2MClient.setLwM2MTransportService(this);
lwM2MClient.setSessionUuid(UUID.randomUUID());
this.setLwM2MClient(lwServer, registration, lwM2MClient);
SessionInfoProto sessionInfo = this.getValidateSessionInfo(registration.getId());
if (sessionInfo != null) {
lwM2MClient.setDeviceUuid(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
lwM2MClient.setProfileUuid(new UUID(sessionInfo.getDeviceProfileIdMSB(), sessionInfo.getDeviceProfileIdLSB()));
lwM2MClient.setDeviceName(sessionInfo.getDeviceName());
lwM2MClient.setDeviceProfileName(sessionInfo.getDeviceType());
transportService.registerAsyncSession(sessionInfo, new LwM2MSessionMsgListener(this, sessionInfo));
transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), null);
transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null);
this.sentLogsToThingsboard(LOG_LW2M_INFO + ": Client registration", registration.getId());
} else {
log.error("Client: [{}] onRegistered [{}] name [{}] sessionInfo ", registration.getId(), registration.getEndpoint(), null);
executorRegistered.submit(() -> {
try {
log.info("[{}] Client: onRegistered name ", registration.getEndpoint());
LwM2MClient lwM2MClient = lwM2mInMemorySecurityStore.updateInSessionsLwM2MClient(lwServer, registration);
if (lwM2MClient != null) {
lwM2MClient.setLwM2MTransportService(this);
lwM2MClient.setLwM2MTransportService(this);
lwM2MClient.setSessionUuid(UUID.randomUUID());
this.setLwM2MClient(lwServer, registration, lwM2MClient);
SessionInfoProto sessionInfo = this.getValidateSessionInfo(registration.getId());
if (sessionInfo != null) {
lwM2MClient.setDeviceUuid(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
lwM2MClient.setProfileUuid(new UUID(sessionInfo.getDeviceProfileIdMSB(), sessionInfo.getDeviceProfileIdLSB()));
lwM2MClient.setDeviceName(sessionInfo.getDeviceName());
lwM2MClient.setDeviceProfileName(sessionInfo.getDeviceType());
transportService.registerAsyncSession(sessionInfo, new LwM2MSessionMsgListener(this, sessionInfo));
transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), null);
transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null);
this.sentLogsToThingsboard(LOG_LW2M_INFO + ": Client registration", registration.getId());
} else {
log.error("Client: [{}] onRegistered [{}] name [{}] sessionInfo ", registration.getId(), registration.getEndpoint(), null);
}
} else {
log.error("Client: [{}] onRegistered [{}] name [{}] lwM2MClient ", registration.getId(), registration.getEndpoint(), null);
}
} catch (Throwable t) {
log.error("[{}] endpoint [{}] error Unable registration.", registration.getEndpoint(), t);
}
} else {
log.error("Client: [{}] onRegistered [{}] name [{}] lwM2MClient ", registration.getId(), registration.getEndpoint(), null);
}
});
}
/**
@ -158,12 +179,18 @@ public class LwM2MTransportService {
* @param registration - Registration LwM2M Client
*/
public void updatedReg(LeshanServer lwServer, Registration registration) {
SessionInfoProto sessionInfo = this.getValidateSessionInfo(registration.getId());
if (sessionInfo != null) {
log.info("Client: [{}] updatedReg [{}] name [{}] profile ", registration.getId(), registration.getEndpoint(), sessionInfo.getDeviceType());
} else {
log.error("Client: [{}] updatedReg [{}] name [{}] sessionInfo ", registration.getId(), registration.getEndpoint(), null);
}
executorUpdateRegistered.submit(() -> {
try {
SessionInfoProto sessionInfo = this.getValidateSessionInfo(registration.getId());
if (sessionInfo != null) {
log.info("Client: [{}] updatedReg [{}] name [{}] profile ", registration.getId(), registration.getEndpoint(), sessionInfo.getDeviceType());
} else {
log.error("Client: [{}] updatedReg [{}] name [{}] sessionInfo ", registration.getId(), registration.getEndpoint(), null);
}
} catch (Throwable t) {
log.error("[{}] endpoint [{}] error Unable update registration.", registration.getEndpoint(), t);
}
});
}
/**
@ -172,8 +199,14 @@ public class LwM2MTransportService {
* !!! Warn: if have not finishing unReg, then this operation will be finished on next Client`s connect
*/
public void unReg(Registration registration, Collection<Observation> observations) {
this.sentLogsToThingsboard(LOG_LW2M_INFO + ": Client unRegistration", registration.getId());
this.closeClientSession(registration);
executorUpdateRegistered.submit(() -> {
try {
this.sentLogsToThingsboard(LOG_LW2M_INFO + ": Client unRegistration", registration.getId());
this.closeClientSession(registration);
} catch (Throwable t) {
log.error("[{}] endpoint [{}] error Unable un registration.", registration.getEndpoint(), t);
}
});
}
private void closeClientSession(Registration registration) {
@ -848,14 +881,12 @@ public class LwM2MTransportService {
* @param deviceProfile -
*/
public void onDeviceProfileUpdate(TransportProtos.SessionInfoProto sessionInfo, DeviceProfile deviceProfile) {
String registrationId = lwM2mInMemorySecurityStore.getSessions().entrySet()
Set<String> registrationIds = lwM2mInMemorySecurityStore.getSessions().entrySet()
.stream()
.filter(e -> e.getValue().getDeviceUuid().equals(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())))
.findFirst()
.map(Map.Entry::getKey)
.orElse("");
if (!registrationId.isEmpty()) {
this.onDeviceUpdateChangeProfile(registrationId, deviceProfile);
.filter(e -> e.getValue().getProfileUuid().equals(deviceProfile.getUuidId()))
.map(Map.Entry::getKey).sorted().collect(Collectors.toSet());
if (registrationIds.size() > 0) {
this.onDeviceUpdateChangeProfile(registrationIds, deviceProfile);
}
}
@ -883,19 +914,23 @@ public class LwM2MTransportService {
LwM2MClient lwM2MClient = lwM2mInMemorySecurityStore.getSessions().get(registrationId);
lwM2MClient.setDeviceName(device.getName());
if (!lwM2MClient.getProfileUuid().equals(device.getDeviceProfileId().getId())) {
deviceProfileOpt.ifPresent(deviceProfile -> this.onDeviceUpdateChangeProfile(registrationId, deviceProfile));
Set<String> registrationIds = new HashSet<>();
registrationIds.add(registrationId);
deviceProfileOpt.ifPresent(deviceProfile -> this.onDeviceUpdateChangeProfile(registrationIds, deviceProfile));
}
lwM2MClient.setProfileUuid(device.getDeviceProfileId().getId());
}
/**
* #1 Read new, old Value (Attribute, Telemetry, Observe, KeyName)
* #2 Update in lwM2MClient: ...Profile
* #2 Update in lwM2MClient: ...Profile if changes from update device
* #3 Equivalence test: old <> new Value (Attribute, Telemetry, Observe, KeyName)
* #3.1 Attribute isChange (add&del)
* #3.2 Telemetry isChange (add&del)
* #3.3 KeyName isChange (add)
* #4 update
* #4.1 add If #3 isChange, then analyze and update Value in Transport form Client and sent Value ti thingsboard
* #4.1 add If #3 isChange, then analyze and update Value in Transport form Client and sent Value to thingsboard
* #4.2 del
* -- if add attributes includes del telemetry - result del for observe
* #5
@ -905,15 +940,14 @@ public class LwM2MTransportService {
* #5.3 Observe.del
* -- different between newObserve and oldObserve: sent Request cancel observe to client
*
* @param registrationId -
* @param deviceProfile -
* @param registrationIds -
* @param deviceProfile -
*/
public void onDeviceUpdateChangeProfile(String registrationId, DeviceProfile deviceProfile) {
LwM2MClient lwM2MClient = lwM2mInMemorySecurityStore.getLwM2MClient(registrationId);
AttrTelemetryObserveValue attrTelemetryObserveValueOld = lwM2mInMemorySecurityStore.getProfiles().get(lwM2MClient.getProfileUuid());
public void onDeviceUpdateChangeProfile(Set<String> registrationIds, DeviceProfile deviceProfile) {
AttrTelemetryObserveValue attrTelemetryObserveValueOld = lwM2mInMemorySecurityStore.getProfiles().get(deviceProfile.getUuidId());
if (lwM2mInMemorySecurityStore.addUpdateProfileParameters(deviceProfile)) {
LeshanServer lwServer = lwM2MClient.getLwServer();
Registration registration = lwM2mInMemorySecurityStore.getByRegistration(registrationId);
// #1
JsonArray attributeOld = attrTelemetryObserveValueOld.getPostAttributeProfile();
Set attributeSetOld = new Gson().fromJson(attributeOld, Set.class);
@ -929,9 +963,6 @@ public class LwM2MTransportService {
Set telemetrySetNew = new Gson().fromJson(telemetryNew, Set.class);
JsonArray observeNew = attrTelemetryObserveValueNew.getPostObserveProfile();
JsonObject keyNameNew = attrTelemetryObserveValueNew.getPostKeyNameProfile();
// #2
lwM2MClient.setDeviceProfileName(deviceProfile.getName());
lwM2MClient.setProfileUuid(deviceProfile.getUuidId());
// #3
ResultsAnalyzerParameters sentAttrToThingsboard = new ResultsAnalyzerParameters();
@ -957,9 +988,15 @@ public class LwM2MTransportService {
// #4.1 add
if (sentAttrToThingsboard.getPathPostParametersAdd().size() > 0) {
// update value in Resources
this.updateResourceValueObserve(lwServer, registration, lwM2MClient, sentAttrToThingsboard.getPathPostParametersAdd(), GET_TYPE_OPER_READ);
// sent attr/telemetry to tingsboard for new path
this.updateAttrTelemetry(registration, false, sentAttrToThingsboard.getPathPostParametersAdd());
registrationIds.forEach(registrationId -> {
LwM2MClient lwM2MClient = lwM2mInMemorySecurityStore.getLwM2MClient(registrationId);
LeshanServer lwServer = lwM2MClient.getLwServer();
Registration registration = lwM2mInMemorySecurityStore.getByRegistration(registrationId);
log.warn("[{}] # 4.1", registration.getEndpoint());
this.updateResourceValueObserve(lwServer, registration, lwM2MClient, sentAttrToThingsboard.getPathPostParametersAdd(), GET_TYPE_OPER_READ);
// sent attr/telemetry to tingsboard for new path
this.updateAttrTelemetry(registration, false, sentAttrToThingsboard.getPathPostParametersAdd());
});
}
// #4.2 del
if (sentAttrToThingsboard.getPathPostParametersDel().size() > 0) {
@ -980,10 +1017,16 @@ public class LwM2MTransportService {
// does not include oldObserve
ResultsAnalyzerParameters postObserveAnalyzer = this.getAnalyzerParameters(sentObserveToClientOld.getPathPostParametersAdd(), sentObserveToClientNew.getPathPostParametersAdd());
// sent Request observe to Client
this.updateResourceValueObserve(lwServer, registration, lwM2MClient, postObserveAnalyzer.getPathPostParametersAdd(), GET_TYPE_OPER_OBSERVE);
// 5.3 del
// sent Request cancel observe to Client
this.cancelObserveIsValue(lwServer, registration, postObserveAnalyzer.getPathPostParametersDel());
registrationIds.forEach(registrationId -> {
LwM2MClient lwM2MClient = lwM2mInMemorySecurityStore.getLwM2MClient(registrationId);
LeshanServer lwServer = lwM2MClient.getLwServer();
Registration registration = lwM2mInMemorySecurityStore.getByRegistration(registrationId);
log.warn("[{}] # 5.1", registration.getEndpoint());
this.updateResourceValueObserve(lwServer, registration, lwM2MClient, postObserveAnalyzer.getPathPostParametersAdd(), GET_TYPE_OPER_OBSERVE);
// 5.3 del
// sent Request cancel observe to Client
this.cancelObserveIsValue(lwServer, registration, postObserveAnalyzer.getPathPostParametersDel());
});
}
}
}

View File

@ -1,38 +0,0 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.lwm2m.server;
import lombok.Builder;
import lombok.Data;
@Data
public class ResultIds {
@Builder.Default
int objectId = -1;
@Builder.Default
int instanceId = -1;
@Builder.Default
int resourceId = -1;
public ResultIds (String path) {
String[] paths = path.split("/");
if (paths != null && paths.length > 1) {
this.objectId = (paths.length > 1) ? Integer.parseInt(paths[1]) : this.objectId;
this.instanceId = (paths.length > 2) ? Integer.parseInt(paths[2]) : this.instanceId;
this.resourceId = (paths.length > 3) ? Integer.parseInt(paths[3]) : this.resourceId;
}
}
}

View File

@ -173,16 +173,16 @@ public class LwM2mInMemorySecurityStore extends InMemorySecurityStore {
private SecurityInfo add(String identity) {
ReadResultSecurityStore store = lwM2MGetSecurityInfo.getSecurityInfo(identity, TypeServer.CLIENT);
UUID profileUuid = (addUpdateProfileParameters(store.getDeviceProfile())) ? store.getDeviceProfile().getUuidId() : null;
UUID profileUuid = (store.getDeviceProfile() != null && addUpdateProfileParameters(store.getDeviceProfile())) ? store.getDeviceProfile().getUuidId() : null;
if (store.getSecurityInfo() != null) {
if (store.getSecurityMode() < DEFAULT_MODE.code) {
String endpoint = store.getSecurityInfo().getEndpoint();
sessions.put(endpoint, new LwM2MClient(endpoint, store.getSecurityInfo().getIdentity(), store.getSecurityInfo(), store.getMsg(), null, null, profileUuid));
}
} else {
if (store.getSecurityMode() == NO_SEC.code)
if (store.getSecurityMode() == NO_SEC.code && profileUuid != null)
sessions.put(identity, new LwM2MClient(identity, null, null, store.getMsg(), null, null, profileUuid));
else log.error("Registration failed: FORBIDDEN, endpointId: [{}]", identity);
else log.error("Registration failed: FORBIDDEN/profileUuid [{}] , endpointId: [{}]", profileUuid, identity);
}
return store.getSecurityInfo();
}

View File

@ -43,8 +43,10 @@ public abstract class TransportContext {
@Autowired
private TransportService transportService;
@Autowired
private TbServiceInfoProvider serviceInfoProvider;
@Autowired
private SchedulerComponent scheduler;