Improve validation of Telemetry Rest Plugin calls.
This commit is contained in:
		
							parent
							
								
									3ca943ee86
								
							
						
					
					
						commit
						582ef1f63c
					
				@ -94,11 +94,7 @@ public class JsonConverter {
 | 
			
		||||
                } else if (value.isBoolean()) {
 | 
			
		||||
                    result.add(new BooleanDataEntry(valueEntry.getKey(), value.getAsBoolean()));
 | 
			
		||||
                } else if (value.isNumber()) {
 | 
			
		||||
                    if (value.getAsString().contains(".")) {
 | 
			
		||||
                        result.add(new DoubleDataEntry(valueEntry.getKey(), value.getAsDouble()));
 | 
			
		||||
                    } else {
 | 
			
		||||
                        result.add(new LongDataEntry(valueEntry.getKey(), value.getAsLong()));
 | 
			
		||||
                    }
 | 
			
		||||
                    parseNumericValue(result, valueEntry, value);
 | 
			
		||||
                } else {
 | 
			
		||||
                    throw new JsonSyntaxException(CAN_T_PARSE_VALUE + value);
 | 
			
		||||
                }
 | 
			
		||||
@ -109,6 +105,19 @@ public class JsonConverter {
 | 
			
		||||
        return result;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static void parseNumericValue(List<KvEntry> result, Entry<String, JsonElement> valueEntry, JsonPrimitive value) {
 | 
			
		||||
        if (value.getAsString().contains(".")) {
 | 
			
		||||
            result.add(new DoubleDataEntry(valueEntry.getKey(), value.getAsDouble()));
 | 
			
		||||
        } else {
 | 
			
		||||
            try {
 | 
			
		||||
                long longValue = Long.parseLong(value.getAsString());
 | 
			
		||||
                result.add(new LongDataEntry(valueEntry.getKey(), longValue));
 | 
			
		||||
            } catch (NumberFormatException e) {
 | 
			
		||||
                throw new JsonSyntaxException("Big integer values are not supported!");
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static UpdateAttributesRequest convertToAttributes(JsonElement element) {
 | 
			
		||||
        return convertToAttributes(element, BasicRequest.DEFAULT_REQUEST_ID);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,31 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2017 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.extensions.api.exception;
 | 
			
		||||
 | 
			
		||||
import org.springframework.http.HttpStatus;
 | 
			
		||||
import org.springframework.http.ResponseEntity;
 | 
			
		||||
 | 
			
		||||
public class InvalidParametersException extends Exception implements ToErrorResponseEntity {
 | 
			
		||||
 | 
			
		||||
    public InvalidParametersException(String message) {
 | 
			
		||||
        super(message);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ResponseEntity<String> toErrorResponseEntity() {
 | 
			
		||||
        return new ResponseEntity<>(getMessage(), HttpStatus.BAD_REQUEST);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -17,7 +17,9 @@ package org.thingsboard.server.extensions.api.exception;
 | 
			
		||||
 | 
			
		||||
import org.springframework.http.ResponseEntity;
 | 
			
		||||
 | 
			
		||||
public interface ToErrorResponseEntity {
 | 
			
		||||
import java.io.Serializable;
 | 
			
		||||
 | 
			
		||||
public interface ToErrorResponseEntity extends Serializable {
 | 
			
		||||
 | 
			
		||||
    ResponseEntity<String> toErrorResponseEntity();
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,36 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2017 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.extensions.api.exception;
 | 
			
		||||
 | 
			
		||||
import org.springframework.http.ResponseEntity;
 | 
			
		||||
 | 
			
		||||
import java.util.Objects;
 | 
			
		||||
 | 
			
		||||
public class UncheckedApiException extends RuntimeException implements ToErrorResponseEntity {
 | 
			
		||||
 | 
			
		||||
    private final ToErrorResponseEntity cause;
 | 
			
		||||
 | 
			
		||||
    public <T extends Exception & ToErrorResponseEntity> UncheckedApiException(T cause) {
 | 
			
		||||
        super(cause.getMessage(), Objects.requireNonNull(cause));
 | 
			
		||||
        this.cause = cause;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ResponseEntity<String> toErrorResponseEntity() {
 | 
			
		||||
        return cause.toErrorResponseEntity();
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -17,6 +17,7 @@ package org.thingsboard.server.extensions.core.plugin.telemetry.handlers;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import com.google.gson.JsonParser;
 | 
			
		||||
import com.google.gson.JsonSyntaxException;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.http.HttpStatus;
 | 
			
		||||
import org.springframework.http.ResponseEntity;
 | 
			
		||||
@ -29,6 +30,9 @@ import org.thingsboard.server.common.data.id.EntityIdFactory;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.*;
 | 
			
		||||
import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
 | 
			
		||||
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
 | 
			
		||||
import org.thingsboard.server.extensions.api.exception.InvalidParametersException;
 | 
			
		||||
import org.thingsboard.server.extensions.api.exception.ToErrorResponseEntity;
 | 
			
		||||
import org.thingsboard.server.extensions.api.exception.UncheckedApiException;
 | 
			
		||||
import org.thingsboard.server.extensions.api.plugins.PluginCallback;
 | 
			
		||||
import org.thingsboard.server.extensions.api.plugins.PluginContext;
 | 
			
		||||
import org.thingsboard.server.extensions.api.plugins.handlers.DefaultRestMsgHandler;
 | 
			
		||||
@ -85,13 +89,13 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
 | 
			
		||||
            ctx.loadLatestTimeseries(entityId, new PluginCallback<List<TsKvEntry>>() {
 | 
			
		||||
                @Override
 | 
			
		||||
                public void onSuccess(PluginContext ctx, List<TsKvEntry> value) {
 | 
			
		||||
                    List<String> keys = value.stream().map(tsKv -> tsKv.getKey()).collect(Collectors.toList());
 | 
			
		||||
                    List<String> keys = value.stream().map(KvEntry::getKey).collect(Collectors.toList());
 | 
			
		||||
                    msg.getResponseHolder().setResult(new ResponseEntity<>(keys, HttpStatus.OK));
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                @Override
 | 
			
		||||
                public void onFailure(PluginContext ctx, Exception e) {
 | 
			
		||||
                    msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
 | 
			
		||||
                    handleError(e, msg, HttpStatus.INTERNAL_SERVER_ERROR);
 | 
			
		||||
                }
 | 
			
		||||
            });
 | 
			
		||||
        } else if (feature == TelemetryFeature.ATTRIBUTES) {
 | 
			
		||||
@ -163,6 +167,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
 | 
			
		||||
    @Override
 | 
			
		||||
    public void handleHttpPostRequest(PluginContext ctx, PluginRestMsg msg) throws ServletException {
 | 
			
		||||
        RestRequest request = msg.getRequest();
 | 
			
		||||
        Exception error = null;
 | 
			
		||||
        try {
 | 
			
		||||
            String[] pathParams = request.getPathParams();
 | 
			
		||||
            EntityId entityId;
 | 
			
		||||
@ -200,8 +205,9 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
 | 
			
		||||
            }
 | 
			
		||||
        } catch (IOException | RuntimeException e) {
 | 
			
		||||
            log.debug("Failed to process POST request due to exception", e);
 | 
			
		||||
            error = e;
 | 
			
		||||
        }
 | 
			
		||||
        msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
 | 
			
		||||
        handleError(error, msg, HttpStatus.BAD_REQUEST);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private boolean handleHttpPostAttributes(PluginContext ctx, PluginRestMsg msg, RestRequest request,
 | 
			
		||||
@ -210,23 +216,9 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
 | 
			
		||||
                DataConstants.SHARED_SCOPE.equals(scope)) {
 | 
			
		||||
            JsonNode jsonNode = jsonMapper.readTree(request.getRequestBody());
 | 
			
		||||
            if (jsonNode.isObject()) {
 | 
			
		||||
                long ts = System.currentTimeMillis();
 | 
			
		||||
                List<AttributeKvEntry> attributes = new ArrayList<>();
 | 
			
		||||
                jsonNode.fields().forEachRemaining(entry -> {
 | 
			
		||||
                    String key = entry.getKey();
 | 
			
		||||
                    JsonNode value = entry.getValue();
 | 
			
		||||
                    if (entry.getValue().isTextual()) {
 | 
			
		||||
                        attributes.add(new BaseAttributeKvEntry(new StringDataEntry(key, value.textValue()), ts));
 | 
			
		||||
                    } else if (entry.getValue().isBoolean()) {
 | 
			
		||||
                        attributes.add(new BaseAttributeKvEntry(new BooleanDataEntry(key, value.booleanValue()), ts));
 | 
			
		||||
                    } else if (entry.getValue().isDouble()) {
 | 
			
		||||
                        attributes.add(new BaseAttributeKvEntry(new DoubleDataEntry(key, value.doubleValue()), ts));
 | 
			
		||||
                    } else if (entry.getValue().isNumber()) {
 | 
			
		||||
                        attributes.add(new BaseAttributeKvEntry(new LongDataEntry(key, value.longValue()), ts));
 | 
			
		||||
                    }
 | 
			
		||||
                });
 | 
			
		||||
                if (attributes.size() > 0) {
 | 
			
		||||
                    ctx.saveAttributes(ctx.getSecurityCtx().orElseThrow(() -> new IllegalArgumentException()).getTenantId(), entityId, scope, attributes, new PluginCallback<Void>() {
 | 
			
		||||
                List<AttributeKvEntry> attributes = extractRequestAttributes(jsonNode);
 | 
			
		||||
                if (!attributes.isEmpty()) {
 | 
			
		||||
                    ctx.saveAttributes(ctx.getSecurityCtx().orElseThrow(IllegalArgumentException::new).getTenantId(), entityId, scope, attributes, new PluginCallback<Void>() {
 | 
			
		||||
                        @Override
 | 
			
		||||
                        public void onSuccess(PluginContext ctx, Void value) {
 | 
			
		||||
                            msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK));
 | 
			
		||||
@ -236,7 +228,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
 | 
			
		||||
                        @Override
 | 
			
		||||
                        public void onFailure(PluginContext ctx, Exception e) {
 | 
			
		||||
                            log.error("Failed to save attributes", e);
 | 
			
		||||
                            msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
 | 
			
		||||
                            handleError(e, msg, HttpStatus.BAD_REQUEST);
 | 
			
		||||
                        }
 | 
			
		||||
                    });
 | 
			
		||||
                    return true;
 | 
			
		||||
@ -246,8 +238,36 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
 | 
			
		||||
        return false;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private List<AttributeKvEntry> extractRequestAttributes(JsonNode jsonNode) {
 | 
			
		||||
        long ts = System.currentTimeMillis();
 | 
			
		||||
        List<AttributeKvEntry> attributes = new ArrayList<>();
 | 
			
		||||
        jsonNode.fields().forEachRemaining(entry -> {
 | 
			
		||||
            String key = entry.getKey();
 | 
			
		||||
            JsonNode value = entry.getValue();
 | 
			
		||||
            if (entry.getValue().isTextual()) {
 | 
			
		||||
                attributes.add(new BaseAttributeKvEntry(new StringDataEntry(key, value.textValue()), ts));
 | 
			
		||||
            } else if (entry.getValue().isBoolean()) {
 | 
			
		||||
                attributes.add(new BaseAttributeKvEntry(new BooleanDataEntry(key, value.booleanValue()), ts));
 | 
			
		||||
            } else if (entry.getValue().isDouble()) {
 | 
			
		||||
                attributes.add(new BaseAttributeKvEntry(new DoubleDataEntry(key, value.doubleValue()), ts));
 | 
			
		||||
            } else if (entry.getValue().isNumber()) {
 | 
			
		||||
                if (entry.getValue().isBigInteger()) {
 | 
			
		||||
                    throw new UncheckedApiException(new InvalidParametersException("Big integer values are not supported!"));
 | 
			
		||||
                } else {
 | 
			
		||||
                    attributes.add(new BaseAttributeKvEntry(new LongDataEntry(key, value.longValue()), ts));
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
        return  attributes;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void handleHttpPostTimeseries(PluginContext ctx, PluginRestMsg msg, RestRequest request, EntityId entityId, long ttl) {
 | 
			
		||||
        TelemetryUploadRequest telemetryRequest = JsonConverter.convertToTelemetry(new JsonParser().parse(request.getRequestBody()));
 | 
			
		||||
        TelemetryUploadRequest telemetryRequest;
 | 
			
		||||
        try {
 | 
			
		||||
            telemetryRequest = JsonConverter.convertToTelemetry(new JsonParser().parse(request.getRequestBody()));
 | 
			
		||||
        } catch (JsonSyntaxException e) {
 | 
			
		||||
            throw new UncheckedApiException(new InvalidParametersException(e.getMessage()));
 | 
			
		||||
        }
 | 
			
		||||
        List<TsKvEntry> entries = new ArrayList<>();
 | 
			
		||||
        for (Map.Entry<Long, List<KvEntry>> entry : telemetryRequest.getData().entrySet()) {
 | 
			
		||||
            for (KvEntry kv : entry.getValue()) {
 | 
			
		||||
@ -264,7 +284,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onFailure(PluginContext ctx, Exception e) {
 | 
			
		||||
                log.error("Failed to save attributes", e);
 | 
			
		||||
                msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
 | 
			
		||||
                handleError(e, msg, HttpStatus.INTERNAL_SERVER_ERROR);
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
@ -272,6 +292,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
 | 
			
		||||
    @Override
 | 
			
		||||
    public void handleHttpDeleteRequest(PluginContext ctx, PluginRestMsg msg) throws ServletException {
 | 
			
		||||
        RestRequest request = msg.getRequest();
 | 
			
		||||
        Exception error = null;
 | 
			
		||||
        try {
 | 
			
		||||
            String[] pathParams = request.getPathParams();
 | 
			
		||||
            EntityId entityId;
 | 
			
		||||
@ -293,7 +314,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
 | 
			
		||||
                String keysParam = request.getParameter("keys");
 | 
			
		||||
                if (!StringUtils.isEmpty(keysParam)) {
 | 
			
		||||
                    String[] keys = keysParam.split(",");
 | 
			
		||||
                    ctx.removeAttributes(ctx.getSecurityCtx().orElseThrow(() -> new IllegalArgumentException()).getTenantId(), entityId, scope, Arrays.asList(keys), new PluginCallback<Void>() {
 | 
			
		||||
                    ctx.removeAttributes(ctx.getSecurityCtx().orElseThrow(IllegalArgumentException::new).getTenantId(), entityId, scope, Arrays.asList(keys), new PluginCallback<Void>() {
 | 
			
		||||
                        @Override
 | 
			
		||||
                        public void onSuccess(PluginContext ctx, Void value) {
 | 
			
		||||
                            msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK));
 | 
			
		||||
@ -302,7 +323,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
 | 
			
		||||
                        @Override
 | 
			
		||||
                        public void onFailure(PluginContext ctx, Exception e) {
 | 
			
		||||
                            log.error("Failed to remove attributes", e);
 | 
			
		||||
                            msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
 | 
			
		||||
                            handleError(e, msg, HttpStatus.INTERNAL_SERVER_ERROR);
 | 
			
		||||
                        }
 | 
			
		||||
                    });
 | 
			
		||||
                    return;
 | 
			
		||||
@ -310,8 +331,9 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
 | 
			
		||||
            }
 | 
			
		||||
        } catch (RuntimeException e) {
 | 
			
		||||
            log.debug("Failed to process DELETE request due to Runtime exception", e);
 | 
			
		||||
            error = e;
 | 
			
		||||
        }
 | 
			
		||||
        msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
 | 
			
		||||
        handleError(error, msg, HttpStatus.BAD_REQUEST);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -319,14 +341,14 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
 | 
			
		||||
        return new PluginCallback<List<AttributeKvEntry>>() {
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onSuccess(PluginContext ctx, List<AttributeKvEntry> attributes) {
 | 
			
		||||
                List<String> keys = attributes.stream().map(attrKv -> attrKv.getKey()).collect(Collectors.toList());
 | 
			
		||||
                List<String> keys = attributes.stream().map(KvEntry::getKey).collect(Collectors.toList());
 | 
			
		||||
                msg.getResponseHolder().setResult(new ResponseEntity<>(keys, HttpStatus.OK));
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onFailure(PluginContext ctx, Exception e) {
 | 
			
		||||
                log.error("Failed to fetch attributes", e);
 | 
			
		||||
                msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
 | 
			
		||||
                handleError(e, msg, HttpStatus.INTERNAL_SERVER_ERROR);
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
    }
 | 
			
		||||
@ -343,7 +365,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onFailure(PluginContext ctx, Exception e) {
 | 
			
		||||
                log.error("Failed to fetch attributes", e);
 | 
			
		||||
                msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
 | 
			
		||||
                handleError(e, msg, HttpStatus.INTERNAL_SERVER_ERROR);
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
    }
 | 
			
		||||
@ -368,8 +390,19 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onFailure(PluginContext ctx, Exception e) {
 | 
			
		||||
                log.error("Failed to fetch historical data", e);
 | 
			
		||||
                msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
 | 
			
		||||
                handleError(e, msg, HttpStatus.INTERNAL_SERVER_ERROR);
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void handleError(Exception e, PluginRestMsg msg, HttpStatus defaultErrorStatus) {
 | 
			
		||||
        ResponseEntity responseEntity;
 | 
			
		||||
        if (e != null && e instanceof ToErrorResponseEntity) {
 | 
			
		||||
            responseEntity = ((ToErrorResponseEntity)e).toErrorResponseEntity();
 | 
			
		||||
        } else {
 | 
			
		||||
            responseEntity = new ResponseEntity<>(defaultErrorStatus);
 | 
			
		||||
        }
 | 
			
		||||
        msg.getResponseHolder().setResult(responseEntity);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user