created OtaPackageTransportResource

This commit is contained in:
YevhenBondarenko 2021-06-15 12:34:25 +03:00 committed by Andrew Shvayka
parent 5a0336aae2
commit 8b285c8e77
5 changed files with 149 additions and 77 deletions

View File

@ -16,5 +16,5 @@
package org.thingsboard.server.common.msg.session;
public enum FeatureType {
ATTRIBUTES, TELEMETRY, RPC, CLAIM, PROVISION, FIRMWARE, SOFTWARE
ATTRIBUTES, TELEMETRY, RPC, CLAIM, PROVISION
}

View File

@ -30,10 +30,7 @@ public enum SessionMsgType {
SESSION_OPEN, SESSION_CLOSE,
CLAIM_REQUEST(),
GET_FIRMWARE_REQUEST,
GET_SOFTWARE_REQUEST;
CLAIM_REQUEST();
private final boolean requiresRulesProcessing;

View File

@ -44,7 +44,6 @@ import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportC
import org.thingsboard.server.common.data.device.profile.JsonTransportPayloadConfiguration;
import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration;
import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
import org.thingsboard.server.common.data.ota.OtaPackageType;
import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
import org.thingsboard.server.common.msg.session.FeatureType;
import org.thingsboard.server.common.msg.session.SessionMsgType;
@ -139,10 +138,6 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
processExchangeGetRequest(exchange, featureType.get());
} else if (featureType.get() == FeatureType.ATTRIBUTES) {
processRequest(exchange, SessionMsgType.GET_ATTRIBUTES_REQUEST);
} else if (featureType.get() == FeatureType.FIRMWARE) {
processRequest(exchange, SessionMsgType.GET_FIRMWARE_REQUEST);
} else if (featureType.get() == FeatureType.SOFTWARE) {
processRequest(exchange, SessionMsgType.GET_SOFTWARE_REQUEST);
} else {
log.trace("Invalid feature type parameter");
exchange.respond(CoAP.ResponseCode.BAD_REQUEST);
@ -349,12 +344,6 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
coapTransportAdaptor.convertToGetAttributes(sessionId, request),
new CoapNoOpCallback(exchange));
break;
case GET_FIRMWARE_REQUEST:
getOtaPackageCallback(sessionInfo, exchange, OtaPackageType.FIRMWARE);
break;
case GET_SOFTWARE_REQUEST:
getOtaPackageCallback(sessionInfo, exchange, OtaPackageType.SOFTWARE);
break;
}
} catch (AdaptorException e) {
log.trace("[{}] Failed to decode message: ", sessionId, e);
@ -366,16 +355,6 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
return new UUID(sessionInfoProto.getSessionIdMSB(), sessionInfoProto.getSessionIdLSB());
}
private void getOtaPackageCallback(TransportProtos.SessionInfoProto sessionInfo, CoapExchange exchange, OtaPackageType firmwareType) {
TransportProtos.GetOtaPackageRequestMsg requestMsg = TransportProtos.GetOtaPackageRequestMsg.newBuilder()
.setTenantIdMSB(sessionInfo.getTenantIdMSB())
.setTenantIdLSB(sessionInfo.getTenantIdLSB())
.setDeviceIdMSB(sessionInfo.getDeviceIdMSB())
.setDeviceIdLSB(sessionInfo.getDeviceIdLSB())
.setType(firmwareType.name()).build();
transportContext.getTransportService().process(sessionInfo, requestMsg, new OtaPackageCallback(exchange));
}
private TransportProtos.SessionInfoProto lookupAsyncSessionInfo(String token) {
tokenToObserveNotificationSeqMap.remove(token);
return tokenToSessionInfoMap.remove(token);
@ -470,57 +449,6 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
}
}
private class OtaPackageCallback implements TransportServiceCallback<TransportProtos.GetOtaPackageResponseMsg> {
private final CoapExchange exchange;
OtaPackageCallback(CoapExchange exchange) {
this.exchange = exchange;
}
@Override
public void onSuccess(TransportProtos.GetOtaPackageResponseMsg msg) {
String title = exchange.getQueryParameter("title");
String version = exchange.getQueryParameter("version");
if (msg.getResponseStatus().equals(TransportProtos.ResponseStatus.SUCCESS)) {
String firmwareId = new UUID(msg.getOtaPackageIdMSB(), msg.getOtaPackageIdLSB()).toString();
if (msg.getTitle().equals(title) && msg.getVersion().equals(version)) {
String strChunkSize = exchange.getQueryParameter("size");
String strChunk = exchange.getQueryParameter("chunk");
int chunkSize = StringUtils.isEmpty(strChunkSize) ? 0 : Integer.parseInt(strChunkSize);
int chunk = StringUtils.isEmpty(strChunk) ? 0 : Integer.parseInt(strChunk);
exchange.respond(CoAP.ResponseCode.CONTENT, transportContext.getOtaPackageDataCache().get(firmwareId, chunkSize, chunk));
}
else if (firmwareId != null) {
sendOtaData(exchange, firmwareId);
} else {
exchange.respond(CoAP.ResponseCode.BAD_REQUEST);
}
} else {
exchange.respond(CoAP.ResponseCode.NOT_FOUND);
}
}
@Override
public void onError(Throwable e) {
log.warn("Failed to process request", e);
exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR);
}
}
private void sendOtaData(CoapExchange exchange, String firmwareId) {
Response response = new Response(CoAP.ResponseCode.CONTENT);
byte[] fwData = transportContext.getOtaPackageDataCache().get(firmwareId);
if (fwData != null && fwData.length > 0) {
response.setPayload(fwData);
if (exchange.getRequestOptions().getBlock2() != null) {
int chunkSize = exchange.getRequestOptions().getBlock2().getSzx();
boolean moreFlag = fwData.length > chunkSize;
response.getOptions().setBlock2(chunkSize, moreFlag, 0);
}
exchange.respond(response);
}
}
private static class CoapSessionListener implements SessionMsgListener {
private final CoapTransportResource coapTransportResource;

View File

@ -23,6 +23,7 @@ import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.TbTransportService;
import org.thingsboard.server.coapserver.CoapServerService;
import org.thingsboard.server.coapserver.TbCoapServerComponent;
import org.thingsboard.server.common.data.ota.OtaPackageType;
import org.thingsboard.server.transport.coap.efento.CoapEfentoTransportResource;
import javax.annotation.PostConstruct;
@ -59,6 +60,8 @@ public class CoapTransportService implements TbTransportService {
efento.add(efentoMeasurementsTransportResource);
coapServer.add(api);
coapServer.add(efento);
coapServer.add(new OtaPackageTransportResource(coapTransportContext, OtaPackageType.FIRMWARE));
coapServer.add(new OtaPackageTransportResource(coapTransportContext, OtaPackageType.SOFTWARE));
log.info("CoAP transport started!");
}

View File

@ -0,0 +1,144 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.coap;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.coap.Response;
import org.eclipse.californium.core.network.Exchange;
import org.eclipse.californium.core.server.resources.CoapExchange;
import org.eclipse.californium.core.server.resources.Resource;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.ota.OtaPackageType;
import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@Slf4j
public class OtaPackageTransportResource extends AbstractCoapTransportResource {
private static final int ACCESS_TOKEN_POSITION = 2;
private final OtaPackageType otaPackageType;
public OtaPackageTransportResource(CoapTransportContext ctx, OtaPackageType otaPackageType) {
super(ctx, otaPackageType.getKeyPrefix());
this.otaPackageType = otaPackageType;
}
@Override
protected void processHandleGet(CoapExchange exchange) {
log.trace("Processing {}", exchange.advanced().getRequest());
exchange.accept();
Exchange advanced = exchange.advanced();
Request request = advanced.getRequest();
processAccessTokenRequest(exchange, request);
}
@Override
protected void processHandlePost(CoapExchange exchange) {
exchange.respond(CoAP.ResponseCode.METHOD_NOT_ALLOWED);
}
private void processAccessTokenRequest(CoapExchange exchange, Request request) {
Optional<DeviceTokenCredentials> credentials = decodeCredentials(request);
if (credentials.isEmpty()) {
exchange.respond(CoAP.ResponseCode.UNAUTHORIZED);
return;
}
transportService.process(DeviceTransportType.COAP, TransportProtos.ValidateDeviceTokenRequestMsg.newBuilder().setToken(credentials.get().getCredentialsId()).build(),
new CoapDeviceAuthCallback(transportContext, exchange, (sessionInfo, deviceProfile) -> {
getOtaPackageCallback(sessionInfo, exchange, otaPackageType);
}));
}
private void getOtaPackageCallback(TransportProtos.SessionInfoProto sessionInfo, CoapExchange exchange, OtaPackageType firmwareType) {
TransportProtos.GetOtaPackageRequestMsg requestMsg = TransportProtos.GetOtaPackageRequestMsg.newBuilder()
.setTenantIdMSB(sessionInfo.getTenantIdMSB())
.setTenantIdLSB(sessionInfo.getTenantIdLSB())
.setDeviceIdMSB(sessionInfo.getDeviceIdMSB())
.setDeviceIdLSB(sessionInfo.getDeviceIdLSB())
.setType(firmwareType.name()).build();
transportContext.getTransportService().process(sessionInfo, requestMsg, new OtaPackageCallback(exchange));
}
private Optional<DeviceTokenCredentials> decodeCredentials(Request request) {
List<String> uriPath = request.getOptions().getUriPath();
if (uriPath.size() == ACCESS_TOKEN_POSITION) {
return Optional.of(new DeviceTokenCredentials(uriPath.get(ACCESS_TOKEN_POSITION - 1)));
} else {
return Optional.empty();
}
}
@Override
public Resource getChild(String name) {
return this;
}
private class OtaPackageCallback implements TransportServiceCallback<TransportProtos.GetOtaPackageResponseMsg> {
private final CoapExchange exchange;
OtaPackageCallback(CoapExchange exchange) {
this.exchange = exchange;
}
@Override
public void onSuccess(TransportProtos.GetOtaPackageResponseMsg msg) {
String title = exchange.getQueryParameter("title");
String version = exchange.getQueryParameter("version");
if (msg.getResponseStatus().equals(TransportProtos.ResponseStatus.SUCCESS)) {
String firmwareId = new UUID(msg.getOtaPackageIdMSB(), msg.getOtaPackageIdLSB()).toString();
if ((title == null || msg.getTitle().equals(title)) && (version == null || msg.getVersion().equals(version))) {
String strChunkSize = exchange.getQueryParameter("size");
String strChunk = exchange.getQueryParameter("chunk");
int chunkSize = StringUtils.isEmpty(strChunkSize) ? 0 : Integer.parseInt(strChunkSize);
int chunk = StringUtils.isEmpty(strChunk) ? 0 : Integer.parseInt(strChunk);
respondOtaPackage(exchange, transportContext.getOtaPackageDataCache().get(firmwareId, chunkSize, chunk));
} else {
exchange.respond(CoAP.ResponseCode.BAD_REQUEST);
}
} else {
exchange.respond(CoAP.ResponseCode.NOT_FOUND);
}
}
@Override
public void onError(Throwable e) {
log.warn("Failed to process request", e);
exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR);
}
}
private void respondOtaPackage(CoapExchange exchange, byte[] data) {
Response response = new Response(CoAP.ResponseCode.CONTENT);
if (data != null && data.length > 0) {
response.setPayload(data);
if (exchange.getRequestOptions().getBlock2() != null) {
int chunkSize = exchange.getRequestOptions().getBlock2().getSzx();
boolean moreFlag = data.length > chunkSize;
response.getOptions().setBlock2(chunkSize, moreFlag, 0);
}
exchange.respond(response);
}
}
}