coap: final version

This commit is contained in:
nick 2024-01-30 16:37:14 +02:00
parent cdc6daa7a9
commit 41adbe1003
14 changed files with 127 additions and 51 deletions

View File

@ -25,6 +25,7 @@ import org.awaitility.Awaitility;
import org.eclipse.californium.core.CoapObserveRelation; import org.eclipse.californium.core.CoapObserveRelation;
import org.eclipse.californium.core.CoapResponse; import org.eclipse.californium.core.CoapResponse;
import org.eclipse.californium.core.coap.CoAP; import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.CoAP.ResponseCode;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.DynamicProtoUtils; import org.thingsboard.server.common.data.DynamicProtoUtils;
@ -238,7 +239,7 @@ public abstract class AbstractCoapAttributesIntegrationTest extends AbstractCoap
String awaitAlias = "await Json Test Subscribe To AttributesUpdates (client.getObserveRelation)"; String awaitAlias = "await Json Test Subscribe To AttributesUpdates (client.getObserveRelation)";
await(awaitAlias) await(awaitAlias)
.atMost(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS) .atMost(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
.until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) && .until(() -> ResponseCode.VALID.equals(callbackCoap.getResponseCode()) &&
callbackCoap.getObserve() != null && callbackCoap.getObserve() != null &&
0 == callbackCoap.getObserve().intValue()); 0 == callbackCoap.getObserve().intValue());
if (emptyCurrentStateNotification) { if (emptyCurrentStateNotification) {
@ -285,7 +286,7 @@ public abstract class AbstractCoapAttributesIntegrationTest extends AbstractCoap
CoapObserveRelation observeRelation = client.getObserveRelation(callbackCoap); CoapObserveRelation observeRelation = client.getObserveRelation(callbackCoap);
await(awaitAlias) await(awaitAlias)
.atMost(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS) .atMost(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
.until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) && .until(() -> ResponseCode.VALID.equals(callbackCoap.getResponseCode()) &&
callbackCoap.getObserve() != null && callbackCoap.getObserve() != null &&
0 == callbackCoap.getObserve().intValue()); 0 == callbackCoap.getObserve().intValue());

View File

@ -19,6 +19,7 @@ import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.server.resources.Resource; import org.eclipse.californium.core.server.resources.Resource;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.coapserver.DefaultCoapServerService; import org.thingsboard.server.coapserver.DefaultCoapServerService;
@ -59,11 +60,15 @@ public class CoapAttributesUpdatesIntegrationTest extends AbstractCoapAttributes
processAfterTest(); processAfterTest();
} }
@Ignore // Uncomment when Californium 3.11 is released with https://github.com/eclipse-californium/californium/pull/2215
@Test @Test
public void testSubscribeToAttributesUpdatesFromTheServer() throws Exception { public void testSubscribeToAttributesUpdatesFromTheServer() throws Exception {
processJsonTestSubscribeToAttributesUpdates(false); processJsonTestSubscribeToAttributesUpdates(false);
} }
@Ignore // Uncomment when Californium 3.11 is released with https://github.com/eclipse-californium/californium/pull/2215
@Test @Test
public void testSubscribeToAttributesUpdatesFromTheServerWithEmptyCurrentStateNotification() throws Exception { public void testSubscribeToAttributesUpdatesFromTheServerWithEmptyCurrentStateNotification() throws Exception {
processJsonTestSubscribeToAttributesUpdates(true); processJsonTestSubscribeToAttributesUpdates(true);

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.transport.coap.attributes.updates;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.thingsboard.server.common.data.CoapDeviceType; import org.thingsboard.server.common.data.CoapDeviceType;
import org.thingsboard.server.common.data.TransportPayloadType; import org.thingsboard.server.common.data.TransportPayloadType;
@ -44,11 +45,12 @@ public class CoapAttributesUpdatesJsonIntegrationTest extends AbstractCoapAttrib
processAfterTest(); processAfterTest();
} }
@Ignore // Uncomment when Californium 3.11 is released with https://github.com/eclipse-californium/californium/pull/2215
@Test @Test
public void testSubscribeToAttributesUpdatesFromTheServer() throws Exception { public void testSubscribeToAttributesUpdatesFromTheServer() throws Exception {
processJsonTestSubscribeToAttributesUpdates(false); processJsonTestSubscribeToAttributesUpdates(false);
} }
@Ignore // Uncomment when Californium 3.11 is released with https://github.com/eclipse-californium/californium/pull/2215
@Test @Test
public void testSubscribeToAttributesUpdatesFromTheServerWithEmptyCurrentStateNotification() throws Exception { public void testSubscribeToAttributesUpdatesFromTheServerWithEmptyCurrentStateNotification() throws Exception {
processJsonTestSubscribeToAttributesUpdates(true); processJsonTestSubscribeToAttributesUpdates(true);

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.transport.coap.attributes.updates;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.thingsboard.server.common.data.CoapDeviceType; import org.thingsboard.server.common.data.CoapDeviceType;
import org.thingsboard.server.common.data.TransportPayloadType; import org.thingsboard.server.common.data.TransportPayloadType;
@ -43,12 +44,12 @@ public class CoapAttributesUpdatesProtoIntegrationTest extends AbstractCoapAttri
public void afterTest() throws Exception { public void afterTest() throws Exception {
processAfterTest(); processAfterTest();
} }
@Ignore // Uncomment when Californium 3.11 is released with https://github.com/eclipse-californium/californium/pull/2215
@Test @Test
public void testSubscribeToAttributesUpdatesFromTheServer() throws Exception { public void testSubscribeToAttributesUpdatesFromTheServer() throws Exception {
processProtoTestSubscribeToAttributesUpdates(false); processProtoTestSubscribeToAttributesUpdates(false);
} }
@Ignore // Uncomment when Californium 3.11 is released with https://github.com/eclipse-californium/californium/pull/2215
@Test @Test
public void testSubscribeToAttributesUpdatesFromTheServerWithEmptyCurrentStateNotification() throws Exception { public void testSubscribeToAttributesUpdatesFromTheServerWithEmptyCurrentStateNotification() throws Exception {
processProtoTestSubscribeToAttributesUpdates(true); processProtoTestSubscribeToAttributesUpdates(true);

View File

@ -26,6 +26,7 @@ import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.MediaTypeRegistry; import org.eclipse.californium.core.coap.MediaTypeRegistry;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
@ -82,6 +83,7 @@ public class CoapClientIntegrationTest extends AbstractCoapIntegrationTest {
processAfterTest(); processAfterTest();
} }
@Ignore // Uncomment when Californium 3.11 is released with https://github.com/eclipse-californium/californium/pull/2215
@Test @Test
public void testConfirmableRequests() throws Exception { public void testConfirmableRequests() throws Exception {
boolean confirmable = true; boolean confirmable = true;
@ -90,6 +92,7 @@ public class CoapClientIntegrationTest extends AbstractCoapIntegrationTest {
processTestRequestAttributesValuesFromTheServer(confirmable); processTestRequestAttributesValuesFromTheServer(confirmable);
} }
@Ignore // Uncomment when Californium 3.11 is released with https://github.com/eclipse-californium/californium/pull/2215
@Test @Test
public void testNonConfirmableRequests() throws Exception { public void testNonConfirmableRequests() throws Exception {
boolean confirmable = false; boolean confirmable = false;

View File

@ -82,15 +82,27 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC
.until(() -> CoAP.ResponseCode.VALID.equals(callbackCoap.getResponseCode()) && .until(() -> CoAP.ResponseCode.VALID.equals(callbackCoap.getResponseCode()) &&
callbackCoap.getObserve() != null && 0 == callbackCoap.getObserve()); callbackCoap.getObserve() != null && 0 == callbackCoap.getObserve());
validateCurrentStateNotification(callbackCoap); validateCurrentStateNotification(callbackCoap);
int expectedObserveAfterRpcProcessed = callbackCoap.getObserve() + 1;
String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; int expectedObserveAfterRpcProcessed1 = callbackCoap.getObserve() + 1;
String setGpioRequest = "{\"method\":\"setGpio1\",\"params\":{\"pin\": \"21\",\"value\": 1}}";
String deviceId = savedDevice.getId().getId().toString(); String deviceId = savedDevice.getId().getId().toString();
String result = doPostAsync("/api/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isOk()); String result = doPostAsync("/api/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isOk());
awaitAlias = "await One Way Rpc setGpio(method, params, value)"; awaitAlias = "await One Way Rpc setGpio(method, params, value)";
await(awaitAlias) await(awaitAlias)
.atMost(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS) .atMost(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
.until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) && .until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) &&
callbackCoap.getObserve() != null && expectedObserveAfterRpcProcessed == callbackCoap.getObserve()); callbackCoap.getObserve() != null && expectedObserveAfterRpcProcessed1 == callbackCoap.getObserve());
validateOneWayStateChangedNotification(callbackCoap, result);
int expectedObserveAfterRpcProcessed2 = callbackCoap.getObserve() + 1;
setGpioRequest = "{\"method\":\"setGpio2\",\"params\":{\"pin\": \"22\",\"value\": 2}}";
deviceId = savedDevice.getId().getId().toString();
result = doPostAsync("/api/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isOk());
awaitAlias = "await One Way Rpc setGpio(method, params, value)";
await(awaitAlias)
.atMost(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
.until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) &&
callbackCoap.getObserve() != null && expectedObserveAfterRpcProcessed2 == callbackCoap.getObserve());
validateOneWayStateChangedNotification(callbackCoap, result); validateOneWayStateChangedNotification(callbackCoap, result);
observeRelation.proactiveCancel(); observeRelation.proactiveCancel();

View File

@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.service.security.AccessValidator; import org.thingsboard.server.service.security.AccessValidator;
@ -82,11 +83,13 @@ public class CoapServerSideRpcDefaultIntegrationTest extends AbstractCoapServerS
Assert.assertEquals(AccessValidator.DEVICE_WITH_REQUESTED_ID_NOT_FOUND, result); Assert.assertEquals(AccessValidator.DEVICE_WITH_REQUESTED_ID_NOT_FOUND, result);
} }
@Ignore // Uncomment when Californium 3.11 is released with https://github.com/eclipse-californium/californium/pull/2215
@Test @Test
public void testServerCoapOneWayRpc() throws Exception { public void testServerCoapOneWayRpc() throws Exception {
processOneWayRpcTest(false); processOneWayRpcTest(false);
} }
@Ignore // Uncomment when Californium 3.11 is released with https://github.com/eclipse-californium/californium/pull/2215
@Test @Test
public void testServerCoapTwoWayRpc() throws Exception { public void testServerCoapTwoWayRpc() throws Exception {
processTwoWayRpcTest("{\"value1\":\"A\",\"value2\":\"B\"}", false); processTwoWayRpcTest("{\"value1\":\"A\",\"value2\":\"B\"}", false);

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.transport.coap.rpc;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.thingsboard.server.common.data.CoapDeviceType; import org.thingsboard.server.common.data.CoapDeviceType;
import org.thingsboard.server.common.data.TransportPayloadType; import org.thingsboard.server.common.data.TransportPayloadType;
@ -43,11 +44,13 @@ public class CoapServerSideRpcJsonIntegrationTest extends AbstractCoapServerSide
processAfterTest(); processAfterTest();
} }
@Ignore // Uncomment when Californium 3.11 is released with https://github.com/eclipse-californium/californium/pull/2215
@Test @Test
public void testServerCoapOneWayRpc() throws Exception { public void testServerCoapOneWayRpc() throws Exception {
processOneWayRpcTest(false); processOneWayRpcTest(false);
} }
@Ignore // Uncomment when Californium 3.11 is released with https://github.com/eclipse-californium/californium/pull/2215
@Test @Test
public void testServerCoapTwoWayRpc() throws Exception { public void testServerCoapTwoWayRpc() throws Exception {
processTwoWayRpcTest("{\"value1\":\"A\",\"value2\":\"B\"}", false); processTwoWayRpcTest("{\"value1\":\"A\",\"value2\":\"B\"}", false);

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.transport.coap.rpc;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.thingsboard.server.common.data.CoapDeviceType; import org.thingsboard.server.common.data.CoapDeviceType;
import org.thingsboard.server.common.data.TransportPayloadType; import org.thingsboard.server.common.data.TransportPayloadType;
@ -44,11 +45,13 @@ public class CoapServerSideRpcProtoIntegrationTest extends AbstractCoapServerSid
processAfterTest(); processAfterTest();
} }
@Ignore // Uncomment when Californium 3.11 is released with https://github.com/eclipse-californium/californium/pull/2215
@Test @Test
public void testServerCoapOneWayRpc() throws Exception { public void testServerCoapOneWayRpc() throws Exception {
processOneWayRpcTest(true); processOneWayRpcTest(true);
} }
@Ignore // Uncomment when Californium 3.11 is released with https://github.com/eclipse-californium/californium/pull/2215
@Test @Test
public void testServerCoapTwoWayRpc() throws Exception { public void testServerCoapTwoWayRpc() throws Exception {
processTwoWayRpcTest("{\"payload\":\"{\\\"value1\\\":\\\"A\\\",\\\"value2\\\":\\\"B\\\"}\"}", true); processTwoWayRpcTest("{\"payload\":\"{\\\"value1\\\":\\\"A\\\",\\\"value2\\\":\\\"B\\\"}\"}", true);

View File

@ -19,7 +19,6 @@ import com.google.gson.JsonParseException;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.CoAP; import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.Request; import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.coap.Response;
import org.eclipse.californium.core.network.Exchange; import org.eclipse.californium.core.network.Exchange;
import org.eclipse.californium.core.observe.ObserveRelation; import org.eclipse.californium.core.observe.ObserveRelation;
import org.eclipse.californium.core.server.resources.CoapExchange; import org.eclipse.californium.core.server.resources.CoapExchange;
@ -27,6 +26,8 @@ import org.eclipse.californium.core.server.resources.Resource;
import org.eclipse.californium.core.server.resources.ResourceObserver; import org.eclipse.californium.core.server.resources.ResourceObserver;
import org.thingsboard.server.coapserver.CoapServerService; import org.thingsboard.server.coapserver.CoapServerService;
import org.thingsboard.server.coapserver.TbCoapDtlsSessionInfo; import org.thingsboard.server.coapserver.TbCoapDtlsSessionInfo;
import org.thingsboard.server.common.adaptor.AdaptorException;
import org.thingsboard.server.common.adaptor.JsonConverter;
import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.data.DeviceTransportType;
@ -35,13 +36,11 @@ import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.security.DeviceTokenCredentials; import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
import org.thingsboard.server.common.msg.session.FeatureType; import org.thingsboard.server.common.msg.session.FeatureType;
import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.adaptor.AdaptorException;
import org.thingsboard.server.common.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.coap.callback.CoapDeviceAuthCallback; import org.thingsboard.server.transport.coap.callback.CoapDeviceAuthCallback;
import org.thingsboard.server.transport.coap.callback.CoapNoOpCallback; import org.thingsboard.server.transport.coap.callback.CoapNoOpCallback;
import org.thingsboard.server.transport.coap.callback.CoapOkCallback; import org.thingsboard.server.transport.coap.callback.CoapResponseCodeCallback;
import org.thingsboard.server.transport.coap.callback.GetAttributesSyncSessionCallback; import org.thingsboard.server.transport.coap.callback.GetAttributesSyncSessionCallback;
import org.thingsboard.server.transport.coap.callback.ToServerRpcSyncSessionCallback; import org.thingsboard.server.transport.coap.callback.ToServerRpcSyncSessionCallback;
import org.thingsboard.server.transport.coap.client.CoapClientContext; import org.thingsboard.server.transport.coap.client.CoapClientContext;
@ -54,7 +53,6 @@ import java.util.Random;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.eclipse.californium.elements.DtlsEndpointContext.KEY_SESSION_ID; import static org.eclipse.californium.elements.DtlsEndpointContext.KEY_SESSION_ID;
@ -84,30 +82,6 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
ctx.getScheduler().scheduleAtFixedRate(clients::reportActivity, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS); ctx.getScheduler().scheduleAtFixedRate(clients::reportActivity, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS);
} }
/*
* Overwritten method from CoapResource to be able to manage our own observe notification counters.
*/
@Override
public void checkObserveRelation(Exchange exchange, Response response) {
String token = getTokenFromRequest(exchange.getRequest());
final ObserveRelation relation = exchange.getRelation();
if (relation == null || relation.isCanceled()) {
return; // because request did not try to establish a relation
}
if (response.getCode().isSuccess()) {
if (!relation.isEstablished()) {
relation.setEstablished();
addObserveRelation(relation);
}
AtomicInteger state = clients.getNotificationCounterByToken(token);
if (state != null) {
response.getOptions().setObserve(state.getAndIncrement());
} else {
response.getOptions().removeObserve();
}
} // ObserveLayer takes care of the else case
}
@Override @Override
protected void processHandleGet(CoapExchange exchange) { protected void processHandleGet(CoapExchange exchange) {
Optional<FeatureType> featureType = getFeatureType(exchange.advanced().getRequest()); Optional<FeatureType> featureType = getFeatureType(exchange.advanced().getRequest());
@ -278,7 +252,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
UUID sessionId = toSessionId(sessionInfo); UUID sessionId = toSessionId(sessionInfo);
transportService.process(sessionInfo, clientState.getAdaptor().convertToPostAttributes(sessionId, request, transportService.process(sessionInfo, clientState.getAdaptor().convertToPostAttributes(sessionId, request,
clientState.getConfiguration().getAttributesMsgDescriptor()), clientState.getConfiguration().getAttributesMsgDescriptor()),
new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); new CoapResponseCodeCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
} }
private void handlePostTelemetryRequest(TbCoapClientState clientState, CoapExchange exchange, Request request) throws AdaptorException { private void handlePostTelemetryRequest(TbCoapClientState clientState, CoapExchange exchange, Request request) throws AdaptorException {
@ -286,7 +260,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
UUID sessionId = toSessionId(sessionInfo); UUID sessionId = toSessionId(sessionInfo);
transportService.process(sessionInfo, clientState.getAdaptor().convertToPostTelemetry(sessionId, request, transportService.process(sessionInfo, clientState.getAdaptor().convertToPostTelemetry(sessionId, request,
clientState.getConfiguration().getTelemetryMsgDescriptor()), clientState.getConfiguration().getTelemetryMsgDescriptor()),
new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); new CoapResponseCodeCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
} }
private void handleClaimRequest(TbCoapClientState clientState, CoapExchange exchange, Request request) throws AdaptorException { private void handleClaimRequest(TbCoapClientState clientState, CoapExchange exchange, Request request) throws AdaptorException {
@ -294,7 +268,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
UUID sessionId = toSessionId(sessionInfo); UUID sessionId = toSessionId(sessionInfo);
transportService.process(sessionInfo, transportService.process(sessionInfo,
clientState.getAdaptor().convertToClaimDevice(sessionId, request, sessionInfo), clientState.getAdaptor().convertToClaimDevice(sessionId, request, sessionInfo),
new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); new CoapResponseCodeCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
} }
private void handleAttributeSubscribeRequest(TbCoapClientState clientState, CoapExchange exchange, Request request) { private void handleAttributeSubscribeRequest(TbCoapClientState clientState, CoapExchange exchange, Request request) {
@ -320,7 +294,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
UUID sessionId = toSessionId(session); UUID sessionId = toSessionId(session);
transportService.process(session, transportService.process(session,
clientState.getAdaptor().convertToDeviceRpcResponse(sessionId, request, clientState.getConfiguration().getRpcResponseMsgDescriptor()), clientState.getAdaptor().convertToDeviceRpcResponse(sessionId, request, clientState.getConfiguration().getRpcResponseMsgDescriptor()),
new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); new CoapResponseCodeCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
} }
private void handleRpcSubscribeRequest(TbCoapClientState clientState, CoapExchange exchange, Request request) { private void handleRpcSubscribeRequest(TbCoapClientState clientState, CoapExchange exchange, Request request) {

View File

@ -0,0 +1,54 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.coap.callback;
import org.eclipse.californium.core.coap.Response;
import org.eclipse.californium.core.server.resources.CoapExchange;
import org.thingsboard.server.common.transport.TransportServiceCallback;
public class CoapResponseCallback implements TransportServiceCallback<Void> {
protected final CoapExchange exchange;
protected final Response onSuccessResponse;
protected final Response onFailureResponse;
public CoapResponseCallback(CoapExchange exchange, Response onSuccessResponse, Response onFailureResponse) {
this.exchange = exchange;
this.onSuccessResponse = onSuccessResponse;
this.onFailureResponse = onFailureResponse;
}
/**
* @param msg
*/
@Override
public void onSuccess(Void msg) {
this.onSuccessResponse.setConfirmable(isConRequest());
exchange.respond(this.onSuccessResponse);
}
/**
* @param e
*/
@Override
public void onError(Throwable e) {
exchange.respond(onFailureResponse);
}
protected boolean isConRequest() {
return exchange.advanced().getRequest().isConfirmable();
}
}

View File

@ -20,13 +20,13 @@ import org.eclipse.californium.core.coap.Response;
import org.eclipse.californium.core.server.resources.CoapExchange; import org.eclipse.californium.core.server.resources.CoapExchange;
import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.TransportServiceCallback;
public class CoapOkCallback implements TransportServiceCallback<Void> { public class CoapResponseCodeCallback implements TransportServiceCallback<Void> {
protected final CoapExchange exchange; protected final CoapExchange exchange;
protected final CoAP.ResponseCode onSuccessResponse; protected final CoAP.ResponseCode onSuccessResponse;
protected final CoAP.ResponseCode onFailureResponse; protected final CoAP.ResponseCode onFailureResponse;
public CoapOkCallback(CoapExchange exchange, CoAP.ResponseCode onSuccessResponse, CoAP.ResponseCode onFailureResponse) { public CoapResponseCodeCallback(CoapExchange exchange, CoAP.ResponseCode onSuccessResponse, CoAP.ResponseCode onFailureResponse) {
this.exchange = exchange; this.exchange = exchange;
this.onSuccessResponse = onSuccessResponse; this.onSuccessResponse = onSuccessResponse;
this.onFailureResponse = onFailureResponse; this.onFailureResponse = onFailureResponse;

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.transport.coap.client;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.CoAP; import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.CoAP.ResponseCode;
import org.eclipse.californium.core.coap.Response; import org.eclipse.californium.core.coap.Response;
import org.eclipse.californium.core.observe.ObserveRelation; import org.eclipse.californium.core.observe.ObserveRelation;
import org.eclipse.californium.core.server.resources.CoapExchange; import org.eclipse.californium.core.server.resources.CoapExchange;
@ -26,6 +27,7 @@ import org.springframework.context.annotation.Lazy;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.server.coapserver.CoapServerContext; import org.thingsboard.server.coapserver.CoapServerContext;
import org.thingsboard.server.common.adaptor.AdaptorException;
import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfile;
@ -45,7 +47,6 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.msg.session.FeatureType; import org.thingsboard.server.common.msg.session.FeatureType;
import org.thingsboard.server.transport.coap.CoapSessionMsgType;
import org.thingsboard.server.common.transport.DeviceDeletedEvent; import org.thingsboard.server.common.transport.DeviceDeletedEvent;
import org.thingsboard.server.common.transport.DeviceProfileUpdatedEvent; import org.thingsboard.server.common.transport.DeviceProfileUpdatedEvent;
import org.thingsboard.server.common.transport.DeviceUpdatedEvent; import org.thingsboard.server.common.transport.DeviceUpdatedEvent;
@ -53,18 +54,19 @@ import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportDeviceProfileCache; import org.thingsboard.server.common.transport.TransportDeviceProfileCache;
import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.auth.SessionInfoCreator; import org.thingsboard.server.common.transport.auth.SessionInfoCreator;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.transport.coap.CoapSessionMsgType;
import org.thingsboard.server.transport.coap.CoapTransportContext; import org.thingsboard.server.transport.coap.CoapTransportContext;
import org.thingsboard.server.transport.coap.TbCoapMessageObserver; import org.thingsboard.server.transport.coap.TbCoapMessageObserver;
import org.thingsboard.server.transport.coap.TransportConfigurationContainer; import org.thingsboard.server.transport.coap.TransportConfigurationContainer;
import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor; import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
import org.thingsboard.server.transport.coap.callback.AbstractSyncSessionCallback; import org.thingsboard.server.transport.coap.callback.AbstractSyncSessionCallback;
import org.thingsboard.server.transport.coap.callback.CoapNoOpCallback; import org.thingsboard.server.transport.coap.callback.CoapNoOpCallback;
import org.thingsboard.server.transport.coap.callback.CoapOkCallback; import org.thingsboard.server.transport.coap.callback.CoapResponseCallback;
import org.thingsboard.server.transport.coap.callback.CoapResponseCodeCallback;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
@ -329,9 +331,14 @@ public class DefaultCoapClientContext implements CoapClientContext {
TransportProtos.GetAttributeRequestMsg.newBuilder().setOnlyShared(true).build(), TransportProtos.GetAttributeRequestMsg.newBuilder().setOnlyShared(true).build(),
new CoapNoOpCallback(exchange)); new CoapNoOpCallback(exchange));
} else { } else {
Response response = new Response(CoAP.ResponseCode.VALID);
if (state.getRpc() == null) {
state.setRpc(new TbCoapObservationState(exchange, token));
}
response.getOptions().setObserve(state.getRpc().getObserveCounter().getAndIncrement());
transportService.process(state.getSession(), transportService.process(state.getSession(),
TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), TransportProtos.SubscribeToRPCMsg.getDefaultInstance(),
new CoapOkCallback(exchange, CoAP.ResponseCode.VALID, CoAP.ResponseCode.INTERNAL_SERVER_ERROR) new CoapResponseCallback(exchange, response, new Response(CoAP.ResponseCode.INTERNAL_SERVER_ERROR))
); );
} }
} }
@ -478,7 +485,13 @@ public class DefaultCoapClientContext implements CoapClientContext {
TbCoapObservationState attrs = state.getAttrs(); TbCoapObservationState attrs = state.getAttrs();
if (attrs != null) { if (attrs != null) {
try { try {
Response response = state.getAdaptor().convertToPublish(msg); Response resp = state.getAdaptor().convertToPublish(msg);
Response response = new Response(ResponseCode.VALID);
response.setPayload(resp.getPayload());
if (state.getRpc() == null) {
state.setRpc(new TbCoapObservationState(attrs.getExchange(), attrs.getToken()));
}
response.getOptions().setObserve(state.getRpc().getObserveCounter().getAndIncrement());
respond(attrs.getExchange(), response, state.getContentFormat()); respond(attrs.getExchange(), response, state.getContentFormat());
} catch (AdaptorException e) { } catch (AdaptorException e) {
log.trace("Failed to reply due to error", e); log.trace("Failed to reply due to error", e);
@ -509,6 +522,7 @@ public class DefaultCoapClientContext implements CoapClientContext {
boolean conRequest = AbstractSyncSessionCallback.isConRequest(state.getAttrs()); boolean conRequest = AbstractSyncSessionCallback.isConRequest(state.getAttrs());
int requestId = getNextMsgId(); int requestId = getNextMsgId();
Response response = state.getAdaptor().convertToPublish(msg); Response response = state.getAdaptor().convertToPublish(msg);
response.getOptions().setObserve(state.getRpc().getObserveCounter().getAndIncrement());
response.setConfirmable(conRequest); response.setConfirmable(conRequest);
response.setMID(requestId); response.setMID(requestId);
if (conRequest) { if (conRequest) {
@ -573,6 +587,7 @@ public class DefaultCoapClientContext implements CoapClientContext {
int requestId = getNextMsgId(); int requestId = getNextMsgId();
try { try {
Response response = state.getAdaptor().convertToPublish(msg, state.getConfiguration().getRpcRequestDynamicMessageBuilder()); Response response = state.getAdaptor().convertToPublish(msg, state.getConfiguration().getRpcRequestDynamicMessageBuilder());
response.getOptions().setObserve(state.getRpc().getObserveCounter().getAndIncrement());
response.setConfirmable(conRequest); response.setConfirmable(conRequest);
response.setMID(requestId); response.setMID(requestId);
if (conRequest) { if (conRequest) {
@ -808,7 +823,7 @@ public class DefaultCoapClientContext implements CoapClientContext {
state.setRpc(null); state.setRpc(null);
transportService.process(state.getSession(), transportService.process(state.getSession(),
TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(), TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(),
new CoapOkCallback(exchange, CoAP.ResponseCode.DELETED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); new CoapResponseCodeCallback(exchange, CoAP.ResponseCode.DELETED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
if (state.getAttrs() == null) { if (state.getAttrs() == null) {
closeAndCleanup(state); closeAndCleanup(state);
} }
@ -822,7 +837,7 @@ public class DefaultCoapClientContext implements CoapClientContext {
state.setAttrs(null); state.setAttrs(null);
transportService.process(state.getSession(), transportService.process(state.getSession(),
TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(), TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(),
new CoapOkCallback(exchange, CoAP.ResponseCode.DELETED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); new CoapResponseCodeCallback(exchange, CoAP.ResponseCode.DELETED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
if (state.getRpc() == null) { if (state.getRpc() == null) {
closeAndCleanup(state); closeAndCleanup(state);
} }

View File

@ -72,7 +72,7 @@
<fasterxml-classmate.version>1.3.4</fasterxml-classmate.version> <fasterxml-classmate.version>1.3.4</fasterxml-classmate.version>
<auth0-jwt.version>4.2.1</auth0-jwt.version> <auth0-jwt.version>4.2.1</auth0-jwt.version>
<json-schema-validator.version>2.2.6</json-schema-validator.version> <json-schema-validator.version>2.2.6</json-schema-validator.version>
<californium.version>3.9.1</californium.version> <californium.version>3.10.0</californium.version>
<leshan.version>2.0.0-M14</leshan.version> <leshan.version>2.0.0-M14</leshan.version>
<gson.version>2.9.0</gson.version> <gson.version>2.9.0</gson.version>
<freemarker.version>2.3.30</freemarker.version> <freemarker.version>2.3.30</freemarker.version>