added content format for the CoAP responses

This commit is contained in:
YevhenBondarenko 2021-07-22 09:52:42 +03:00 committed by Andrew Shvayka
parent 1636c54c2f
commit 5c8618c3cc
9 changed files with 43 additions and 17 deletions

View File

@ -49,4 +49,5 @@ public interface CoapTransportAdaptor {
ProvisionDeviceRequestMsg convertToProvisionRequestMsg(UUID sessionId, Request inbound) throws AdaptorException;
int getContentFormat();
}

View File

@ -23,6 +23,7 @@ import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.MediaTypeRegistry;
import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.coap.Response;
import org.springframework.stereotype.Component;
@ -164,4 +165,8 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
return payload;
}
@Override
public int getContentFormat() {
return MediaTypeRegistry.APPLICATION_JSON;
}
}

View File

@ -23,6 +23,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.MediaTypeRegistry;
import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.coap.Response;
import org.springframework.stereotype.Component;
@ -162,4 +163,8 @@ public class ProtoCoapAdaptor implements CoapTransportAdaptor {
return JsonFormat.printer().includingDefaultValueFields().print(dynamicMessage);
}
@Override
public int getContentFormat() {
return MediaTypeRegistry.APPLICATION_OCTET_STREAM;
}
}

View File

@ -17,7 +17,9 @@ package org.thingsboard.server.transport.coap.callback;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.MediaTypeRegistry;
import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.coap.Response;
import org.eclipse.californium.core.server.resources.CoapExchange;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.gen.transport.TransportProtos;
@ -71,4 +73,11 @@ public abstract class AbstractSyncSessionCallback implements SessionMsgListener
}
}
protected void respond(Response response) {
int contentFormat = exchange.getRequestOptions().getContentFormat();
contentFormat = contentFormat != MediaTypeRegistry.UNDEFINED ? contentFormat : state.getContentFormat();
response.getOptions().setContentFormat(contentFormat);
exchange.respond(response);
}
}

View File

