added json timeseries type for bulk import
This commit is contained in:
parent
320e0b674e
commit
7be97f379c
@ -17,6 +17,7 @@ package org.thingsboard.server.service.sync.ie.importing.csv;
|
|||||||
|
|
||||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||||
import com.google.common.util.concurrent.FutureCallback;
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
|
import com.google.gson.JsonElement;
|
||||||
import com.google.gson.JsonObject;
|
import com.google.gson.JsonObject;
|
||||||
import com.google.gson.JsonPrimitive;
|
import com.google.gson.JsonPrimitive;
|
||||||
import jakarta.annotation.Nullable;
|
import jakarta.annotation.Nullable;
|
||||||
@ -183,7 +184,13 @@ public abstract class AbstractBulkImportService<E extends HasId<? extends Entity
|
|||||||
data.entrySet().stream()
|
data.entrySet().stream()
|
||||||
.filter(dataEntry -> dataEntry.getKey().getType() == kvType &&
|
.filter(dataEntry -> dataEntry.getKey().getType() == kvType &&
|
||||||
StringUtils.isNotEmpty(dataEntry.getKey().getKey()))
|
StringUtils.isNotEmpty(dataEntry.getKey().getKey()))
|
||||||
.forEach(dataEntry -> kvs.add(dataEntry.getKey().getKey(), dataEntry.getValue().toJsonPrimitive()));
|
.forEach(dataEntry -> {
|
||||||
|
ParsedValue value = dataEntry.getValue();
|
||||||
|
JsonElement kvValue = (value.getDataType() == DataType.JSON)
|
||||||
|
? (JsonElement) value.getValue()
|
||||||
|
: value.toJsonPrimitive();
|
||||||
|
kvs.add(dataEntry.getKey().getKey(), kvValue);
|
||||||
|
});
|
||||||
return Map.entry(kvType, kvs);
|
return Map.entry(kvType, kvs);
|
||||||
})
|
})
|
||||||
.filter(kvsEntry -> kvsEntry.getValue().entrySet().size() > 0)
|
.filter(kvsEntry -> kvsEntry.getValue().entrySet().size() > 0)
|
||||||
|
|||||||
@ -17,10 +17,15 @@ package org.thingsboard.server.utils;
|
|||||||
|
|
||||||
import lombok.AccessLevel;
|
import lombok.AccessLevel;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
|
import lombok.SneakyThrows;
|
||||||
import org.apache.commons.csv.CSVFormat;
|
import org.apache.commons.csv.CSVFormat;
|
||||||
|
import org.apache.commons.csv.CSVPrinter;
|
||||||
import org.apache.commons.csv.CSVRecord;
|
import org.apache.commons.csv.CSVRecord;
|
||||||
import org.apache.commons.io.input.CharSequenceReader;
|
import org.apache.commons.io.input.CharSequenceReader;
|
||||||
|
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.OutputStreamWriter;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
@ -43,4 +48,18 @@ public class CsvUtils {
|
|||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SneakyThrows
|
||||||
|
public static byte[] generateCsv(List<List<String>> rows) {
|
||||||
|
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||||
|
try (OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8);
|
||||||
|
CSVPrinter csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT)) {
|
||||||
|
|
||||||
|
for (List<String> row : rows) {
|
||||||
|
csvPrinter.printRecord(row);
|
||||||
|
}
|
||||||
|
csvPrinter.flush();
|
||||||
|
}
|
||||||
|
return out.toByteArray();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -77,8 +77,13 @@ import org.thingsboard.server.dao.service.DaoSqlTest;
|
|||||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||||
import org.thingsboard.server.service.gateway_device.GatewayNotificationsService;
|
import org.thingsboard.server.service.gateway_device.GatewayNotificationsService;
|
||||||
import org.thingsboard.server.service.state.DeviceStateService;
|
import org.thingsboard.server.service.state.DeviceStateService;
|
||||||
|
import org.thingsboard.server.utils.CsvUtils;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
@ -1586,6 +1591,61 @@ public class DeviceControllerTest extends AbstractControllerTest {
|
|||||||
Assert.assertEquals(newAttributeValue, actualAttribute.get("value"));
|
Assert.assertEquals(newAttributeValue, actualAttribute.get("value"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBulkImportDeviceWithJsonAttr() throws Exception {
|
||||||
|
String deviceName = "some_device";
|
||||||
|
String deviceType = "some_type";
|
||||||
|
JsonNode deviceAttr = JacksonUtil.toJsonNode("{\"threshold\": 45}");
|
||||||
|
|
||||||
|
List<List<String>> content = new LinkedList<>();
|
||||||
|
content.add(Arrays.asList("NAME", "TYPE", "ATTR"));
|
||||||
|
content.add(Arrays.asList(deviceName, deviceType, deviceAttr.toString()));
|
||||||
|
|
||||||
|
byte[] bytes = CsvUtils.generateCsv(content);
|
||||||
|
BulkImportRequest request = new BulkImportRequest();
|
||||||
|
request.setFile(new String(bytes, StandardCharsets.UTF_8));
|
||||||
|
BulkImportRequest.Mapping mapping = new BulkImportRequest.Mapping();
|
||||||
|
BulkImportRequest.ColumnMapping name = new BulkImportRequest.ColumnMapping();
|
||||||
|
name.setType(BulkImportColumnType.NAME);
|
||||||
|
BulkImportRequest.ColumnMapping type = new BulkImportRequest.ColumnMapping();
|
||||||
|
type.setType(BulkImportColumnType.TYPE);
|
||||||
|
BulkImportRequest.ColumnMapping attr = new BulkImportRequest.ColumnMapping();
|
||||||
|
attr.setType(BulkImportColumnType.SERVER_ATTRIBUTE);
|
||||||
|
attr.setKey("attr");
|
||||||
|
List<BulkImportRequest.ColumnMapping> columns = new ArrayList<>();
|
||||||
|
columns.add(name);
|
||||||
|
columns.add(type);
|
||||||
|
columns.add(attr);
|
||||||
|
|
||||||
|
mapping.setColumns(columns);
|
||||||
|
mapping.setDelimiter(',');
|
||||||
|
mapping.setUpdate(true);
|
||||||
|
mapping.setHeader(true);
|
||||||
|
request.setMapping(mapping);
|
||||||
|
|
||||||
|
BulkImportResult<Device> deviceBulkImportResult = doPostWithTypedResponse("/api/device/bulk_import", request, new TypeReference<>() {});
|
||||||
|
|
||||||
|
Assert.assertEquals(1, deviceBulkImportResult.getCreated().get());
|
||||||
|
Assert.assertEquals(0, deviceBulkImportResult.getErrors().get());
|
||||||
|
Assert.assertEquals(0, deviceBulkImportResult.getUpdated().get());
|
||||||
|
Assert.assertTrue(deviceBulkImportResult.getErrorsList().isEmpty());
|
||||||
|
|
||||||
|
Device savedDevice = doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class);
|
||||||
|
|
||||||
|
Assert.assertNotNull(savedDevice);
|
||||||
|
Assert.assertEquals(deviceName, savedDevice.getName());
|
||||||
|
Assert.assertEquals(deviceType, savedDevice.getType());
|
||||||
|
|
||||||
|
//check server attribute value
|
||||||
|
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
|
||||||
|
Map<String, Object> actualAttribute = doGetAsyncTyped("/api/plugins/telemetry/DEVICE/" + savedDevice.getId() +
|
||||||
|
"/values/attributes/SERVER_SCOPE", new TypeReference<List<Map<String, Object>>>() {}).stream()
|
||||||
|
.filter(att -> att.get("key").equals("attr")).findFirst().get();
|
||||||
|
LinkedHashMap<String, Object> expected = JacksonUtil.convertValue(deviceAttr, new TypeReference<>() {});
|
||||||
|
Assert.assertEquals(expected, actualAttribute.get("value"));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSaveDeviceWithOutdatedVersion() throws Exception {
|
public void testSaveDeviceWithOutdatedVersion() throws Exception {
|
||||||
Device device = createDevice("Device v1.0");
|
Device device = createDevice("Device v1.0");
|
||||||
|
|||||||
@ -15,6 +15,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.common.data.util;
|
package org.thingsboard.server.common.data.util;
|
||||||
|
|
||||||
|
import com.google.gson.JsonParser;
|
||||||
import org.apache.commons.lang3.math.NumberUtils;
|
import org.apache.commons.lang3.math.NumberUtils;
|
||||||
import org.apache.commons.lang3.tuple.Pair;
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
import org.thingsboard.server.common.data.kv.DataType;
|
import org.thingsboard.server.common.data.kv.DataType;
|
||||||
@ -40,6 +41,11 @@ public class TypeCastUtil {
|
|||||||
} catch (RuntimeException ignored) {}
|
} catch (RuntimeException ignored) {}
|
||||||
} else if (value.equalsIgnoreCase("true") || value.equalsIgnoreCase("false")) {
|
} else if (value.equalsIgnoreCase("true") || value.equalsIgnoreCase("false")) {
|
||||||
return Pair.of(DataType.BOOLEAN, Boolean.parseBoolean(value));
|
return Pair.of(DataType.BOOLEAN, Boolean.parseBoolean(value));
|
||||||
|
} else if (looksLikeJson(value)) {
|
||||||
|
try {
|
||||||
|
return Pair.of(DataType.JSON, JsonParser.parseString(value));
|
||||||
|
} catch (Exception ignored) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return Pair.of(DataType.STRING, value);
|
return Pair.of(DataType.STRING, value);
|
||||||
}
|
}
|
||||||
@ -70,4 +76,9 @@ public class TypeCastUtil {
|
|||||||
return valueAsString.contains(".") && !valueAsString.contains("E") && !valueAsString.contains("e");
|
return valueAsString.contains(".") && !valueAsString.contains("E") && !valueAsString.contains("e");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static boolean looksLikeJson(String value) {
|
||||||
|
return (value.startsWith("{") && value.endsWith("}")) ||
|
||||||
|
(value.startsWith("[") && value.endsWith("]"));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user