diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcIntegrationTest.java index a4dba5bb74..382f5595a5 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcIntegrationTest.java @@ -31,6 +31,7 @@ import org.thingsboard.server.common.data.TransportPayloadType; import org.thingsboard.server.common.msg.session.FeatureType; import org.thingsboard.server.transport.coap.AbstractCoapIntegrationTest; +import java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/AbstractCoapTransportResource.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/AbstractCoapTransportResource.java index a12e99b15d..83a6d2ff11 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/AbstractCoapTransportResource.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/AbstractCoapTransportResource.java @@ -82,143 +82,8 @@ public abstract class AbstractCoapTransportResource extends CoapResource { .setEvent(event).build(); } - @SneakyThrows - protected int respond(Response response, CoapExchange exchange, TransportProtos.SessionInfoProto sessionInfo) { - int msgId = ThreadLocalRandom.current().nextInt(NONE, MAX_MID + 1); - response.setMID(msgId); - response.addMessageObserver(new MessageObserver() { - @Override - public void onRetransmission() { - } - - @Override - public void onResponse(Response response) { - } - - @Override - public void onAcknowledgement() { - TransportProtos.ToDeviceRpcRequestMsg msg = transportContext.getRpcAwaitingAck().remove(msgId); - if (msg != null) { - transportService.process(sessionInfo, msg, false, TransportServiceCallback.EMPTY); - } - } - - @Override - public void onReject() { - } - - @Override - public void onTimeout() { - } - - @Override - public void onCancel() { - } - - @Override - public void onReadyToSend() { - } - - @Override - public void onConnecting() { - } - - @Override - public void onDtlsRetransmission(int flight) { - } - - @Override - public void onSent(boolean retransmission) { - } - - @Override - public void onSendError(Throwable error) { - } - - @Override - public void onContextEstablished(EndpointContext endpointContext) { - } - - @Override - public void onComplete() { - } - }); - - exchange.respond(response); - return msgId; + protected int getNextMsgId() { + return ThreadLocalRandom.current().nextInt(NONE, MAX_MID + 1); } - public static class CoapDeviceAuthCallback implements TransportServiceCallback { - private final TransportContext transportContext; - private final CoapExchange exchange; - private final BiConsumer onSuccess; - - public CoapDeviceAuthCallback(TransportContext transportContext, CoapExchange exchange, BiConsumer onSuccess) { - this.transportContext = transportContext; - this.exchange = exchange; - this.onSuccess = onSuccess; - } - - @Override - public void onSuccess(ValidateDeviceCredentialsResponse msg) { - DeviceProfile deviceProfile = msg.getDeviceProfile(); - if (msg.hasDeviceInfo() && deviceProfile != null) { - TransportProtos.SessionInfoProto sessionInfoProto = SessionInfoCreator.create(msg, transportContext, UUID.randomUUID()); - onSuccess.accept(sessionInfoProto, deviceProfile); - } else { - exchange.respond(CoAP.ResponseCode.UNAUTHORIZED); - } - } - - @Override - public void onError(Throwable e) { - log.warn("Failed to process request", e); - exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR); - } - } - - public static class CoapOkCallback implements TransportServiceCallback { - private final CoapExchange exchange; - private final CoAP.ResponseCode onSuccessResponse; - private final CoAP.ResponseCode onFailureResponse; - - public CoapOkCallback(CoapExchange exchange, CoAP.ResponseCode onSuccessResponse, CoAP.ResponseCode onFailureResponse) { - this.exchange = exchange; - this.onSuccessResponse = onSuccessResponse; - this.onFailureResponse = onFailureResponse; - } - - @Override - public void onSuccess(Void msg) { - Response response = new Response(onSuccessResponse); - response.setAcknowledged(isConRequest()); - exchange.respond(response); - } - - @Override - public void onError(Throwable e) { - exchange.respond(onFailureResponse); - } - - private boolean isConRequest() { - return exchange.advanced().getRequest().isConfirmable(); - } - } - - public static class CoapNoOpCallback implements TransportServiceCallback { - private final CoapExchange exchange; - - CoapNoOpCallback(CoapExchange exchange) { - this.exchange = exchange; - } - - @Override - public void onSuccess(Void msg) { - } - - @Override - public void onError(Throwable e) { - exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR); - } - } } diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java index 9401fc47ca..449d371a2f 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java @@ -19,6 +19,7 @@ import com.google.gson.JsonParseException; import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; import lombok.Data; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.eclipse.californium.core.coap.CoAP; import org.eclipse.californium.core.coap.Request; @@ -53,6 +54,9 @@ import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor; +import org.thingsboard.server.transport.coap.callback.CoapDeviceAuthCallback; +import org.thingsboard.server.transport.coap.callback.CoapNoOpCallback; +import org.thingsboard.server.transport.coap.callback.CoapOkCallback; import java.util.List; import java.util.Map; @@ -81,9 +85,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource { private final Set rpcSubscriptions = ConcurrentHashMap.newKeySet(); private final Set attributeSubscriptions = ConcurrentHashMap.newKeySet(); - private ConcurrentMap dtlsSessionIdMap; - private long timeout; - private long sessionReportTimeout; + private final ConcurrentMap dtlsSessionIdMap; + private final long timeout; public CoapTransportResource(CoapTransportContext ctx, CoapServerService coapServerService, String name) { super(ctx, name); @@ -91,7 +94,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { this.addObserver(new CoapResourceObserver()); this.dtlsSessionIdMap = coapServerService.getDtlsSessionsMap(); this.timeout = coapServerService.getTimeout(); - this.sessionReportTimeout = ctx.getSessionReportTimeout(); + long sessionReportTimeout = ctx.getSessionReportTimeout(); ctx.getScheduler().scheduleAtFixedRate(() -> { Set coapObserveSessionInfos = sessionInfoToObserveRelationMap.keySet(); Set observeSessions = coapObserveSessionInfos @@ -110,7 +113,6 @@ public class CoapTransportResource extends AbstractCoapTransportResource { return; // because request did not try to establish a relation } if (CoAP.ResponseCode.isSuccess(response.getCode())) { - if (!relation.isEstablished()) { relation.setEstablished(); addObserveRelation(relation); @@ -280,8 +282,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource { CoapObserveSessionInfo currentCoapObserveAttrSessionInfo = tokenToCoapSessionInfoMap.get(getTokenFromRequest(request)); if (currentCoapObserveAttrSessionInfo == null) { attributeSubscriptions.add(sessionId); - registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor, - transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), getTokenFromRequest(request)); + registerAsyncCoapSession(exchange, coapTransportAdaptor, transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), + sessionInfo, getTokenFromRequest(request)); transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), new CoapNoOpCallback(exchange)); transportService.process(sessionInfo, @@ -305,11 +307,11 @@ public class CoapTransportResource extends AbstractCoapTransportResource { CoapObserveSessionInfo currentCoapObserveRpcSessionInfo = tokenToCoapSessionInfoMap.get(getTokenFromRequest(request)); if (currentCoapObserveRpcSessionInfo == null) { rpcSubscriptions.add(sessionId); - registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor, - transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), getTokenFromRequest(request)); + registerAsyncCoapSession(exchange, coapTransportAdaptor, transportConfigurationContainer.getRpcRequestDynamicMessageBuilder() + , sessionInfo, getTokenFromRequest(request)); transportService.process(sessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), - new CoapNoOpCallback(exchange) + new CoapOkCallback(exchange, CoAP.ResponseCode.VALID, CoAP.ResponseCode.INTERNAL_SERVER_ERROR) ); } break; @@ -359,14 +361,16 @@ public class CoapTransportResource extends AbstractCoapTransportResource { return tokenToCoapSessionInfoMap.remove(token); } - private void registerAsyncCoapSession(CoapExchange exchange, TransportProtos.SessionInfoProto sessionInfo, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, String token) { + private void registerAsyncCoapSession(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, + DynamicMessage.Builder rpcRequestDynamicMessageBuilder, TransportProtos.SessionInfoProto sessionInfo, String token) { tokenToCoapSessionInfoMap.putIfAbsent(token, new CoapObserveSessionInfo(sessionInfo)); transportService.registerAsyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder, sessionInfo)); transportService.process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); } - private CoapSessionListener getCoapSessionListener(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, TransportProtos.SessionInfoProto sessionInfo) { - return new CoapSessionListener(this, exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder, sessionInfo); + private CoapSessionListener getCoapSessionListener(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, + DynamicMessage.Builder rpcRequestDynamicMessageBuilder, TransportProtos.SessionInfoProto sessionInfo) { + return new CoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder, sessionInfo); } private String getTokenFromRequest(Request request) { @@ -448,22 +452,14 @@ public class CoapTransportResource extends AbstractCoapTransportResource { } } + @RequiredArgsConstructor private class CoapSessionListener implements SessionMsgListener { - private final CoapTransportResource coapTransportResource; private final CoapExchange exchange; private final CoapTransportAdaptor coapTransportAdaptor; private final DynamicMessage.Builder rpcRequestDynamicMessageBuilder; private final TransportProtos.SessionInfoProto sessionInfo; - CoapSessionListener(CoapTransportResource coapTransportResource, CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, TransportProtos.SessionInfoProto sessionInfo) { - this.coapTransportResource = coapTransportResource; - this.exchange = exchange; - this.coapTransportAdaptor = coapTransportAdaptor; - this.rpcRequestDynamicMessageBuilder = rpcRequestDynamicMessageBuilder; - this.sessionInfo = sessionInfo; - } - @Override public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg msg) { try { @@ -497,7 +493,9 @@ public class CoapTransportResource extends AbstractCoapTransportResource { public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg msg) { log.trace("[{}] Received RPC command to device", sessionId); try { - int requestId = respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder), exchange, sessionInfo); + Response response = coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder); + int requestId = getNextMsgId(); + response.setMID(requestId); if (msg.getPersisted()) { transportContext.getRpcAwaitingAck().put(requestId, msg); transportContext.getScheduler().schedule(() -> { @@ -507,6 +505,13 @@ public class CoapTransportResource extends AbstractCoapTransportResource { } }, Math.max(0, msg.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS); } + response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> { + TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id); + if (rpcRequestMsg != null) { + transportService.process(sessionInfo, rpcRequestMsg, false, TransportServiceCallback.EMPTY); + } + })); + exchange.respond(response); } catch (AdaptorException e) { log.trace("Failed to reply due to error", e); closeObserveRelationAndNotify(sessionId, CoAP.ResponseCode.INTERNAL_SERVER_ERROR); @@ -529,8 +534,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource { } private void closeObserveRelationAndNotify(UUID sessionId, CoAP.ResponseCode responseCode) { - Map sessionToObserveRelationMap = coapTransportResource.getCoapSessionInfoToObserveRelationMap(); - if (coapTransportResource.getObserverCount() > 0 && !CollectionUtils.isEmpty(sessionToObserveRelationMap)) { + Map sessionToObserveRelationMap = CoapTransportResource.this.getCoapSessionInfoToObserveRelationMap(); + if (CoapTransportResource.this.getObserverCount() > 0 && !CollectionUtils.isEmpty(sessionToObserveRelationMap)) { Optional observeSessionToClose = sessionToObserveRelationMap.keySet().stream().filter(coapObserveSessionInfo -> { TransportProtos.SessionInfoProto sessionToDelete = coapObserveSessionInfo.getSessionInfoProto(); UUID observeSessionId = new UUID(sessionToDelete.getSessionIdMSB(), sessionToDelete.getSessionIdLSB()); @@ -539,16 +544,16 @@ public class CoapTransportResource extends AbstractCoapTransportResource { if (observeSessionToClose.isPresent()) { CoapObserveSessionInfo coapObserveSessionInfo = observeSessionToClose.get(); ObserveRelation observeRelation = sessionToObserveRelationMap.get(coapObserveSessionInfo); - coapTransportResource.clearAndNotifyObserveRelation(observeRelation, responseCode); + CoapTransportResource.this.clearAndNotifyObserveRelation(observeRelation, responseCode); } } } private void closeAndDeregister() { Request request = exchange.advanced().getRequest(); - String token = coapTransportResource.getTokenFromRequest(request); - CoapObserveSessionInfo deleted = coapTransportResource.lookupAsyncSessionInfo(token); - coapTransportResource.closeAndDeregister(deleted.getSessionInfoProto()); + String token = CoapTransportResource.this.getTokenFromRequest(request); + CoapObserveSessionInfo deleted = CoapTransportResource.this.lookupAsyncSessionInfo(token); + CoapTransportResource.this.closeAndDeregister(deleted.getSessionInfoProto()); } } diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/OtaPackageTransportResource.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/OtaPackageTransportResource.java index 2aadea26b2..74777d59d4 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/OtaPackageTransportResource.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/OtaPackageTransportResource.java @@ -20,22 +20,19 @@ import org.eclipse.californium.core.coap.CoAP; import org.eclipse.californium.core.coap.Request; import org.eclipse.californium.core.coap.Response; import org.eclipse.californium.core.network.Exchange; -import org.eclipse.californium.core.observe.ObserveRelation; import org.eclipse.californium.core.server.resources.CoapExchange; import org.eclipse.californium.core.server.resources.Resource; -import org.eclipse.californium.core.server.resources.ResourceObserver; -import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.ota.OtaPackageType; import org.thingsboard.server.common.data.security.DeviceTokenCredentials; import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.transport.coap.callback.CoapDeviceAuthCallback; import java.util.List; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.ExecutorService; @Slf4j public class OtaPackageTransportResource extends AbstractCoapTransportResource { diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/TbCoapMessageObserver.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/TbCoapMessageObserver.java new file mode 100644 index 0000000000..c0ed1422c7 --- /dev/null +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/TbCoapMessageObserver.java @@ -0,0 +1,95 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.coap; + +import lombok.RequiredArgsConstructor; +import org.eclipse.californium.core.coap.MessageObserver; +import org.eclipse.californium.core.coap.Response; +import org.eclipse.californium.elements.EndpointContext; + +import java.util.function.Consumer; + +@RequiredArgsConstructor +public class TbCoapMessageObserver implements MessageObserver { + + private final int msgId; + private final Consumer onAcknowledge; + + @Override + public void onRetransmission() { + + } + + @Override + public void onResponse(Response response) { + + } + + @Override + public void onAcknowledgement() { + onAcknowledge.accept(msgId); + } + + @Override + public void onReject() { + + } + + @Override + public void onTimeout() { + + } + + @Override + public void onCancel() { + + } + + @Override + public void onReadyToSend() { + + } + + @Override + public void onConnecting() { + + } + + @Override + public void onDtlsRetransmission(int flight) { + + } + + @Override + public void onSent(boolean retransmission) { + + } + + @Override + public void onSendError(Throwable error) { + + } + + @Override + public void onContextEstablished(EndpointContext endpointContext) { + + } + + @Override + public void onComplete() { + + } +} diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/CoapDeviceAuthCallback.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/CoapDeviceAuthCallback.java new file mode 100644 index 0000000000..fab0c9b615 --- /dev/null +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/CoapDeviceAuthCallback.java @@ -0,0 +1,60 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.coap.callback; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.californium.core.coap.CoAP; +import org.eclipse.californium.core.server.resources.CoapExchange; +import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.transport.TransportContext; +import org.thingsboard.server.common.transport.TransportServiceCallback; +import org.thingsboard.server.common.transport.auth.SessionInfoCreator; +import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.transport.coap.AbstractCoapTransportResource; + +import java.util.UUID; +import java.util.function.BiConsumer; + +@Slf4j +public class CoapDeviceAuthCallback implements TransportServiceCallback { + private final TransportContext transportContext; + private final CoapExchange exchange; + private final BiConsumer onSuccess; + + public CoapDeviceAuthCallback(TransportContext transportContext, CoapExchange exchange, BiConsumer onSuccess) { + this.transportContext = transportContext; + this.exchange = exchange; + this.onSuccess = onSuccess; + } + + @Override + public void onSuccess(ValidateDeviceCredentialsResponse msg) { + DeviceProfile deviceProfile = msg.getDeviceProfile(); + if (msg.hasDeviceInfo() && deviceProfile != null) { + TransportProtos.SessionInfoProto sessionInfoProto = SessionInfoCreator.create(msg, transportContext, UUID.randomUUID()); + onSuccess.accept(sessionInfoProto, deviceProfile); + } else { + exchange.respond(CoAP.ResponseCode.UNAUTHORIZED); + } + } + + @Override + public void onError(Throwable e) { + log.warn("Failed to process request", e); + exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR); + } +} diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/CoapNoOpCallback.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/CoapNoOpCallback.java new file mode 100644 index 0000000000..4fb00496c3 --- /dev/null +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/CoapNoOpCallback.java @@ -0,0 +1,37 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.coap.callback; + +import org.eclipse.californium.core.coap.CoAP; +import org.eclipse.californium.core.server.resources.CoapExchange; +import org.thingsboard.server.common.transport.TransportServiceCallback; + +public class CoapNoOpCallback implements TransportServiceCallback { + private final CoapExchange exchange; + + public CoapNoOpCallback(CoapExchange exchange) { + this.exchange = exchange; + } + + @Override + public void onSuccess(Void msg) { + } + + @Override + public void onError(Throwable e) { + exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR); + } +} diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/CoapOkCallback.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/CoapOkCallback.java new file mode 100644 index 0000000000..40e1b894ee --- /dev/null +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/CoapOkCallback.java @@ -0,0 +1,50 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.coap.callback; + +import org.eclipse.californium.core.coap.CoAP; +import org.eclipse.californium.core.coap.Response; +import org.eclipse.californium.core.server.resources.CoapExchange; +import org.thingsboard.server.common.transport.TransportServiceCallback; + +public class CoapOkCallback implements TransportServiceCallback { + + protected final CoapExchange exchange; + protected final CoAP.ResponseCode onSuccessResponse; + protected final CoAP.ResponseCode onFailureResponse; + + public CoapOkCallback(CoapExchange exchange, CoAP.ResponseCode onSuccessResponse, CoAP.ResponseCode onFailureResponse) { + this.exchange = exchange; + this.onSuccessResponse = onSuccessResponse; + this.onFailureResponse = onFailureResponse; + } + + @Override + public void onSuccess(Void msg) { + Response response = new Response(onSuccessResponse); + response.setConfirmable(isConRequest()); + exchange.respond(response); + } + + @Override + public void onError(Throwable e) { + exchange.respond(onFailureResponse); + } + + protected boolean isConRequest() { + return exchange.advanced().getRequest().isConfirmable(); + } +} diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/efento/CoapEfentoTransportResource.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/efento/CoapEfentoTransportResource.java index d53cd7e49e..fe86d8794c 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/efento/CoapEfentoTransportResource.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/efento/CoapEfentoTransportResource.java @@ -35,6 +35,8 @@ import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.coap.MeasurementTypeProtos; import org.thingsboard.server.gen.transport.coap.MeasurementsProtos; import org.thingsboard.server.transport.coap.AbstractCoapTransportResource; +import org.thingsboard.server.transport.coap.callback.CoapDeviceAuthCallback; +import org.thingsboard.server.transport.coap.callback.CoapOkCallback; import org.thingsboard.server.transport.coap.CoapTransportContext; import org.thingsboard.server.transport.coap.efento.utils.CoapEfentoUtils;