EntityDataProto: add ts to correctly handle PostAttributeMsg update from/to Edge
This commit is contained in:
		
							parent
							
								
									34bd41ff4e
								
							
						
					
					
						commit
						4eedea0766
					
				@ -67,6 +67,9 @@ public class EntityDataMsgConstructor {
 | 
			
		||||
                    TransportProtos.PostAttributeMsg attributesUpdatedMsg = JsonConverter.convertToAttributesProto(data.getAsJsonObject("kv"));
 | 
			
		||||
                    builder.setAttributesUpdatedMsg(attributesUpdatedMsg);
 | 
			
		||||
                    builder.setPostAttributeScope(getScopeOfDefault(data));
 | 
			
		||||
                    if (data.get("ts") != null && !data.get("ts").isJsonNull()) {
 | 
			
		||||
                        builder.setAttributeTs(data.getAsJsonPrimitive("ts").getAsLong());
 | 
			
		||||
                    }
 | 
			
		||||
                } catch (Exception e) {
 | 
			
		||||
                    log.warn("[{}][{}] Can't convert to AttributesUpdatedMsg proto, entityData [{}]", tenantId, entityId, entityData, e);
 | 
			
		||||
                }
 | 
			
		||||
@ -77,6 +80,9 @@ public class EntityDataMsgConstructor {
 | 
			
		||||
                    TransportProtos.PostAttributeMsg postAttributesMsg = JsonConverter.convertToAttributesProto(data.getAsJsonObject("kv"));
 | 
			
		||||
                    builder.setPostAttributesMsg(postAttributesMsg);
 | 
			
		||||
                    builder.setPostAttributeScope(getScopeOfDefault(data));
 | 
			
		||||
                    if (data.get("ts") != null && !data.get("ts").isJsonNull()) {
 | 
			
		||||
                        builder.setAttributeTs(data.getAsJsonPrimitive("ts").getAsLong());
 | 
			
		||||
                    }
 | 
			
		||||
                } catch (Exception e) {
 | 
			
		||||
                    log.warn("[{}][{}] Can't convert to PostAttributesMsg, entityData [{}]", tenantId, entityId, entityData, e);
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
@ -54,6 +54,7 @@ import org.thingsboard.server.common.data.id.RuleChainId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.UserId;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.KvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.msg.TbMsgType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
 | 
			
		||||
@ -79,7 +80,9 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
 | 
			
		||||
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor {
 | 
			
		||||
@ -114,7 +117,7 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor {
 | 
			
		||||
 | 
			
		||||
    abstract protected String getMsgSourceKey();
 | 
			
		||||
 | 
			
		||||
    public List<ListenableFuture<Void>> processTelemetryMsg(TenantId tenantId, EntityDataProto entityData) {
 | 
			
		||||
    public List<ListenableFuture<Void>> processTelemetryMsg(TenantId tenantId, EntityDataProto entityData) throws Exception {
 | 
			
		||||
        log.trace("[{}] processTelemetryMsg [{}]", tenantId, entityData);
 | 
			
		||||
        List<ListenableFuture<Void>> result = new ArrayList<>();
 | 
			
		||||
        EntityId entityId = constructEntityId(entityData.getEntityType(), entityData.getEntityIdMSB(), entityData.getEntityIdLSB());
 | 
			
		||||
@ -125,11 +128,13 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor {
 | 
			
		||||
                CustomerId customerId = pair.getValue();
 | 
			
		||||
                metaData.putValue(DataConstants.MSG_SOURCE_KEY, getMsgSourceKey());
 | 
			
		||||
                if (entityData.hasPostAttributesMsg()) {
 | 
			
		||||
                    result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData));
 | 
			
		||||
                    long ts = entityData.hasAttributeTs() ? entityData.getAttributeTs() : System.currentTimeMillis();
 | 
			
		||||
                    result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData, ts));
 | 
			
		||||
                }
 | 
			
		||||
                if (entityData.hasAttributesUpdatedMsg()) {
 | 
			
		||||
                    metaData.putValue("scope", entityData.getPostAttributeScope());
 | 
			
		||||
                    result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData));
 | 
			
		||||
                    long ts = entityData.hasAttributeTs() ? entityData.getAttributeTs() : System.currentTimeMillis();
 | 
			
		||||
                    result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData, ts));
 | 
			
		||||
                }
 | 
			
		||||
                if (entityData.hasPostTelemetryMsg()) {
 | 
			
		||||
                    result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData));
 | 
			
		||||
