From 5c8618c3ccdf49f47ec2ecc07893955320a7c719 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Thu, 22 Jul 2021 09:52:42 +0300 Subject: [PATCH] added content format for the CoAP responses --- .../coap/adaptors/CoapTransportAdaptor.java | 1 + .../transport/coap/adaptors/JsonCoapAdaptor.java | 5 +++++ .../transport/coap/adaptors/ProtoCoapAdaptor.java | 5 +++++ .../callback/AbstractSyncSessionCallback.java | 9 +++++++++ .../GetAttributesSyncSessionCallback.java | 2 +- .../callback/ToServerRpcSyncSessionCallback.java | 2 +- .../coap/client/DefaultCoapClientContext.java | 15 ++++++++++++--- .../transport/coap/client/TbCoapClientState.java | 7 ++----- .../lwm2m/server/LwM2mTransportCoapResource.java | 14 +++++++------- 9 files changed, 43 insertions(+), 17 deletions(-) diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/CoapTransportAdaptor.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/CoapTransportAdaptor.java index 00374f6916..23cfa83b44 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/CoapTransportAdaptor.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/CoapTransportAdaptor.java @@ -49,4 +49,5 @@ public interface CoapTransportAdaptor { ProvisionDeviceRequestMsg convertToProvisionRequestMsg(UUID sessionId, Request inbound) throws AdaptorException; + int getContentFormat(); } diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java index a1ef8ba205..ef37746789 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java @@ -23,6 +23,7 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; import lombok.extern.slf4j.Slf4j; import org.eclipse.californium.core.coap.CoAP; +import org.eclipse.californium.core.coap.MediaTypeRegistry; import org.eclipse.californium.core.coap.Request; import org.eclipse.californium.core.coap.Response; import org.springframework.stereotype.Component; @@ -164,4 +165,8 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor { return payload; } + @Override + public int getContentFormat() { + return MediaTypeRegistry.APPLICATION_JSON; + } } diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/ProtoCoapAdaptor.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/ProtoCoapAdaptor.java index 93d9a35029..1c8f84fbd0 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/ProtoCoapAdaptor.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/ProtoCoapAdaptor.java @@ -23,6 +23,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; import lombok.extern.slf4j.Slf4j; import org.eclipse.californium.core.coap.CoAP; +import org.eclipse.californium.core.coap.MediaTypeRegistry; import org.eclipse.californium.core.coap.Request; import org.eclipse.californium.core.coap.Response; import org.springframework.stereotype.Component; @@ -162,4 +163,8 @@ public class ProtoCoapAdaptor implements CoapTransportAdaptor { return JsonFormat.printer().includingDefaultValueFields().print(dynamicMessage); } + @Override + public int getContentFormat() { + return MediaTypeRegistry.APPLICATION_OCTET_STREAM; + } } diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/AbstractSyncSessionCallback.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/AbstractSyncSessionCallback.java index b755b7097e..7c23546ef2 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/AbstractSyncSessionCallback.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/AbstractSyncSessionCallback.java @@ -17,7 +17,9 @@ package org.thingsboard.server.transport.coap.callback; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.eclipse.californium.core.coap.MediaTypeRegistry; import org.eclipse.californium.core.coap.Request; +import org.eclipse.californium.core.coap.Response; import org.eclipse.californium.core.server.resources.CoapExchange; import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.gen.transport.TransportProtos; @@ -71,4 +73,11 @@ public abstract class AbstractSyncSessionCallback implements SessionMsgListener } } + protected void respond(Response response) { + int contentFormat = exchange.getRequestOptions().getContentFormat(); + contentFormat = contentFormat != MediaTypeRegistry.UNDEFINED ? contentFormat : state.getContentFormat(); + response.getOptions().setContentFormat(contentFormat); + exchange.respond(response); + } + } diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/GetAttributesSyncSessionCallback.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/GetAttributesSyncSessionCallback.java index 9c05098ce4..d79e3299ee 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/GetAttributesSyncSessionCallback.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/GetAttributesSyncSessionCallback.java @@ -34,7 +34,7 @@ public class GetAttributesSyncSessionCallback extends AbstractSyncSessionCallbac @Override public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg msg) { try { - exchange.respond(state.getAdaptor().convertToPublish(AbstractSyncSessionCallback.isConRequest(state.getAttrs()), msg)); + respond(state.getAdaptor().convertToPublish(isConRequest(state.getAttrs()), msg)); } catch (AdaptorException e) { log.trace("[{}] Failed to reply due to error", state.getDeviceId(), e); exchange.respond(new Response(CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/ToServerRpcSyncSessionCallback.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/ToServerRpcSyncSessionCallback.java index b51758b51e..16feb81668 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/ToServerRpcSyncSessionCallback.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/ToServerRpcSyncSessionCallback.java @@ -33,7 +33,7 @@ public class ToServerRpcSyncSessionCallback extends AbstractSyncSessionCallback @Override public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg toServerResponse) { try { - exchange.respond(state.getAdaptor().convertToPublish(isConRequest(state.getRpc()), toServerResponse)); + respond(state.getAdaptor().convertToPublish(isConRequest(state.getRpc()), toServerResponse)); } catch (AdaptorException e) { log.trace("Failed to reply due to error", e); exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR); diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java index b5495042cf..d61ba4449b 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java @@ -18,6 +18,7 @@ package org.thingsboard.server.transport.coap.client; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.eclipse.californium.core.coap.CoAP; +import org.eclipse.californium.core.coap.MediaTypeRegistry; import org.eclipse.californium.core.coap.Response; import org.eclipse.californium.core.observe.ObserveRelation; import org.eclipse.californium.core.server.resources.CoapExchange; @@ -330,6 +331,7 @@ public class DefaultCoapClientContext implements CoapClientContext { if (state.getConfiguration() == null || state.getAdaptor() == null) { state.setConfiguration(getTransportConfigurationContainer(deviceProfile)); state.setAdaptor(getCoapTransportAdaptor(state.getConfiguration().isJsonPayload())); + state.setContentFormat(state.getAdaptor().getContentFormat()); } if (state.getCredentials() == null) { state.init(deviceCredentials); @@ -409,7 +411,7 @@ public class DefaultCoapClientContext implements CoapClientContext { try { boolean conRequest = AbstractSyncSessionCallback.isConRequest(state.getAttrs()); Response response = state.getAdaptor().convertToPublish(conRequest, msg); - attrs.getExchange().respond(response); + respond(attrs.getExchange(), response, state.getContentFormat()); } catch (AdaptorException e) { log.trace("Failed to reply due to error", e); cancelObserveRelation(attrs); @@ -440,10 +442,10 @@ public class DefaultCoapClientContext implements CoapClientContext { int requestId = getNextMsgId(); Response response = state.getAdaptor().convertToPublish(conRequest, msg); response.setMID(requestId); - attrs.getExchange().respond(response); if (conRequest) { response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> awake(state), id -> asleep(state))); } + respond(attrs.getExchange(), response, state.getContentFormat()); } catch (AdaptorException e) { log.trace("[{}] Failed to reply due to error", state.getDeviceId(), e); cancelObserveRelation(attrs); @@ -503,7 +505,7 @@ public class DefaultCoapClientContext implements CoapClientContext { if (conRequest) { response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> awake(state), id -> asleep(state))); } - state.getRpc().getExchange().respond(response); + respond(state.getRpc().getExchange(), response, state.getContentFormat()); sent = true; } catch (AdaptorException e) { log.trace("Failed to reply due to error", e); @@ -705,4 +707,11 @@ public class DefaultCoapClientContext implements CoapClientContext { state.setAdaptor(null); //TODO: add optimistic lock check that the client was already deleted and cleanup "clients" map. } + + private void respond(CoapExchange exchange, Response response, int defContentFormat) { + int contentFormat = exchange.getRequestOptions().getContentFormat(); + contentFormat = contentFormat != MediaTypeRegistry.UNDEFINED ? contentFormat : defContentFormat; + response.getOptions().setContentFormat(contentFormat); + exchange.respond(response); + } } diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/TbCoapClientState.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/TbCoapClientState.java index 7613dc2ece..f106f57961 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/TbCoapClientState.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/TbCoapClientState.java @@ -18,26 +18,21 @@ package org.thingsboard.server.transport.coap.client; import lombok.Data; import lombok.Getter; import lombok.Setter; -import org.eclipse.leshan.server.registration.Registration; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.data.device.data.CoapDeviceTransportConfiguration; import org.thingsboard.server.common.data.device.data.PowerMode; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; -import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.transport.coap.TransportConfigurationContainer; import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; -import java.util.UUID; import java.util.concurrent.Future; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -55,6 +50,8 @@ public class TbCoapClientState { private volatile DefaultCoapClientContext.CoapSessionListener listener; private volatile TbCoapObservationState attrs; private volatile TbCoapObservationState rpc; + private volatile int contentFormat; + private TransportProtos.AttributeUpdateNotificationMsg missedAttributeUpdates; private DeviceProfileId profileId; diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportCoapResource.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportCoapResource.java index bbeda58270..bde1725807 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportCoapResource.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportCoapResource.java @@ -137,17 +137,17 @@ public class LwM2mTransportCoapResource extends AbstractLwM2mTransportResource { ); UUID currentId = UUID.fromString(idStr); Response response = new Response(CoAP.ResponseCode.CONTENT); - byte[] fwData = this.getOtaData(currentId); - log.debug("Read softWare data (length): [{}]", fwData.length); - if (fwData != null && fwData.length > 0) { - response.setPayload(fwData); + byte[] otaData = this.getOtaData(currentId); + log.debug("Read ota data (length): [{}]", otaData.length); + if (otaData.length > 0) { + response.setPayload(otaData); if (exchange.getRequestOptions().getBlock2() != null) { int chunkSize = exchange.getRequestOptions().getBlock2().getSzx(); - boolean lastFlag = fwData.length <= chunkSize; + boolean lastFlag = otaData.length <= chunkSize; response.getOptions().setBlock2(chunkSize, lastFlag, 0); - log.trace("With block2 Send currentId: [{}], length: [{}], chunkSize [{}], moreFlag [{}]", currentId.toString(), fwData.length, chunkSize, lastFlag); + log.trace("With block2 Send currentId: [{}], length: [{}], chunkSize [{}], moreFlag [{}]", currentId.toString(), otaData.length, chunkSize, lastFlag); } else { - log.trace("With block1 Send currentId: [{}], length: [{}], ", currentId.toString(), fwData.length); + log.trace("With block1 Send currentId: [{}], length: [{}], ", currentId.toString(), otaData.length); } exchange.respond(response); }