From 2c31de4a2c3ae2f53c45eecff5abcb848a56e596 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Tue, 15 Jun 2021 14:49:45 +0300 Subject: [PATCH] Fix duplication of sessions --- .../server/client/LwM2mClientContext.java | 4 +-- .../server/client/LwM2mClientContextImpl.java | 5 +++- .../TbLwM2MCancelAllObserveCallback.java | 8 ++++-- .../TbLwM2MCancelObserveCallback.java | 5 +++- .../downlink/TbLwM2MExecuteCallback.java | 12 ++------ .../downlink/TbLwM2MObserveCallback.java | 6 +--- .../server/downlink/TbLwM2MReadCallback.java | 4 +-- .../downlink/TbLwM2MTargetedCallback.java | 12 ++++---- .../TbLwM2MWriteAttributesCallback.java | 14 ++-------- .../TbLwM2MWriteResponseCallback.java | 14 ++++------ .../uplink/DefaultLwM2MUplinkMsgHandler.java | 28 +++++++++---------- 11 files changed, 46 insertions(+), 66 deletions(-) diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClientContext.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClientContext.java index f0539a0ccc..0f2f208b2b 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClientContext.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClientContext.java @@ -18,11 +18,11 @@ package org.thingsboard.server.transport.lwm2m.server.client; import org.eclipse.leshan.server.registration.Registration; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.device.profile.Lwm2mDeviceProfileTransportConfiguration; -import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; import org.thingsboard.server.gen.transport.TransportProtos; import java.util.Collection; +import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -34,7 +34,7 @@ public interface LwM2mClientContext { LwM2mClient getClientBySessionInfo(TransportProtos.SessionInfoProto sessionInfo); - void register(LwM2mClient lwM2MClient, Registration registration) throws LwM2MClientStateException; + Optional register(LwM2mClient lwM2MClient, Registration registration) throws LwM2MClientStateException; void updateRegistration(LwM2mClient client, Registration registration) throws LwM2MClientStateException; diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClientContextImpl.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClientContextImpl.java index c063a6f359..b9206ca4df 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClientContextImpl.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClientContextImpl.java @@ -60,12 +60,14 @@ public class LwM2mClientContextImpl implements LwM2mClientContext { } @Override - public void register(LwM2mClient lwM2MClient, Registration registration) throws LwM2MClientStateException { + public Optional register(LwM2mClient lwM2MClient, Registration registration) throws LwM2MClientStateException { + TransportProtos.SessionInfoProto oldSession = null; lwM2MClient.lock(); try { if (LwM2MClientState.UNREGISTERED.equals(lwM2MClient.getState())) { throw new LwM2MClientStateException(lwM2MClient.getState(), "Client is in invalid state."); } + oldSession = lwM2MClient.getSession(); TbLwM2MSecurityInfo securityInfo = securityStore.getTbLwM2MSecurityInfoByEndpoint(lwM2MClient.getEndpoint()); if (securityInfo.getSecurityMode() != null) { if (securityInfo.getDeviceProfile() != null) { @@ -89,6 +91,7 @@ public class LwM2mClientContextImpl implements LwM2mClientContext { } finally { lwM2MClient.unlock(); } + return Optional.ofNullable(oldSession); } @Override diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MCancelAllObserveCallback.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MCancelAllObserveCallback.java index 42933689ae..42f2a1baec 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MCancelAllObserveCallback.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MCancelAllObserveCallback.java @@ -15,12 +15,13 @@ */ package org.thingsboard.server.transport.lwm2m.server.downlink; -import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; +import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LOG_LWM2M_INFO; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mOperationType.OBSERVE_CANCEL_ALL; +@Slf4j public class TbLwM2MCancelAllObserveCallback extends AbstractTbLwM2MRequestCallback { public TbLwM2MCancelAllObserveCallback(LwM2mUplinkMsgHandler handler, LwM2mClient client) { @@ -29,7 +30,8 @@ public class TbLwM2MCancelAllObserveCallback extends AbstractTbLwM2MRequestCallb @Override public void onSuccess(TbLwM2MCancelAllRequest request, Integer canceledSubscriptionsCount) { - String observeCancelMsg = String.format("%s: type operation %s paths: count: %d", LOG_LWM2M_INFO, OBSERVE_CANCEL_ALL.name(), canceledSubscriptionsCount); + log.trace("[{}] Cancel of all observations was successful: {}", client.getEndpoint(), canceledSubscriptionsCount); + handler.logToTelemetry(client, String.format("[%s]: Cancel of all observations was successful. Result: [%s]", LOG_LWM2M_INFO, canceledSubscriptionsCount)); } } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MCancelObserveCallback.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MCancelObserveCallback.java index 65179de20b..eec33963d5 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MCancelObserveCallback.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MCancelObserveCallback.java @@ -15,12 +15,14 @@ */ package org.thingsboard.server.transport.lwm2m.server.downlink; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LOG_LWM2M_INFO; import static org.thingsboard.server.transport.lwm2m.server.LwM2mOperationType.OBSERVE_CANCEL; +@Slf4j public class TbLwM2MCancelObserveCallback extends AbstractTbLwM2MRequestCallback { private final String versionedId; @@ -32,7 +34,8 @@ public class TbLwM2MCancelObserveCallback extends AbstractTbLwM2MRequestCallback @Override public void onSuccess(TbLwM2MCancelObserveRequest request, Integer canceledSubscriptionsCount) { - String observeCancelMsg = String.format("%s: type operation %s paths: %s count: %d", LOG_LWM2M_INFO, OBSERVE_CANCEL.name(), versionedId, canceledSubscriptionsCount); + log.trace("[{}] Cancel observation of [{}] successful: {}", client.getEndpoint(), versionedId, canceledSubscriptionsCount); + handler.logToTelemetry(client, String.format("[%s]: Cancel Observe for [%s] successful. Result: [%s]", LOG_LWM2M_INFO, versionedId, canceledSubscriptionsCount)); } } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MExecuteCallback.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MExecuteCallback.java index 5d01e4267e..d2aed6a4fd 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MExecuteCallback.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MExecuteCallback.java @@ -20,18 +20,10 @@ import org.eclipse.leshan.core.response.ExecuteResponse; import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; -public class TbLwM2MExecuteCallback extends AbstractTbLwM2MRequestCallback { - - private final String targetId; +public class TbLwM2MExecuteCallback extends TbLwM2MTargetedCallback { public TbLwM2MExecuteCallback(LwM2mUplinkMsgHandler handler, LwM2mClient client, String targetId) { - super(handler, client); - this.targetId = targetId; - } - - @Override - public void onSuccess(ExecuteRequest request, ExecuteResponse response) { - //TODO: separate callback wrapper for the RPC calls. + super(handler, client, targetId); } } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MObserveCallback.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MObserveCallback.java index 890a6e6097..e5453902fb 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MObserveCallback.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MObserveCallback.java @@ -17,14 +17,10 @@ package org.thingsboard.server.transport.lwm2m.server.downlink; import lombok.extern.slf4j.Slf4j; import org.eclipse.leshan.core.request.ObserveRequest; -import org.eclipse.leshan.core.request.ReadRequest; import org.eclipse.leshan.core.response.ObserveResponse; -import org.eclipse.leshan.core.response.ReadResponse; import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LOG_LWM2M_INFO; - @Slf4j public class TbLwM2MObserveCallback extends TbLwM2MTargetedCallback { @@ -35,6 +31,6 @@ public class TbLwM2MObserveCallback extends TbLwM2MTargetedCallback { @@ -33,7 +31,7 @@ public class TbLwM2MReadCallback extends TbLwM2MTargetedCallback extends AbstractTbLwM2MRequestCallback { - protected final String targetId; + protected final String versionedId; - public TbLwM2MTargetedCallback(LwM2mUplinkMsgHandler handler, LwM2mClient client, String targetId) { + public TbLwM2MTargetedCallback(LwM2mUplinkMsgHandler handler, LwM2mClient client, String versionedId) { super(handler, client); - this.targetId = targetId; + this.versionedId = versionedId; } @Override public void onSuccess(R request, T response) { //TODO convert camelCase to "camel case" using .split("(? { - - private final String targetId; +public class TbLwM2MWriteAttributesCallback extends TbLwM2MTargetedCallback { public TbLwM2MWriteAttributesCallback(LwM2mUplinkMsgHandler handler, LwM2mClient client, String targetId) { - super(handler, client); - this.targetId = targetId; - } - - @Override - public void onSuccess(WriteAttributesRequest request, WriteAttributesResponse response) { - //TODO: separate callback wrapper for the RPC calls. + super(handler, client, targetId); } } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MWriteResponseCallback.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MWriteResponseCallback.java index b383239777..89586bd113 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MWriteResponseCallback.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MWriteResponseCallback.java @@ -15,25 +15,21 @@ */ package org.thingsboard.server.transport.lwm2m.server.downlink; -import lombok.Setter; import org.eclipse.leshan.core.request.WriteRequest; import org.eclipse.leshan.core.response.WriteResponse; -import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; +import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; -public class TbLwM2MWriteResponseCallback extends AbstractTbLwM2MRequestCallback { - - private final String targetId; +public class TbLwM2MWriteResponseCallback extends TbLwM2MTargetedCallback { public TbLwM2MWriteResponseCallback(LwM2mUplinkMsgHandler handler, LwM2mClient client, String targetId) { - super(handler, client); - this.targetId = targetId; + super(handler, client, targetId); } @Override public void onSuccess(WriteRequest request, WriteResponse response) { - handler.onWriteResponseOk(client, targetId, request); - //TODO: separate callback wrapper for the RPC calls. + super.onSuccess(request, response); + handler.onWriteResponseOk(client, versionedId, request); } } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java index 15df144cc4..34163815d7 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java @@ -195,7 +195,11 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { try { log.warn("[{}] [{{}] Client: create after Registration", registration.getEndpoint(), registration.getId()); if (lwM2MClient != null) { - this.clientContext.register(lwM2MClient, registration); + Optional oldSessionInfo = this.clientContext.register(lwM2MClient, registration); + if (oldSessionInfo.isPresent()) { + log.info("[{}] Closing old session: {}", registration.getEndpoint(), new UUID(oldSessionInfo.get().getSessionIdMSB(), oldSessionInfo.get().getSessionIdLSB())); + closeSession(oldSessionInfo.get()); + } this.logToTelemetry(lwM2MClient, LOG_LWM2M_INFO + ": Client registered with registration id: " + registration.getId()); SessionInfoProto sessionInfo = lwM2MClient.getSession(); transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, rpcHandler, sessionInfo)); @@ -272,8 +276,7 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { clientContext.unregister(client, registration); SessionInfoProto sessionInfo = client.getSession(); if (sessionInfo != null) { - transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.CLOSED), null); - transportService.deregisterSession(sessionInfo); + closeSession(sessionInfo); sessionStore.remove(registration.getEndpoint()); log.info("Client close session: [{}] unReg [{}] name [{}] profile ", registration.getId(), registration.getEndpoint(), sessionInfo.getDeviceType()); } else { @@ -288,6 +291,11 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { }); } + public void closeSession(SessionInfoProto sessionInfo) { + transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.CLOSED), null); + transportService.deregisterSession(sessionInfo); + } + @Override public void onSleepingDev(Registration registration) { log.info("[{}] [{}] Received endpoint Sleeping version event", registration.getId(), registration.getEndpoint()); @@ -441,8 +449,7 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { */ @Override public void doDisconnect(SessionInfoProto sessionInfo) { - transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.CLOSED), null); - transportService.deregisterSession(sessionInfo); + closeSession(sessionInfo); } /** @@ -586,7 +593,7 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { * #3 If fr_update -> UpdateFirmware * #4 updateAttrTelemetry * - * @param lwM2MClient - Registration LwM2M Client + * @param lwM2MClient - Registration LwM2M Client * @param lwM2mResource - LwM2mSingleResource response.getContent() * @param path - resource */ @@ -784,13 +791,6 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { return lwm2mResourceValue; } - /** - * Update resource (attribute) value on thingsboard after update value in client - * - * @param registration - - * @param path - - * @param request - - */ @Override public void onWriteResponseOk(LwM2mClient client, String path, WriteRequest request) { if (request.getNode() instanceof LwM2mResource) { @@ -955,7 +955,7 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { * Get path to resource from profile equal keyName * * @param sessionInfo - - * @param keyName - + * @param keyName - * @return - */ @Override