@ -257,9 +262,13 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor {
 | 
			
		||||
        return new ImmutablePair<>(queueName, ruleChainId);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<Void> processPostAttributes(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) {
 | 
			
		||||
    private ListenableFuture<Void> processPostAttributes(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData, long ts) throws Exception {
 | 
			
		||||
        SettableFuture<Void> futureToSet = SettableFuture.create();
 | 
			
		||||
        JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
 | 
			
		||||
 | 
			
		||||
        List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts));
 | 
			
		||||
        filterAttributesByTs(tenantId, entityId, AttributeScope.CLIENT_SCOPE, attributes, json);
 | 
			
		||||
 | 
			
		||||
        var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId);
 | 
			
		||||
        TbMsg tbMsg = TbMsg.newMsg()
 | 
			
		||||
                .queueName(defaultQueueAndRuleChain.getKey())
 | 
			
		||||
@ -289,16 +298,18 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor {
 | 
			
		||||
                                                           CustomerId customerId,
 | 
			
		||||
                                                           EntityId entityId,
 | 
			
		||||
                                                           TransportProtos.PostAttributeMsg msg,
 | 
			
		||||
                                                           TbMsgMetaData metaData) {
 | 
			
		||||
                                                           TbMsgMetaData metaData,
 | 
			
		||||
                                                           long ts) throws Exception {
 | 
			
		||||
        SettableFuture<Void> futureToSet = SettableFuture.create();
 | 
			
		||||
        JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
 | 
			
		||||
        List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(json));
 | 
			
		||||
        String scope = metaData.getValue("scope");
 | 
			
		||||
        AttributeScope scope = AttributeScope.valueOf(metaData.getValue("scope"));
 | 
			
		||||
        List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts));
 | 
			
		||||
        List<AttributeKvEntry> attributesToSave = filterAttributesByTs(tenantId, entityId, scope, attributes, json);
 | 
			
		||||
        tsSubService.saveAttributes(AttributesSaveRequest.builder()
 | 
			
		||||
                .tenantId(tenantId)
 | 
			
		||||
                .entityId(entityId)
 | 
			
		||||
                .scope(AttributeScope.valueOf(scope))
 | 
			
		||||
                .entries(attributes)
 | 
			
		||||
                .scope(scope)
 | 
			
		||||
                .entries(attributesToSave)
 | 
			
		||||
                .callback(new FutureCallback<>() {
 | 
			
		||||
                    @Override
 | 
			
		||||
                    public void onSuccess(@Nullable Void tmp) {
 | 
			
		||||
@ -390,4 +401,21 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor {
 | 
			
		||||
                entityDataMsgConstructor.constructEntityDataMsg(tenantId, entityId, actionType, JsonParser.parseString(bodyJackson));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private List<AttributeKvEntry> filterAttributesByTs(TenantId tenantId, EntityId entityId, AttributeScope scope, List<AttributeKvEntry> attributes, JsonObject jsonObject) throws Exception {
 | 
			
		||||
        List<String> keys = attributes.stream().map(KvEntry::getKey).toList();
 | 
			
		||||
        Map<String, Long> existingAttributesTs = edgeCtx.getAttributesService().find(tenantId, entityId, scope, keys).get()
 | 
			
		||||
                .stream().collect(Collectors.toMap(KvEntry::getKey, AttributeKvEntry::getLastUpdateTs));
 | 
			
		||||
        return attributes.stream()
 | 
			
		||||
                .filter(attribute -> {
 | 
			
		||||
                    String key = attribute.getKey();
 | 
			
		||||
                    long incomingTs = attribute.getLastUpdateTs();
 | 
			
		||||
                    if (incomingTs > existingAttributesTs.getOrDefault(key, 0L)) {
 | 
			
		||||
                        return true;
 | 
			
		||||
                    } else {
 | 
			
		||||
                        jsonObject.remove(key);
 | 
			
		||||
                        return false;
 | 
			
		||||
                    }
 | 
			
		||||
                }).toList();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -488,6 +488,59 @@ public class DeviceEdgeTest extends AbstractEdgeTest {
 | 
			
		||||
        doDelete("/api/plugins/telemetry/DEVICE/" + device.getId().getId() + "/SERVER_SCOPE?keys=" + attributesKey, String.class);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testSendOutdatedAttributeToCloud() throws Exception {
 | 
			
		||||
        long ts = System.currentTimeMillis();
 | 
			
		||||
        Device device = saveDeviceOnCloudAndVerifyDeliveryToEdge();
 | 
			
		||||
 | 
			
		||||
        edgeImitator.expectResponsesAmount(1);
 | 
			
		||||
 | 
			
		||||
        ObjectNode attributesNode = JacksonUtil.newObjectNode();
 | 
			
		||||
        String originalValue = "original_value";
 | 
			
		||||
        attributesNode.put("test_attr", originalValue);
 | 
			
		||||
        doPost("/api/plugins/telemetry/DEVICE/" + device.getId() + "/attributes/SERVER_SCOPE", attributesNode);
 | 
			
		||||
 | 
			
		||||
        JsonObject attributesData = new JsonObject();
 | 
			
		||||
        // incorrect msg, will not be saved, because of ts is lower than for already existing
 | 
			
		||||
        String attributesKey = "test_attr";
 | 
			
		||||
        String attributeValueIncorrect = "test_value";
 | 
			
		||||
        // correct msg, will be saved, no ts issue
 | 
			
		||||
        String attributeKey2 = "test_attr2";
 | 
			
		||||
        String attributeValue2Correct = "test_value2";
 | 
			
		||||
        attributesData.addProperty(attributesKey, attributeValueIncorrect);
 | 
			
		||||
        attributesData.addProperty(attributeKey2, attributeValue2Correct);
 | 
			
		||||
        UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder();
 | 
			
		||||
        EntityDataProto.Builder entityDataBuilder = EntityDataProto.newBuilder();
 | 
			
		||||
        entityDataBuilder.setEntityType(device.getId().getEntityType().name());
 | 
			
		||||
        entityDataBuilder.setEntityIdMSB(device.getId().getId().getMostSignificantBits());
 | 
			
		||||
        entityDataBuilder.setEntityIdLSB(device.getId().getId().getLeastSignificantBits());
 | 
			
		||||
        entityDataBuilder.setAttributesUpdatedMsg(JsonConverter.convertToAttributesProto(attributesData));
 | 
			
		||||
        entityDataBuilder.setPostAttributeScope(DataConstants.SERVER_SCOPE);
 | 
			
		||||
        entityDataBuilder.setAttributeTs(ts);
 | 
			
		||||
 | 
			
		||||
        uplinkMsgBuilder.addEntityData(entityDataBuilder.build());
 | 
			
		||||
 | 
			
		||||
        edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build());
 | 
			
		||||
        Assert.assertTrue(edgeImitator.waitForResponses());
 | 
			
		||||
 | 
			
		||||
        String attributeValuesUrl = "/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/attributes/" + DataConstants.SERVER_SCOPE;
 | 
			
		||||
        List<Map<String, String>> attributes = doGetAsyncTyped(attributeValuesUrl, new TypeReference<>() {
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        Optional<Map<String, String>> customAttributeOpt = getAttributeByKey(attributesKey, attributes);
 | 
			
		||||
        Assert.assertTrue(customAttributeOpt.isPresent());
 | 
			
		||||
        Map<String, String> customAttribute = customAttributeOpt.get();
 | 
			
		||||
        Assert.assertNotEquals(attributeValueIncorrect, customAttribute.get("value"));
 | 
			
		||||
        Assert.assertEquals(originalValue, customAttribute.get("value"));
 | 
			
		||||
 | 
			
		||||
        customAttributeOpt = getAttributeByKey(attributeKey2, attributes);
 | 
			
		||||
        Assert.assertTrue(customAttributeOpt.isPresent());
 | 
			
		||||
        customAttribute = customAttributeOpt.get();
 | 
			
		||||
        Assert.assertEquals(attributeValue2Correct, customAttribute.get("value"));
 | 
			
		||||
 | 
			
		||||
        doDelete("/api/plugins/telemetry/DEVICE/" + device.getId().getId() + "/SERVER_SCOPE?keys=" + attributesKey, String.class);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testSendDeviceToCloudWithNameThatAlreadyExistsOnCloud() throws Exception {
 | 
			
		||||
        String deviceOnCloudName = StringUtils.randomAlphanumeric(15);
 | 
			
		||||
 | 
			
		||||
@ -132,6 +132,7 @@ message EntityDataProto {
 | 
			
		||||
  transport.PostAttributeMsg attributesUpdatedMsg = 6;
 | 
			
		||||
  string postAttributeScope = 7;
 | 
			
		||||
  AttributeDeleteMsg attributeDeleteMsg = 8;
 | 
			
		||||
  optional int64 attributeTs = 9;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
message AttributeDeleteMsg {
 | 
			
		||||
 | 
			
		||||
@ -63,7 +63,6 @@ import java.util.Map.Entry;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.TreeMap;
 | 
			
		||||
import java.util.function.Consumer;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
public class JsonConverter {
 | 
			
		||||
 | 
			
		||||
@ -540,10 +539,12 @@ public class JsonConverter {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static Set<AttributeKvEntry> convertToAttributes(JsonElement element) {
 | 
			
		||||
        Set<AttributeKvEntry> result = new HashSet<>();
 | 
			
		||||
        long ts = System.currentTimeMillis();
 | 
			
		||||
        result.addAll(parseValues(element.getAsJsonObject()).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList()));
 | 
			
		||||
        return result;
 | 
			
		||||
        return new HashSet<>(parseValues(element.getAsJsonObject()).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).toList());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static Set<AttributeKvEntry> convertToAttributes(JsonElement element, long ts) {
 | 
			
		||||
        return new HashSet<>(parseValues(element.getAsJsonObject()).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).toList());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static List<KvEntry> parseValues(JsonObject valuesObject) {
 | 
			
		||||
@ -702,4 +703,5 @@ public class JsonConverter {
 | 
			
		||||
            return "";
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -49,6 +49,7 @@ public class JsonUtils {
 | 
			
		||||
        }
 | 
			
		||||
        return json;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static JsonElement parse(Object value) {
 | 
			
		||||
        if (value instanceof Integer) {
 | 
			
		||||
            return new JsonPrimitive((Integer) value);
 | 
			
		||||
@ -67,7 +68,7 @@ public class JsonUtils {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static JsonObject convertToJsonObject(Map<String,?> map) {
 | 
			
		||||
    public static JsonObject convertToJsonObject(Map<String, ?> map) {
 | 
			
		||||
        JsonObject jsonObject = new JsonObject();
 | 
			
		||||
        for (Map.Entry<String, ?> entry : map.entrySet()) {
 | 
			
		||||
            jsonObject.add(entry.getKey(), parse(entry.getValue()));
 | 
			
		||||
@ -75,4 +76,5 @@ public class JsonUtils {
 | 
			
		||||
 | 
			
		||||
        return jsonObject;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -89,6 +89,7 @@ public abstract class AbstractTbMsgPushNode<T extends BaseTbMsgPushNodeConfigura
 | 
			
		||||
            switch (actionType) {
 | 
			
		||||
                case ATTRIBUTES_UPDATED, POST_ATTRIBUTES -> {
 | 
			
		||||
                    entityBody.put("kv", dataJson);
 | 
			
		||||
                    entityBody.put("ts", msg.getMetaDataTs());
 | 
			
		||||
                    entityBody.put(SCOPE, getScope(metadata));
 | 
			
		||||
                    if (EdgeEventActionType.POST_ATTRIBUTES.equals(actionType)) {
 | 
			
		||||
                        entityBody.put("isPostAttributes", true);
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user