@ -34,7 +34,7 @@ public class GetAttributesSyncSessionCallback extends AbstractSyncSessionCallbac
@Override
public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg msg) {
try {
exchange.respond(state.getAdaptor().convertToPublish(AbstractSyncSessionCallback.isConRequest(state.getAttrs()), msg));
respond(state.getAdaptor().convertToPublish(isConRequest(state.getAttrs()), msg));
} catch (AdaptorException e) {
log.trace("[{}] Failed to reply due to error", state.getDeviceId(), e);
exchange.respond(new Response(CoAP.ResponseCode.INTERNAL_SERVER_ERROR));

View File

@ -33,7 +33,7 @@ public class ToServerRpcSyncSessionCallback extends AbstractSyncSessionCallback
@Override
public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg toServerResponse) {
try {
exchange.respond(state.getAdaptor().convertToPublish(isConRequest(state.getRpc()), toServerResponse));
respond(state.getAdaptor().convertToPublish(isConRequest(state.getRpc()), toServerResponse));
} catch (AdaptorException e) {
log.trace("Failed to reply due to error", e);
exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR);

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.transport.coap.client;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.MediaTypeRegistry;
import org.eclipse.californium.core.coap.Response;
import org.eclipse.californium.core.observe.ObserveRelation;
import org.eclipse.californium.core.server.resources.CoapExchange;
@ -330,6 +331,7 @@ public class DefaultCoapClientContext implements CoapClientContext {
if (state.getConfiguration() == null || state.getAdaptor() == null) {
state.setConfiguration(getTransportConfigurationContainer(deviceProfile));
state.setAdaptor(getCoapTransportAdaptor(state.getConfiguration().isJsonPayload()));
state.setContentFormat(state.getAdaptor().getContentFormat());
}
if (state.getCredentials() == null) {
state.init(deviceCredentials);
@ -409,7 +411,7 @@ public class DefaultCoapClientContext implements CoapClientContext {
try {
boolean conRequest = AbstractSyncSessionCallback.isConRequest(state.getAttrs());
Response response = state.getAdaptor().convertToPublish(conRequest, msg);
attrs.getExchange().respond(response);
respond(attrs.getExchange(), response, state.getContentFormat());
} catch (AdaptorException e) {
log.trace("Failed to reply due to error", e);
cancelObserveRelation(attrs);
@ -440,10 +442,10 @@ public class DefaultCoapClientContext implements CoapClientContext {
int requestId = getNextMsgId();
Response response = state.getAdaptor().convertToPublish(conRequest, msg);
response.setMID(requestId);
attrs.getExchange().respond(response);
if (conRequest) {
response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> awake(state), id -> asleep(state)));
}
respond(attrs.getExchange(), response, state.getContentFormat());
} catch (AdaptorException e) {
log.trace("[{}] Failed to reply due to error", state.getDeviceId(), e);
cancelObserveRelation(attrs);
@ -503,7 +505,7 @@ public class DefaultCoapClientContext implements CoapClientContext {
if (conRequest) {
response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> awake(state), id -> asleep(state)));
}
state.getRpc().getExchange().respond(response);
respond(state.getRpc().getExchange(), response, state.getContentFormat());
sent = true;
} catch (AdaptorException e) {
log.trace("Failed to reply due to error", e);
@ -705,4 +707,11 @@ public class DefaultCoapClientContext implements CoapClientContext {
state.setAdaptor(null);
//TODO: add optimistic lock check that the client was already deleted and cleanup "clients" map.
}
private void respond(CoapExchange exchange, Response response, int defContentFormat) {
int contentFormat = exchange.getRequestOptions().getContentFormat();
contentFormat = contentFormat != MediaTypeRegistry.UNDEFINED ? contentFormat : defContentFormat;
response.getOptions().setContentFormat(contentFormat);
exchange.respond(response);
}
}

View File

@ -18,26 +18,21 @@ package org.thingsboard.server.transport.coap.client;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import org.eclipse.leshan.server.registration.Registration;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.device.data.CoapDeviceTransportConfiguration;
import org.thingsboard.server.common.data.device.data.PowerMode;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.coap.TransportConfigurationContainer;
import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -55,6 +50,8 @@ public class TbCoapClientState {
private volatile DefaultCoapClientContext.CoapSessionListener listener;
private volatile TbCoapObservationState attrs;
private volatile TbCoapObservationState rpc;
private volatile int contentFormat;
private TransportProtos.AttributeUpdateNotificationMsg missedAttributeUpdates;
private DeviceProfileId profileId;

View File

@ -137,17 +137,17 @@ public class LwM2mTransportCoapResource extends AbstractLwM2mTransportResource {
);
UUID currentId = UUID.fromString(idStr);
Response response = new Response(CoAP.ResponseCode.CONTENT);
byte[] fwData = this.getOtaData(currentId);
log.debug("Read softWare data (length): [{}]", fwData.length);
if (fwData != null && fwData.length > 0) {
response.setPayload(fwData);
byte[] otaData = this.getOtaData(currentId);
log.debug("Read ota data (length): [{}]", otaData.length);
if (otaData.length > 0) {
response.setPayload(otaData);
if (exchange.getRequestOptions().getBlock2() != null) {
int chunkSize = exchange.getRequestOptions().getBlock2().getSzx();
boolean lastFlag = fwData.length <= chunkSize;
boolean lastFlag = otaData.length <= chunkSize;
response.getOptions().setBlock2(chunkSize, lastFlag, 0);
log.trace("With block2 Send currentId: [{}], length: [{}], chunkSize [{}], moreFlag [{}]", currentId.toString(), fwData.length, chunkSize, lastFlag);
log.trace("With block2 Send currentId: [{}], length: [{}], chunkSize [{}], moreFlag [{}]", currentId.toString(), otaData.length, chunkSize, lastFlag);
} else {
log.trace("With block1 Send currentId: [{}], length: [{}], ", currentId.toString(), fwData.length);
log.trace("With block1 Send currentId: [{}], length: [{}], ", currentId.toString(), otaData.length);
}
exchange.respond(response);
}