Fix no type-cast for attributes; telemetry controller refactoring

This commit is contained in:
VIacheslavKlimov 2025-07-02 12:46:50 +03:00
parent 097b9d7148
commit fb966bded0
9 changed files with 63 additions and 108 deletions

View File

@ -33,11 +33,11 @@ import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
@ -65,25 +65,17 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UUIDBased;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseDeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.IntervalType;
import org.thingsboard.server.common.data.kv.JsonDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg;
import org.thingsboard.server.config.annotations.ApiOperation;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.exception.InvalidParametersException;
import org.thingsboard.server.exception.UncheckedApiException;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.security.AccessValidator;
import org.thingsboard.server.service.security.model.SecurityUser;
@ -156,9 +148,6 @@ public class TelemetryController extends BaseController {
@Autowired
private TbTelemetryService tbTelemetryService;
@Value("${transport.json.max_string_value_length:0}")
private int maxStringValueLength;
private ExecutorService executor;
@PostConstruct
@ -314,10 +303,10 @@ public class TelemetryController extends BaseController {
@Parameter(description = "A string value representing the timezone that will be used to calculate exact timestamps for 'WEEK', 'WEEK_ISO', 'MONTH' and 'QUARTER' interval types.")
@RequestParam(name = "timeZone", required = false) String timeZone,
@Parameter(description = "An integer value that represents a max number of time series data points to fetch." +
" This parameter is used only in the case if 'agg' parameter is set to 'NONE'.", schema = @Schema(defaultValue = "100"))
" This parameter is used only in the case if 'agg' parameter is set to 'NONE'.", schema = @Schema(defaultValue = "100"))
@RequestParam(name = "limit", defaultValue = "100") Integer limit,
@Parameter(description = "A string value representing the aggregation function. " +
"If the interval is not specified, 'agg' parameter will use 'NONE' value.",
"If the interval is not specified, 'agg' parameter will use 'NONE' value.",
schema = @Schema(allowableValues = {"MIN", "MAX", "AVG", "SUM", "COUNT", "NONE"}))
@RequestParam(name = "agg", defaultValue = "NONE") String aggStr,
@Parameter(description = SORT_ORDER_DESCRIPTION, schema = @Schema(allowableValues = {"ASC", "DESC"}))
@ -337,20 +326,21 @@ public class TelemetryController extends BaseController {
+ TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH)
@ApiResponses(value = {
@ApiResponse(responseCode = "200", description = SAVE_ATTIRIBUTES_STATUS_OK +
"Platform creates an audit log event about device attributes updates with action type 'ATTRIBUTES_UPDATED', " +
"and also sends event msg to the rule engine with msg type 'ATTRIBUTES_UPDATED'."),
"Platform creates an audit log event about device attributes updates with action type 'ATTRIBUTES_UPDATED', " +
"and also sends event msg to the rule engine with msg type 'ATTRIBUTES_UPDATED'."),
@ApiResponse(responseCode = "400", description = SAVE_ATTIRIBUTES_STATUS_BAD_REQUEST),
@ApiResponse(responseCode = "401", description = "User is not authorized to save device attributes for selected device. Most likely, User belongs to different Customer or Tenant."),
@ApiResponse(responseCode = "500", description = "The exception was thrown during processing the request. " +
"Platform creates an audit log event about device attributes updates with action type 'ATTRIBUTES_UPDATED' that includes an error stacktrace."),
"Platform creates an audit log event about device attributes updates with action type 'ATTRIBUTES_UPDATED' that includes an error stacktrace."),
})
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/{deviceId}/{scope}", method = RequestMethod.POST)
@ResponseBody
public DeferredResult<ResponseEntity> saveDeviceAttributes(
@Parameter(description = DEVICE_ID_PARAM_DESCRIPTION, required = true) @PathVariable("deviceId") String deviceIdStr,
@Parameter(description = ATTRIBUTES_SCOPE_DESCRIPTION, schema = @Schema(allowableValues = {"SERVER_SCOPE", "SHARED_SCOPE"}, requiredMode = Schema.RequiredMode.REQUIRED)) @PathVariable("scope") AttributeScope scope,
@io.swagger.v3.oas.annotations.parameters.RequestBody(description = ATTRIBUTES_JSON_REQUEST_DESCRIPTION, required = true) @RequestBody JsonNode request) throws ThingsboardException {
@PostMapping(value = "/{deviceId}/{scope}")
public DeferredResult<ResponseEntity> saveDeviceAttributes(@Parameter(description = DEVICE_ID_PARAM_DESCRIPTION, required = true)
@PathVariable("deviceId") String deviceIdStr,
@Parameter(description = ATTRIBUTES_SCOPE_DESCRIPTION, schema = @Schema(allowableValues = {"SERVER_SCOPE", "SHARED_SCOPE"}, requiredMode = Schema.RequiredMode.REQUIRED))
@PathVariable("scope") AttributeScope scope,
@io.swagger.v3.oas.annotations.parameters.RequestBody(description = ATTRIBUTES_JSON_REQUEST_DESCRIPTION, required = true)
@RequestBody String request) throws ThingsboardException {
EntityId entityId = EntityIdFactory.getByTypeAndUuid(EntityType.DEVICE, deviceIdStr);
return saveAttributes(getTenantId(), entityId, scope, request);
}
@ -367,13 +357,15 @@ public class TelemetryController extends BaseController {
@ApiResponse(responseCode = "500", description = SAVE_ENTITY_ATTRIBUTES_STATUS_INTERNAL_SERVER_ERROR),
})
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/{entityType}/{entityId}/{scope}", method = RequestMethod.POST)
@ResponseBody
public DeferredResult<ResponseEntity> saveEntityAttributesV1(
@Parameter(description = ENTITY_TYPE_PARAM_DESCRIPTION, required = true, schema = @Schema(defaultValue = "DEVICE")) @PathVariable("entityType") String entityType,
@Parameter(description = ENTITY_ID_PARAM_DESCRIPTION, required = true) @PathVariable("entityId") String entityIdStr,
@Parameter(description = ATTRIBUTES_SCOPE_DESCRIPTION, schema = @Schema(allowableValues = {"SERVER_SCOPE", "SHARED_SCOPE"})) @PathVariable("scope") AttributeScope scope,
@io.swagger.v3.oas.annotations.parameters.RequestBody(description = ATTRIBUTES_JSON_REQUEST_DESCRIPTION, required = true) @RequestBody JsonNode request) throws ThingsboardException {
@PostMapping(value = "/{entityType}/{entityId}/{scope}")
public DeferredResult<ResponseEntity> saveEntityAttributesV1(@Parameter(description = ENTITY_TYPE_PARAM_DESCRIPTION, required = true, schema = @Schema(defaultValue = "DEVICE"))
@PathVariable("entityType") String entityType,
@Parameter(description = ENTITY_ID_PARAM_DESCRIPTION, required = true)
@PathVariable("entityId") String entityIdStr,
@Parameter(description = ATTRIBUTES_SCOPE_DESCRIPTION, schema = @Schema(allowableValues = {"SERVER_SCOPE", "SHARED_SCOPE"}))
@PathVariable("scope") AttributeScope scope,
@io.swagger.v3.oas.annotations.parameters.RequestBody(description = ATTRIBUTES_JSON_REQUEST_DESCRIPTION, required = true)
@RequestBody String request) throws ThingsboardException {
EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr);
return saveAttributes(getTenantId(), entityId, scope, request);
}
@ -390,13 +382,15 @@ public class TelemetryController extends BaseController {
@ApiResponse(responseCode = "500", description = SAVE_ENTITY_ATTRIBUTES_STATUS_INTERNAL_SERVER_ERROR),
})
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/{entityType}/{entityId}/attributes/{scope}", method = RequestMethod.POST)
@ResponseBody
public DeferredResult<ResponseEntity> saveEntityAttributesV2(
@Parameter(description = ENTITY_TYPE_PARAM_DESCRIPTION, required = true, schema = @Schema(defaultValue = "DEVICE")) @PathVariable("entityType") String entityType,
@Parameter(description = ENTITY_ID_PARAM_DESCRIPTION, required = true) @PathVariable("entityId") String entityIdStr,
@Parameter(description = ATTRIBUTES_SCOPE_DESCRIPTION, schema = @Schema(allowableValues = {"SERVER_SCOPE", "SHARED_SCOPE"}, requiredMode = Schema.RequiredMode.REQUIRED)) @PathVariable("scope") AttributeScope scope,
@io.swagger.v3.oas.annotations.parameters.RequestBody(description = ATTRIBUTES_JSON_REQUEST_DESCRIPTION, required = true) @RequestBody JsonNode request) throws ThingsboardException {
@PostMapping(value = "/{entityType}/{entityId}/attributes/{scope}")
public DeferredResult<ResponseEntity> saveEntityAttributesV2(@Parameter(description = ENTITY_TYPE_PARAM_DESCRIPTION, required = true, schema = @Schema(defaultValue = "DEVICE"))
@PathVariable("entityType") String entityType,
@Parameter(description = ENTITY_ID_PARAM_DESCRIPTION, required = true)
@PathVariable("entityId") String entityIdStr,
@Parameter(description = ATTRIBUTES_SCOPE_DESCRIPTION, schema = @Schema(allowableValues = {"SERVER_SCOPE", "SHARED_SCOPE"}, requiredMode = Schema.RequiredMode.REQUIRED))
@PathVariable("scope") AttributeScope scope,
@io.swagger.v3.oas.annotations.parameters.RequestBody(description = ATTRIBUTES_JSON_REQUEST_DESCRIPTION, required = true)
@RequestBody String request) throws ThingsboardException {
EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr);
return saveAttributes(getTenantId(), entityId, scope, request);
}
@ -460,11 +454,11 @@ public class TelemetryController extends BaseController {
TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH)
@ApiResponses(value = {
@ApiResponse(responseCode = "200", description = "Time series for the selected keys in the request was removed. " +
"Platform creates an audit log event about entity time series removal with action type 'TIMESERIES_DELETED'."),
"Platform creates an audit log event about entity time series removal with action type 'TIMESERIES_DELETED'."),
@ApiResponse(responseCode = "400", description = "Platform returns a bad request in case if keys list is empty or start and end timestamp values is empty when deleteAllDataForKeys is set to false."),
@ApiResponse(responseCode = "401", description = "User is not authorized to delete entity time series for selected entity. Most likely, User belongs to different Customer or Tenant."),
@ApiResponse(responseCode = "500", description = "The exception was thrown during processing the request. " +
"Platform creates an audit log event about entity time series removal with action type 'TIMESERIES_DELETED' that includes an error stacktrace."),
"Platform creates an audit log event about entity time series removal with action type 'TIMESERIES_DELETED' that includes an error stacktrace."),
})
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/{entityType}/{entityId}/timeseries/delete", method = RequestMethod.DELETE)
@ -541,11 +535,11 @@ public class TelemetryController extends BaseController {
"Referencing a non-existing Device Id will cause an error" + TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH)
@ApiResponses(value = {
@ApiResponse(responseCode = "200", description = "Device attributes was removed for the selected keys in the request. " +
"Platform creates an audit log event about device attributes removal with action type 'ATTRIBUTES_DELETED'."),
"Platform creates an audit log event about device attributes removal with action type 'ATTRIBUTES_DELETED'."),
@ApiResponse(responseCode = "400", description = "Platform returns a bad request in case if keys or scope are not specified."),
@ApiResponse(responseCode = "401", description = "User is not authorized to delete device attributes for selected entity. Most likely, User belongs to different Customer or Tenant."),
@ApiResponse(responseCode = "500", description = "The exception was thrown during processing the request. " +
"Platform creates an audit log event about device attributes removal with action type 'ATTRIBUTES_DELETED' that includes an error stacktrace."),
"Platform creates an audit log event about device attributes removal with action type 'ATTRIBUTES_DELETED' that includes an error stacktrace."),
})
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/{deviceId}/{scope}", method = RequestMethod.DELETE)
@ -563,11 +557,11 @@ public class TelemetryController extends BaseController {
INVALID_ENTITY_ID_OR_ENTITY_TYPE_DESCRIPTION + TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH)
@ApiResponses(value = {
@ApiResponse(responseCode = "200", description = "Entity attributes was removed for the selected keys in the request. " +
"Platform creates an audit log event about entity attributes removal with action type 'ATTRIBUTES_DELETED'."),
"Platform creates an audit log event about entity attributes removal with action type 'ATTRIBUTES_DELETED'."),
@ApiResponse(responseCode = "400", description = "Platform returns a bad request in case if keys or scope are not specified."),
@ApiResponse(responseCode = "401", description = "User is not authorized to delete entity attributes for selected entity. Most likely, User belongs to different Customer or Tenant."),
@ApiResponse(responseCode = "500", description = "The exception was thrown during processing the request. " +
"Platform creates an audit log event about entity attributes removal with action type 'ATTRIBUTES_DELETED' that includes an error stacktrace."),
"Platform creates an audit log event about entity attributes removal with action type 'ATTRIBUTES_DELETED' that includes an error stacktrace."),
})
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/{entityType}/{entityId}/{scope}", method = RequestMethod.DELETE)
@ -616,18 +610,24 @@ public class TelemetryController extends BaseController {
});
}
private DeferredResult<ResponseEntity> saveAttributes(TenantId srcTenantId, EntityId entityIdSrc, AttributeScope scope, JsonNode json) throws ThingsboardException {
private DeferredResult<ResponseEntity> saveAttributes(TenantId srcTenantId, EntityId entityIdSrc, AttributeScope scope, String jsonStr) throws ThingsboardException {
if (AttributeScope.SERVER_SCOPE != scope && AttributeScope.SHARED_SCOPE != scope) {
return getImmediateDeferredResult("Invalid scope: " + scope, HttpStatus.BAD_REQUEST);
}
if (json.isObject()) {
List<AttributeKvEntry> attributes = extractRequestAttributes(json);
JsonElement json;
try {
json = JsonParser.parseString(jsonStr);
} catch (Exception e) {
return getImmediateDeferredResult("Invalid JSON", HttpStatus.BAD_REQUEST);
}
if (json.isJsonObject()) {
List<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(json);
if (attributes.isEmpty()) {
return getImmediateDeferredResult("No attributes data found in request body!", HttpStatus.BAD_REQUEST);
}
for (AttributeKvEntry attributeKvEntry : attributes) {
if (attributeKvEntry.getKey().isEmpty() || attributeKvEntry.getKey().trim().length() == 0) {
return getImmediateDeferredResult("Key cannot be empty or contains only spaces", HttpStatus.BAD_REQUEST);
if (attributeKvEntry.getKey().isBlank()) {
return getImmediateDeferredResult("Key cannot be blank", HttpStatus.BAD_REQUEST);
}
}
SecurityUser user = getCurrentUser();
@ -885,43 +885,6 @@ public class TelemetryController extends BaseController {
return result;
}
private List<AttributeKvEntry> extractRequestAttributes(JsonNode jsonNode) {
long ts = System.currentTimeMillis();
List<AttributeKvEntry> attributes = new ArrayList<>();
jsonNode.fields().forEachRemaining(entry -> {
String key = entry.getKey();
JsonNode value = entry.getValue();
if (entry.getValue().isObject() || entry.getValue().isArray()) {
attributes.add(new BaseAttributeKvEntry(new JsonDataEntry(key, toJsonStr(value)), ts));
} else if (entry.getValue().isTextual()) {
if (maxStringValueLength > 0 && entry.getValue().textValue().length() > maxStringValueLength) {
String message = String.format("String value length [%d] for key [%s] is greater than maximum allowed [%d]", entry.getValue().textValue().length(), key, maxStringValueLength);
throw new UncheckedApiException(new InvalidParametersException(message));
}
attributes.add(new BaseAttributeKvEntry(new StringDataEntry(key, value.textValue()), ts));
} else if (entry.getValue().isBoolean()) {
attributes.add(new BaseAttributeKvEntry(new BooleanDataEntry(key, value.booleanValue()), ts));
} else if (entry.getValue().isDouble()) {
attributes.add(new BaseAttributeKvEntry(new DoubleDataEntry(key, value.doubleValue()), ts));
} else if (entry.getValue().isNumber()) {
if (entry.getValue().isBigInteger()) {
throw new UncheckedApiException(new InvalidParametersException("Big integer values are not supported!"));
} else {
attributes.add(new BaseAttributeKvEntry(new LongDataEntry(key, value.longValue()), ts));
}
}
});
return attributes;
}
private String toJsonStr(JsonNode value) {
try {
return JacksonUtil.toString(value);
} catch (IllegalArgumentException e) {
throw new JsonParseException("Can't parse jsonValue: " + value, e);
}
}
private JsonNode toJsonNode(String value) {
try {
return JacksonUtil.toJsonNode(value);

View File

@ -266,7 +266,7 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor {
SettableFuture<Void> futureToSet = SettableFuture.create();
JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
AttributeScope scope = AttributeScope.valueOf(metaData.getValue(DataConstants.SCOPE));
List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts));
List<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(json, ts);
ListenableFuture<List<AttributeKvEntry>> future = filterAttributesByTs(tenantId, entityId, scope, attributes);
Futures.addCallback(future, new FutureCallback<>() {
@Override
@ -314,7 +314,7 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor {
SettableFuture<Void> futureToSet = SettableFuture.create();
JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
AttributeScope scope = AttributeScope.valueOf(metaData.getValue(DataConstants.SCOPE));
List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts));
List<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(json, ts);
ListenableFuture<List<AttributeKvEntry>> future = filterAttributesByTs(tenantId, entityId, scope, attributes);
Futures.addCallback(future, new FutureCallback<>() {
@Override

View File

@ -67,7 +67,6 @@ import org.thingsboard.server.service.security.permission.Resource;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import org.thingsboard.server.utils.CsvUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
@ -235,7 +234,7 @@ public abstract class AbstractBulkImportService<E extends HasId<? extends Entity
@SneakyThrows
private void saveAttributes(SecurityUser user, E entity, Map.Entry<BulkImportColumnType, JsonObject> kvsEntry, BulkImportColumnType kvType) {
String scope = kvType.getKey();
List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(kvsEntry.getValue()));
List<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(kvsEntry.getValue());
accessValidator.validateEntityAndCallback(user, Operation.WRITE_ATTRIBUTES, entity.getId(), (result, tenantId, entityId) -> {
tsSubscriptionService.saveAttributes(AttributesSaveRequest.builder()

View File

@ -56,11 +56,9 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509Ce
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Consumer;
@ -538,13 +536,13 @@ public class JsonConverter {
return result;
}
public static Set<AttributeKvEntry> convertToAttributes(JsonElement element) {
public static List<AttributeKvEntry> convertToAttributes(JsonElement element) {
long ts = System.currentTimeMillis();
return convertToAttributes(element, ts);
}
public static Set<AttributeKvEntry> convertToAttributes(JsonElement element, long ts) {
return new HashSet<>(parseValues(element.getAsJsonObject()).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).toList());
public static List<AttributeKvEntry> convertToAttributes(JsonElement element, long ts) {
return parseValues(element.getAsJsonObject()).stream().<AttributeKvEntry>map(kv -> new BaseAttributeKvEntry(kv, ts)).toList();
}
private static List<KvEntry> parseValues(JsonObject valuesObject) {

View File

@ -23,8 +23,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Isolated;
import java.util.ArrayList;
@Isolated("JsonConverter static settings being modified")
public class JsonConverterTest {
@ -53,7 +51,7 @@ public class JsonConverterTest {
@Test
public void testParseAttributesBigDecimalAsLong() {
var result = new ArrayList<>(JsonConverter.convertToAttributes(JsonParser.parseString("{\"meterReadingDelta\": 1E1}")));
var result = JsonConverter.convertToAttributes(JsonParser.parseString("{\"meterReadingDelta\": 1E1}"));
Assertions.assertEquals(10L, result.get(0).getLongValue().get().longValue());
}
@ -108,4 +106,5 @@ public class JsonConverterTest {
JsonConverter.convertToTelemetry(JsonParser.parseString("{\"meterReadingDelta\": 9.9701010061400066E19}"), 0L);
});
}
}

View File

@ -44,7 +44,6 @@ import org.thingsboard.server.common.msg.TbMsg;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.msg.TbMsgType.ACTIVITY_EVENT;
@ -115,14 +114,13 @@ public class TbCopyAttributesToEntityViewNode implements TbNode {
.build());
}
} else {
Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(JsonParser.parseString(msg.getData()));
List<AttributeKvEntry> filteredAttributes =
attributes.stream().filter(attr -> attributeContainsInEntityView(scope, attr.getKey(), entityView)).collect(Collectors.toList());
List<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(JsonParser.parseString(msg.getData())).stream()
.filter(attr -> attributeContainsInEntityView(scope, attr.getKey(), entityView)).toList();
ctx.getTelemetryService().saveAttributes(AttributesSaveRequest.builder()
.tenantId(ctx.getTenantId())
.entityId(entityView.getId())
.scope(scope)
.entries(filteredAttributes)
.entries(attributes)
.callback(getFutureCallback(ctx, msg, entityView))
.build());
}

View File

@ -258,7 +258,7 @@ class DeviceState {
private boolean processAttributes(TbContext ctx, TbMsg msg, String scope) throws ExecutionException, InterruptedException {
boolean stateChanged = false;
Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(JsonParser.parseString(msg.getData()));
List<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(JsonParser.parseString(msg.getData()));
if (!attributes.isEmpty()) {
SnapshotUpdate update = merge(latestValues, attributes, scope);
for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
@ -321,7 +321,7 @@ class DeviceState {
return new SnapshotUpdate(AlarmConditionKeyType.TIME_SERIES, keys);
}
private SnapshotUpdate merge(DataSnapshot latestValues, Set<AttributeKvEntry> attributes, String scope) {
private SnapshotUpdate merge(DataSnapshot latestValues, List<AttributeKvEntry> attributes, String scope) {
long newTs = 0;
Set<AlarmConditionFilterKey> keys = new HashSet<>();
for (AttributeKvEntry entry : attributes) {

View File

@ -103,8 +103,7 @@ public class TbCalculatedFieldsNode implements TbNode {
}
private void processPostAttributesRequest(TbContext ctx, TbMsg msg) {
List<AttributeKvEntry> newAttributes = new ArrayList<>(JsonConverter.convertToAttributes(JsonParser.parseString(msg.getData())));
List<AttributeKvEntry> newAttributes = JsonConverter.convertToAttributes(JsonParser.parseString(msg.getData()));
if (newAttributes.isEmpty()) {
ctx.tellSuccess(msg);
return;

View File

@ -41,7 +41,6 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.TbMsg;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -133,7 +132,7 @@ public class TbMsgAttributesNode implements TbNode {
return;
}
String src = msg.getData();
List<AttributeKvEntry> newAttributes = new ArrayList<>(JsonConverter.convertToAttributes(JsonParser.parseString(src)));
List<AttributeKvEntry> newAttributes = JsonConverter.convertToAttributes(JsonParser.parseString(src));
if (newAttributes.isEmpty()) {
ctx.tellSuccess(msg);
return;