From 51e7549b58428bef1540d80b715c687a66c4ec37 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 15 Jun 2021 16:22:23 +0300 Subject: [PATCH] Await execution for sendReadRequests, sendObserveRequests --- .../server/downlink/TbLwM2MLatchCallback.java | 45 +++++++++++++++++++ .../server/downlink/TbLwM2MReadCallback.java | 4 +- .../uplink/DefaultLwM2MUplinkMsgHandler.java | 41 ++++++++++++++--- 3 files changed, 80 insertions(+), 10 deletions(-) create mode 100644 common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MLatchCallback.java diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MLatchCallback.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MLatchCallback.java new file mode 100644 index 0000000000..a864275487 --- /dev/null +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MLatchCallback.java @@ -0,0 +1,45 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.lwm2m.server.downlink; + +import lombok.RequiredArgsConstructor; + +import java.util.concurrent.CountDownLatch; + +@RequiredArgsConstructor +public class TbLwM2MLatchCallback implements DownlinkRequestCallback { + + private final CountDownLatch countDownLatch; + private final DownlinkRequestCallback callback; + + @Override + public void onSuccess(R request, T response) { + callback.onSuccess(request, response); + countDownLatch.countDown(); + } + + @Override + public void onValidationError(String params, String msg) { + callback.onValidationError(params, msg); + countDownLatch.countDown(); + } + + @Override + public void onError(String params, Exception e) { + callback.onError(params, e); + countDownLatch.countDown(); + } +} diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MReadCallback.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MReadCallback.java index b6d0801f40..46315a6fcb 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MReadCallback.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MReadCallback.java @@ -18,10 +18,8 @@ package org.thingsboard.server.transport.lwm2m.server.downlink; import lombok.extern.slf4j.Slf4j; import org.eclipse.leshan.core.request.ReadRequest; import org.eclipse.leshan.core.response.ReadResponse; -import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; - -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LOG_LWM2M_INFO; +import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; @Slf4j public class TbLwM2MReadCallback extends TbLwM2MTargetedCallback { diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java index 15df144cc4..b1ffe66600 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java @@ -28,7 +28,10 @@ import org.eclipse.leshan.core.node.LwM2mObjectInstance; import org.eclipse.leshan.core.node.LwM2mPath; import org.eclipse.leshan.core.node.LwM2mResource; import org.eclipse.leshan.core.observation.Observation; +import org.eclipse.leshan.core.request.ObserveRequest; +import org.eclipse.leshan.core.request.ReadRequest; import org.eclipse.leshan.core.request.WriteRequest; +import org.eclipse.leshan.core.response.ObserveResponse; import org.eclipse.leshan.core.response.ReadResponse; import org.eclipse.leshan.server.registration.Registration; import org.springframework.context.annotation.Lazy; @@ -69,7 +72,9 @@ import org.thingsboard.server.transport.lwm2m.server.client.LwM2mFwSwUpdate; import org.thingsboard.server.transport.lwm2m.server.client.ParametersAnalyzeResult; import org.thingsboard.server.transport.lwm2m.server.client.ResourceValue; import org.thingsboard.server.transport.lwm2m.server.client.ResultsAddKeyValueProto; +import org.thingsboard.server.transport.lwm2m.server.downlink.DownlinkRequestCallback; import org.thingsboard.server.transport.lwm2m.server.downlink.LwM2mDownlinkMsgHandler; +import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MLatchCallback; import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MCancelObserveCallback; import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MCancelObserveRequest; import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MDiscoverCallback; @@ -99,6 +104,7 @@ import java.util.Random; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -510,16 +516,29 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { Set targetIds = new HashSet<>(profile.getObserveAttr().getAttribute()); targetIds.addAll(profile.getObserveAttr().getTelemetry()); targetIds = targetIds.stream().filter(target -> isSupportedTargetId(supportedObjects, target)).collect(Collectors.toSet()); - lwM2MClient.getPendingReadRequests().addAll(targetIds); - targetIds.forEach(versionedId -> sendReadRequest(lwM2MClient, versionedId)); + + CountDownLatch latch = new CountDownLatch(targetIds.size()); + targetIds.forEach(versionedId -> sendReadRequest(lwM2MClient, versionedId, + new TbLwM2MLatchCallback<>(latch, new TbLwM2MReadCallback(this, lwM2MClient, versionedId)))); + try { + latch.await(); + } catch (InterruptedException e) { + log.error("Failed to await Read requests!"); + } } private void sendObserveRequests(LwM2mClient lwM2MClient, Lwm2mDeviceProfileTransportConfiguration profile, Set supportedObjects) { Set targetIds = profile.getObserveAttr().getObserve(); targetIds = targetIds.stream().filter(target -> isSupportedTargetId(supportedObjects, target)).collect(Collectors.toSet()); -// TODO: why do we need to put observe into pending read requests? -// lwM2MClient.getPendingReadRequests().addAll(targetIds); - targetIds.forEach(targetId -> sendObserveRequest(lwM2MClient, targetId)); + + CountDownLatch latch = new CountDownLatch(targetIds.size()); + targetIds.forEach(targetId -> sendObserveRequest(lwM2MClient, targetId, + new TbLwM2MLatchCallback<>(latch, new TbLwM2MObserveCallback(this, lwM2MClient, targetId)))); + try { + latch.await(); + } catch (InterruptedException e) { + log.error("Failed to await Observe requests!"); + } } private void sendWriteAttributeRequests(LwM2mClient lwM2MClient, Lwm2mDeviceProfileTransportConfiguration profile, Set supportedObjects) { @@ -544,13 +563,21 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { } private void sendReadRequest(LwM2mClient lwM2MClient, String versionedId) { + sendReadRequest(lwM2MClient, versionedId, new TbLwM2MReadCallback(this, lwM2MClient, versionedId)); + } + + private void sendReadRequest(LwM2mClient lwM2MClient, String versionedId, DownlinkRequestCallback callback) { TbLwM2MReadRequest request = TbLwM2MReadRequest.builder().versionedId(versionedId).timeout(this.config.getTimeout()).build(); - defaultLwM2MDownlinkMsgHandler.sendReadRequest(lwM2MClient, request, new TbLwM2MReadCallback(this, lwM2MClient, versionedId)); + defaultLwM2MDownlinkMsgHandler.sendReadRequest(lwM2MClient, request, callback); } private void sendObserveRequest(LwM2mClient lwM2MClient, String versionedId) { + sendObserveRequest(lwM2MClient, versionedId, new TbLwM2MObserveCallback(this, lwM2MClient, versionedId)); + } + + private void sendObserveRequest(LwM2mClient lwM2MClient, String versionedId, DownlinkRequestCallback callback) { TbLwM2MObserveRequest request = TbLwM2MObserveRequest.builder().versionedId(versionedId).timeout(this.config.getTimeout()).build(); - defaultLwM2MDownlinkMsgHandler.sendObserveRequest(lwM2MClient, request, new TbLwM2MObserveCallback(this, lwM2MClient, versionedId)); + defaultLwM2MDownlinkMsgHandler.sendObserveRequest(lwM2MClient, request, callback); } private void sendWriteAttributesRequest(LwM2mClient lwM2MClient, String targetId, ObjectAttributes params) {