diff --git a/application/src/main/java/org/eclipse/leshan/server/observation/ObservationServiceImpl.java b/application/src/main/java/org/eclipse/leshan/server/observation/ObservationServiceImpl.java deleted file mode 100644 index 89a2f07179..0000000000 --- a/application/src/main/java/org/eclipse/leshan/server/observation/ObservationServiceImpl.java +++ /dev/null @@ -1,296 +0,0 @@ -/** - * Copyright © 2016-2024 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Copyright (c) 2016 Sierra Wireless and others. - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v2.0 - * and Eclipse Distribution License v1.0 which accompany this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v20.html - * and the Eclipse Distribution License is available at - * http://www.eclipse.org/org/documents/edl-v10.html. - * - * Contributors: - * Sierra Wireless - initial API and implementation - * Michał Wadowski (Orange) - Add Observe-Composite feature. - */ -package org.eclipse.leshan.server.observation; - -import lombok.extern.slf4j.Slf4j; -import org.eclipse.leshan.core.node.LwM2mPath; -import org.eclipse.leshan.core.observation.CompositeObservation; -import org.eclipse.leshan.core.observation.Observation; -import org.eclipse.leshan.core.observation.SingleObservation; -import org.eclipse.leshan.core.peer.LwM2mPeer; -import org.eclipse.leshan.core.response.ObserveCompositeResponse; -import org.eclipse.leshan.core.response.ObserveResponse; -import org.eclipse.leshan.server.endpoint.LwM2mServerEndpoint; -import org.eclipse.leshan.server.endpoint.LwM2mServerEndpointsProvider; -import org.eclipse.leshan.server.profile.ClientProfile; -import org.eclipse.leshan.server.registration.Registration; -import org.eclipse.leshan.server.registration.RegistrationStore; -import org.eclipse.leshan.server.registration.RegistrationUpdate; -import org.eclipse.leshan.server.registration.UpdatedRegistration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; - -/** - * Implementation of the {@link ObservationService} accessing the persisted observation via the provided - * {@link RegistrationStore}. - * - * When a new observation is added or changed or canceled, the registered listeners are notified. - */ -@Slf4j -public class ObservationServiceImpl implements ObservationService, LwM2mNotificationReceiver { - - private final Logger LOG = LoggerFactory.getLogger(ObservationServiceImpl.class); - - private final RegistrationStore registrationStore; - private final LwM2mServerEndpointsProvider endpointProvider; - private final boolean updateRegistrationOnNotification; - - private final List listeners = new CopyOnWriteArrayList<>();; - - /** - * Creates an instance of {@link ObservationServiceImpl} - */ - public ObservationServiceImpl(RegistrationStore store, LwM2mServerEndpointsProvider endpointProvider) { - this(store, endpointProvider, false); - } - - /** - * Creates an instance of {@link ObservationServiceImpl} - * - * @param updateRegistrationOnNotification will activate registration update on observe notification. - * - * @since 1.1 - */ - public ObservationServiceImpl(RegistrationStore store, LwM2mServerEndpointsProvider endpointProvider, - boolean updateRegistrationOnNotification) { - this.registrationStore = store; - this.updateRegistrationOnNotification = updateRegistrationOnNotification; - this.endpointProvider = endpointProvider; - } - - @Override - public int cancelObservations(Registration registration) { - // check registration id - String registrationId = registration.getId(); - if (registrationId == null) - return 0; - - Collection observations = registrationStore.removeObservations(registrationId); - if (observations == null) - return 0; - - for (Observation observation : observations) { - cancel(observation); - } - - return observations.size(); - } - - @Override - public int cancelObservations(Registration registration, String nodePath) { - if (registration == null || registration.getId() == null || nodePath == null || nodePath.isEmpty()) - return 0; - - Set observations = getObservationsForCancel(registration.getId(), nodePath); - for (Observation observation : observations) { - cancelObservation(observation); - } - return observations.size(); - } - - @Override - public int cancelCompositeObservations(Registration registration, String[] nodePaths) { - if (registration == null || registration.getId() == null || nodePaths == null || nodePaths.length == 0) - return 0; - - Set observations = getCompositeObservationsForCancel(registration.getId(), nodePaths); - for (Observation observation : observations) { - cancelObservation(observation); - } - return observations.size(); - } - - @Override - public void cancelObservation(Observation observation) { - if (observation == null) - return; - - registrationStore.removeObservation(observation.getRegistrationId(), observation.getId()); - cancel(observation); - } - - private void cancel(Observation observation) { - List endpoints = endpointProvider.getEndpoints(); - for (LwM2mServerEndpoint lwM2mEndpoint : endpoints) { - lwM2mEndpoint.cancelObservation(observation); - } - - for (ObservationListener listener : listeners) { - listener.cancelled(observation); - } - } - - @Override - public Set getObservations(Registration registration) { - return getObservations(registration.getId()); - } - - private Set getObservations(String registrationId) { - if (registrationId == null) - return Collections.emptySet(); - - return new HashSet<>(registrationStore.getObservations(registrationId)); - } - - private Set getCompositeObservationsForCancel(String registrationId, String[] nodePaths) { - if (registrationId == null || nodePaths == null) - return Collections.emptySet(); - - // array of String to array of LWM2M path - List lwPaths = new ArrayList<>(nodePaths.length); - for (int i = 0; i < nodePaths.length; i++) { - lwPaths.add(new LwM2mPath(nodePaths[i])); - } - - // search composite-observation - Set result = new HashSet<>(); - for (Observation obs : getObservations(registrationId)) { - if (obs instanceof CompositeObservation) { - if (lwPaths.equals(((CompositeObservation) obs).getPaths())) { - result.add(obs); - } - } - } - return result; - } - - private Set getObservationsForCancel(String registrationId, String nodePath) { - if (registrationId == null || nodePath == null) - return Collections.emptySet(); - - Set result = new HashSet<>(); - LwM2mPath lwPath = new LwM2mPath(nodePath); - for (Observation obs : getObservations(registrationId)) { - if (obs instanceof SingleObservation) { - LwM2mPath lwPathObs = ((SingleObservation) obs).getPath(); - if (lwPath.equals(lwPathObs) || lwPathObs.startWith(lwPath)) { // nodePath = "3", lwPathObs = "3/0/9": cancel for tne all lwPathObs - result.add(obs); - } else if (!lwPath.equals(lwPathObs) && lwPath.startWith(lwPathObs)) { // nodePath = "3/0/9", lwPathObs = "3": error... - String errorMsg = String.format( - "Unexpected error : There is registration with id [%s] existing observation [%s] includes input observation [%s]!", - registrationId, lwPathObs, lwPath); - throw new IllegalStateException(errorMsg); - } - } - } - - return result; - } - - @Override - public void addListener(ObservationListener listener) { - listeners.add(listener); - } - - @Override - public void removeListener(ObservationListener listener) { - listeners.remove(listener); - } - - private Registration updateRegistrationOnRegistration(Observation observation, LwM2mPeer sender, - ClientProfile profile) { - if (updateRegistrationOnNotification) { - RegistrationUpdate regUpdate = new RegistrationUpdate(observation.getRegistrationId(), sender, null, null, - null, null, null, null, null, null, null, null); - UpdatedRegistration updatedRegistration = registrationStore.updateRegistration(regUpdate); - if (updatedRegistration == null || updatedRegistration.getUpdatedRegistration() == null) { - String errorMsg = String.format( - "Unexpected error: There is no registration with id %s for this observation %s", - observation.getRegistrationId(), observation); - LOG.error(errorMsg); - throw new IllegalStateException(errorMsg); - } - return updatedRegistration.getUpdatedRegistration(); - } - return profile.getRegistration(); - } - - // ********** NotificationListener interface **********// - @Override - public void onNotification(SingleObservation observation, LwM2mPeer sender, ClientProfile profile, - ObserveResponse response) { - try { - Registration updatedRegistration = updateRegistrationOnRegistration(observation, sender, profile); - for (ObservationListener listener : listeners) { - listener.onResponse(observation, updatedRegistration, response); - } - } catch (Exception e) { - for (ObservationListener listener : listeners) { - listener.onError(observation, profile.getRegistration(), e); - } - } - } - - @Override - public void onNotification(CompositeObservation observation, LwM2mPeer sender, ClientProfile profile, - ObserveCompositeResponse response) { - try { - Registration updatedRegistration = updateRegistrationOnRegistration(observation, sender, profile); - for (ObservationListener listener : listeners) { - listener.onResponse(observation, updatedRegistration, response); - } - } catch (Exception e) { - for (ObservationListener listener : listeners) { - listener.onError(observation, profile.getRegistration(), e); - } - } - } - - @Override - public void onError(Observation observation, LwM2mPeer sender, ClientProfile profile, Exception error) { - for (ObservationListener listener : listeners) { - listener.onError(observation, profile.getRegistration(), error); - } - } - - @Override - public void newObservation(Observation observation, Registration registration) { - for (ObservationListener listener : listeners) { - listener.newObservation(observation, registration); - } - } - - @Override - public void cancelled(Observation observation) { - for (ObservationListener listener : listeners) { - listener.cancelled(observation); - } - - } -} diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/sql/RpcLwm2MIntegrationObserveCompositeTest.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/sql/RpcLwm2MIntegrationObserveCompositeTest.java index 86cd8186c7..94ee1eb48e 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/sql/RpcLwm2MIntegrationObserveCompositeTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/sql/RpcLwm2MIntegrationObserveCompositeTest.java @@ -355,8 +355,8 @@ public class RpcLwm2MIntegrationObserveCompositeTest extends AbstractRpcLwM2MInt } /** - * ObserveComposite {"ids":["/3/0/9", "/3/0/14", "/5/0/3", "/19/1/0/0"]} - Ok - * ObserveCompositeCancel {"ids":["/3", "/19/1/0/0"]} - Ok + * ObserveComposite {"ids":["/3/0/9", "/5/0/5", "/5/0/3", "/5/0/7", "/19/1/0/0"]} - Ok + * ObserveCompositeCancel {"ids":["/5", "/19/1/0/0"]} - Ok * last Observation * @throws Exception */ @@ -380,7 +380,7 @@ public class RpcLwm2MIntegrationObserveCompositeTest extends AbstractRpcLwM2MInt actualResult = sendCompositeRPCByIds("ObserveCompositeCancel", expectedIds); rpcActualResult = JacksonUtil.fromString(actualResult, ObjectNode.class); assertEquals(ResponseCode.CONTENT.getName(), rpcActualResult.get("result").asText()); - assertEquals("4", rpcActualResult.get("value").asText()); + assertEquals("4", rpcActualResult.get("value").asText()); // CNT = 4 ("/5/0/5", "/5/0/3", "/5/0/7", "/19/1/0/0"9) String actualResultReadAll = sendCompositeRPCByKeys("ObserveReadAll", null); ObjectNode rpcActualResultReadAll = JacksonUtil.fromString(actualResultReadAll, ObjectNode.class); @@ -411,7 +411,7 @@ public class RpcLwm2MIntegrationObserveCompositeTest extends AbstractRpcLwM2MInt actualResult = sendCompositeRPCByIds("ObserveCompositeCancel", expectedIds); rpcActualResult = JacksonUtil.fromString(actualResult, ObjectNode.class); assertEquals(ResponseCode.BAD_REQUEST.getName(), rpcActualResult.get("result").asText()); - String expectedValue = "for observation path " + fromVersionedIdToObjectId(idVer_3_0_9) + ", that includes this observation path " + fromVersionedIdToObjectId(objectIdVer_3); + String expectedValue = "for observation path [" + fromVersionedIdToObjectId(objectIdVer_3) + "], that includes this observation path [" + fromVersionedIdToObjectId(idVer_3_0_9); assertTrue(rpcActualResult.get("error").asText().contains(expectedValue)); // ObserveCompositeCancel diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/sql/RpcLwm2mIntegrationObserveTest.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/sql/RpcLwm2mIntegrationObserveTest.java index bf1ba41270..5a849e70e4 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/sql/RpcLwm2mIntegrationObserveTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/rpc/sql/RpcLwm2mIntegrationObserveTest.java @@ -333,9 +333,9 @@ public class RpcLwm2mIntegrationObserveTest extends AbstractRpcLwM2MIntegrationO // cancel observe "/3_1.2/0/9" String expectedId_3_0_9 = objectInstanceIdVer_3 + "/" + RESOURCE_ID_9; String actualResult = sendRpcObserve("ObserveCancel", expectedId_3_0_9); - String expectedValue = "existing observation [" + fromVersionedIdToObjectId(objectIdVer_3) + "] includes input observation [" + fromVersionedIdToObjectId(expectedId_3_0_9); + String expectedValue = "for observation path [" + fromVersionedIdToObjectId(objectIdVer_3) + "], that includes this observation path [" + fromVersionedIdToObjectId(expectedId_3_0_9); rpcActualResult = JacksonUtil.fromString(actualResult, ObjectNode.class); - assertEquals(ResponseCode.INTERNAL_SERVER_ERROR.getName(), rpcActualResult.get("result").asText()); + assertEquals(ResponseCode.BAD_REQUEST.getName(), rpcActualResult.get("result").asText()); assertTrue(rpcActualResult.get("error").asText().contains(expectedValue)); // cancel observe "/3_1.2" @@ -346,29 +346,33 @@ public class RpcLwm2mIntegrationObserveTest extends AbstractRpcLwM2MIntegrationO } /** + * Observe {"id":"/3/0/0"} * Observe {"id":"/3/0/9"} - * ObserveCancel {"id":"/3"} + * ObserveCancel {"id":"/3"} - Ok, cnt = 2 */ @Test public void testObserveResource_ObserveCancelObject_Result_CONTENT_Count_1() throws Exception { sendCancelObserveAllWithAwait(deviceId); - + String expectedId_3_0_0 = objectInstanceIdVer_3 + "/" + RESOURCE_ID_0; + sendRpcObserve("Observe", expectedId_3_0_0); String expectedId_3_0_9 = objectInstanceIdVer_3 + "/" + RESOURCE_ID_9; sendRpcObserve("Observe", expectedId_3_0_9); String actualResultReadAll = sendRpcObserve("ObserveReadAll", null); ObjectNode rpcActualResult = JacksonUtil.fromString(actualResultReadAll, ObjectNode.class); assertEquals(ResponseCode.CONTENT.getName(), rpcActualResult.get("result").asText()); String actualValuesReadAll = rpcActualResult.get("value").asText(); - assertEquals(1, actualValuesReadAll.split(",").length); - String expected = "\"SingleObservation:" + fromVersionedIdToObjectId(expectedId_3_0_9) + "\""; - assertTrue(actualValuesReadAll.contains(expected)); + assertEquals(2, actualValuesReadAll.split(",").length); + String expected_3_0_0 = "\"SingleObservation:" + fromVersionedIdToObjectId(expectedId_3_0_0) + "\""; + String expected_3_0_9 = "\"SingleObservation:" + fromVersionedIdToObjectId(expectedId_3_0_9) + "\""; + assertTrue(actualValuesReadAll.contains(expected_3_0_0)); + assertTrue(actualValuesReadAll.contains(expected_3_0_9)); // cancel observe "/3_1.2" String expectedId_3 = objectIdVer_3; String actualResult = sendRpcObserve("ObserveCancel", expectedId_3); rpcActualResult = JacksonUtil.fromString(actualResult, ObjectNode.class); assertEquals(ResponseCode.CONTENT.getName(), rpcActualResult.get("result").asText()); - assertEquals("1", rpcActualResult.get("value").asText()); + assertEquals("2", rpcActualResult.get("value").asText()); } private String sendRpcObserve(String method, String params) throws Exception { diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/DefaultLwM2mDownlinkMsgHandler.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/DefaultLwM2mDownlinkMsgHandler.java index d8cd2b44ba..af127dba88 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/DefaultLwM2mDownlinkMsgHandler.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/DefaultLwM2mDownlinkMsgHandler.java @@ -259,7 +259,7 @@ public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService im for (Observation obs : observations) { LwM2mPath lwPathObs = ((SingleObservation) obs).getPath(); for (LwM2mPath nodePath : listPath) { - String validNodePath = validatePathCompositeCancel(nodePath, lwPathObs, client); + String validNodePath = validatePathObserveCancelAny(nodePath, lwPathObs, client); if (validNodePath != null) lwPaths.add(validNodePath); } }; @@ -273,13 +273,13 @@ public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService im } } - private String validatePathCompositeCancel(LwM2mPath nodePath, LwM2mPath lwPathObs, LwM2mClient client) throws ThingsboardException { + private String validatePathObserveCancelAny(LwM2mPath nodePath, LwM2mPath lwPathObs, LwM2mClient client) throws ThingsboardException { if (nodePath.equals(lwPathObs) || lwPathObs.startWith(nodePath)) { // nodePath = "3", lwPathObs = "3/0/9": cancel for tne all lwPathObs - return nodePath.toString(); + return lwPathObs.toString(); } else if (!nodePath.equals(lwPathObs) && nodePath.startWith(lwPathObs)) { String errorMsg = String.format( - "Unexpected error: There is registration with Endpoint %s for observation path %s, that includes this observation path %s", - client.getRegistration().getEndpoint(), nodePath, lwPathObs); + "Unexpected error: There is registration with Endpoint %s for observation path [%s], that includes this observation path [%s]", + client.getRegistration().getEndpoint(), lwPathObs, nodePath); throw new ThingsboardException(errorMsg, ThingsboardErrorCode.BAD_REQUEST_PARAMS); } return null; @@ -335,9 +335,24 @@ public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService im @Override public void sendCancelObserveRequest(LwM2mClient client, TbLwM2MCancelObserveRequest request, DownlinkRequestCallback callback) { - validateVersionedId(client, request); - int observeCancelCnt = context.getServer().getObservationService().cancelObservations(client.getRegistration(), request.getObjectId()); - callback.onSuccess(request, observeCancelCnt); + try{ + validateVersionedId(client, request); + Set observations = context.getServer().getObservationService().getObservations(client.getRegistration()); + int observeCancelCnt = 0; + Set lwPaths = new HashSet<>(); + for (Observation obs : observations) { + LwM2mPath lwPathObs = ((SingleObservation) obs).getPath(); + LwM2mPath nodePath = new LwM2mPath(request.getObjectId()); + String validNodePath = validatePathObserveCancelAny(nodePath, lwPathObs, client); + if (validNodePath != null) lwPaths.add(validNodePath); + }; + for (String nodePath : lwPaths) { + observeCancelCnt += context.getServer().getObservationService().cancelObservations(client.getRegistration(), nodePath); + } + callback.onSuccess(request, observeCancelCnt); + } catch (ThingsboardException e){ + callback.onValidationError(request.toString(), e.getMessage()); + } } @Override