Merge pull request #8387 from dashevchenko/deviceImportFix

Fixed telemetry/attribute update while device bulk import
This commit is contained in:
Andrew Shvayka 2023-04-19 13:23:27 +03:00 committed by GitHub
commit 972a967e7a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 75 additions and 2 deletions

View File

@ -113,8 +113,9 @@ public abstract class AbstractBulkImportService<E extends HasId<? extends Entity
ImportedEntityInfo<E> importedEntityInfo = saveEntity(entityData.getFields(), user);
E entity = importedEntityInfo.getEntity();
if (request.getMapping().getUpdate() || !importedEntityInfo.isUpdated()) {
saveKvs(user, entity, entityData.getKvs());
}
return importedEntityInfo;
},
importedEntityInfo -> {

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.controller;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@ -33,11 +34,13 @@ import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.test.context.ContextConfiguration;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.OtaPackageInfo;
import org.thingsboard.server.common.data.SaveDeviceWithCredentialsRequest;
import org.thingsboard.server.common.data.SaveOtaPackageInfoRequest;
@ -68,6 +71,8 @@ import org.thingsboard.server.service.gateway_device.GatewayNotificationsService
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
@ -1392,6 +1397,73 @@ public abstract class BaseDeviceControllerTest extends AbstractControllerTest {
Assert.assertEquals(savedCredentials, updatedCredentials);
}
@Test
public void testBulkImportDeviceWithUpdateFalse() throws Exception {
String deviceName = "firstDevice";
String attributeValue = "testValue";
BulkImportRequest request = new BulkImportRequest();
request.setFile(String.format("NAME,TYPE,DATA\n%s,%s,%s", deviceName, "thermostat", attributeValue));
BulkImportRequest.Mapping mapping = new BulkImportRequest.Mapping();
BulkImportRequest.ColumnMapping name = new BulkImportRequest.ColumnMapping();
name.setType(BulkImportColumnType.NAME);
BulkImportRequest.ColumnMapping type = new BulkImportRequest.ColumnMapping();
type.setType(BulkImportColumnType.TYPE);
BulkImportRequest.ColumnMapping attribute = new BulkImportRequest.ColumnMapping();
attribute.setType(BulkImportColumnType.SERVER_ATTRIBUTE);
attribute.setKey("DATA");
List<BulkImportRequest.ColumnMapping> columns = new ArrayList<>();
columns.add(name);
columns.add(type);
columns.add(attribute);
mapping.setColumns(columns);
mapping.setDelimiter(',');
mapping.setUpdate(true);
mapping.setHeader(true);
request.setMapping(mapping);
//import device
doPostWithTypedResponse("/api/device/bulk_import", request, new TypeReference<>() {});
Device savedDevice = doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class);
//check server attribute value
List<Map<String, Object>> values = doGetAsyncTyped("/api/plugins/telemetry/DEVICE/" + savedDevice.getId() +
"/values/attributes/SERVER_SCOPE", new TypeReference<>() {
});
Map<String, Object> serverAttribute = values.stream().filter(att -> att.get("key").equals("DATA")).findFirst().get();
Assert.assertEquals(attributeValue, serverAttribute.get("value"));
//update server attribute value
String newAttributeValue = "testValue2";
JsonNode content = JacksonUtil.toJsonNode("{\"DATA\": \"" + newAttributeValue + "\"}");
doPost("/api/plugins/telemetry/" + EntityType.DEVICE.name() + "/" + savedDevice.getUuidId() + "/SERVER_SCOPE", content)
.andExpect(status().isOk());
//reimport devices
String deviceName2 = "secondDevice";
String attributeValue2 = "testValue3";
request.setFile(String.format("NAME,TYPE,DATA\n%s,%s,%s\n%s,%s,%s", deviceName, "thermostat", attributeValue,
deviceName2, "thermostat", attributeValue2));
mapping.setUpdate(false);
doPostWithTypedResponse("/api/device/bulk_import", request, new TypeReference<>() {});
Device savedDevice2 = doGet("/api/tenant/devices?deviceName=" + deviceName2, Device.class);
//check attribute value was not changed after reimport
List<Map<String, Object>> values2 = doGetAsyncTyped("/api/plugins/telemetry/DEVICE/" + savedDevice.getId() +
"/values/attributes/SERVER_SCOPE", new TypeReference<>() {
});
Map<String, Object> retrievedServerAttribute2 = values2.stream().filter(att -> att.get("key").equals("DATA")).findFirst().get();
Assert.assertEquals(newAttributeValue, retrievedServerAttribute2.get("value"));
//check attribute for second device
List<Map<String, Object>> values3 = doGetAsyncTyped("/api/plugins/telemetry/DEVICE/" + savedDevice2.getId() +
"/values/attributes/SERVER_SCOPE", new TypeReference<>() {
});
Map<String, Object> retrievedServerAttribute3 = values3.stream().filter(att -> att.get("key").equals("DATA")).findFirst().get();
Assert.assertEquals(attributeValue2, retrievedServerAttribute3.get("value"));
}
private Device createDevice(String name) {
Device device = new Device();
device.setName(name);