Merge remote-tracking branch 'origin/edge-attribute-update-improvement' into edge-attribute-update-improvement
This commit is contained in:
commit
83f2d27222
@ -19,7 +19,6 @@ import com.fasterxml.jackson.databind.JsonNode;
|
|||||||
import com.google.common.util.concurrent.FutureCallback;
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
|
||||||
import com.google.common.util.concurrent.SettableFuture;
|
import com.google.common.util.concurrent.SettableFuture;
|
||||||
import com.google.gson.Gson;
|
import com.google.gson.Gson;
|
||||||
import com.google.gson.JsonObject;
|
import com.google.gson.JsonObject;
|
||||||
@ -127,11 +126,12 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor {
|
|||||||
CustomerId customerId = pair.getValue();
|
CustomerId customerId = pair.getValue();
|
||||||
metaData.putValue(DataConstants.MSG_SOURCE_KEY, getMsgSourceKey());
|
metaData.putValue(DataConstants.MSG_SOURCE_KEY, getMsgSourceKey());
|
||||||
if (entityData.hasPostAttributesMsg()) {
|
if (entityData.hasPostAttributesMsg()) {
|
||||||
|
metaData.putValue(DataConstants.SCOPE, entityData.getPostAttributeScope());
|
||||||
long ts = entityData.hasAttributeTs() ? entityData.getAttributeTs() : System.currentTimeMillis();
|
long ts = entityData.hasAttributeTs() ? entityData.getAttributeTs() : System.currentTimeMillis();
|
||||||
result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData, ts));
|
result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData, ts));
|
||||||
}
|
}
|
||||||
if (entityData.hasAttributesUpdatedMsg()) {
|
if (entityData.hasAttributesUpdatedMsg()) {
|
||||||
metaData.putValue("scope", entityData.getPostAttributeScope());
|
metaData.putValue(DataConstants.SCOPE, entityData.getPostAttributeScope());
|
||||||
long ts = entityData.hasAttributeTs() ? entityData.getAttributeTs() : System.currentTimeMillis();
|
long ts = entityData.hasAttributeTs() ? entityData.getAttributeTs() : System.currentTimeMillis();
|
||||||
result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData, ts));
|
result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData, ts));
|
||||||
}
|
}
|
||||||
@ -265,8 +265,9 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor {
|
|||||||
TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData, long ts) throws Exception {
|
TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData, long ts) throws Exception {
|
||||||
SettableFuture<Void> futureToSet = SettableFuture.create();
|
SettableFuture<Void> futureToSet = SettableFuture.create();
|
||||||
JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
|
JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
|
||||||
|
AttributeScope scope = AttributeScope.valueOf(metaData.getValue(DataConstants.SCOPE));
|
||||||
List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts));
|
List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts));
|
||||||
ListenableFuture<List<AttributeKvEntry>> future = filterAttributesByTs(tenantId, entityId, AttributeScope.CLIENT_SCOPE, attributes);
|
ListenableFuture<List<AttributeKvEntry>> future = filterAttributesByTs(tenantId, entityId, scope, attributes);
|
||||||
Futures.addCallback(future, new FutureCallback<>() {
|
Futures.addCallback(future, new FutureCallback<>() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(List<AttributeKvEntry> attributesToSave) {
|
public void onSuccess(List<AttributeKvEntry> attributesToSave) {
|
||||||
@ -312,7 +313,7 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor {
|
|||||||
long ts) {
|
long ts) {
|
||||||
SettableFuture<Void> futureToSet = SettableFuture.create();
|
SettableFuture<Void> futureToSet = SettableFuture.create();
|
||||||
JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
|
JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
|
||||||
AttributeScope scope = AttributeScope.valueOf(metaData.getValue("scope"));
|
AttributeScope scope = AttributeScope.valueOf(metaData.getValue(DataConstants.SCOPE));
|
||||||
List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts));
|
List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts));
|
||||||
ListenableFuture<List<AttributeKvEntry>> future = filterAttributesByTs(tenantId, entityId, scope, attributes);
|
ListenableFuture<List<AttributeKvEntry>> future = filterAttributesByTs(tenantId, entityId, scope, attributes);
|
||||||
Futures.addCallback(future, new FutureCallback<>() {
|
Futures.addCallback(future, new FutureCallback<>() {
|
||||||
|
|||||||
@ -500,6 +500,15 @@ public class DeviceEdgeTest extends AbstractEdgeTest {
|
|||||||
attributesNode.put("test_attr", originalValue);
|
attributesNode.put("test_attr", originalValue);
|
||||||
doPost("/api/plugins/telemetry/DEVICE/" + device.getId() + "/attributes/SERVER_SCOPE", attributesNode);
|
doPost("/api/plugins/telemetry/DEVICE/" + device.getId() + "/attributes/SERVER_SCOPE", attributesNode);
|
||||||
|
|
||||||
|
// Wait before device attributes saved to database
|
||||||
|
Awaitility.await()
|
||||||
|
.atMost(10, TimeUnit.SECONDS)
|
||||||
|
.until(() -> {
|
||||||
|
String urlTemplate = "/api/plugins/telemetry/DEVICE/" + device.getId() + "/keys/attributes/" + DataConstants.SERVER_SCOPE;
|
||||||
|
List<String> actualKeys = doGetAsyncTyped(urlTemplate, new TypeReference<>() {});
|
||||||
|
return actualKeys != null && !actualKeys.isEmpty() && actualKeys.contains("test_attr");
|
||||||
|
});
|
||||||
|
|
||||||
JsonObject attributesData = new JsonObject();
|
JsonObject attributesData = new JsonObject();
|
||||||
// incorrect msg, will not be saved, because of ts is lower than for already existing
|
// incorrect msg, will not be saved, because of ts is lower than for already existing
|
||||||
String attributesKey = "test_attr";
|
String attributesKey = "test_attr";
|
||||||
|
|||||||
@ -54,8 +54,6 @@ public abstract class AbstractTbMsgPushNode<T extends BaseTbMsgPushNodeConfigura
|
|||||||
|
|
||||||
protected T config;
|
protected T config;
|
||||||
|
|
||||||
private static final String SCOPE = "scope";
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
||||||
this.config = TbNodeUtils.convert(configuration, getConfigClazz());
|
this.config = TbNodeUtils.convert(configuration, getConfigClazz());
|
||||||
@ -90,7 +88,7 @@ public abstract class AbstractTbMsgPushNode<T extends BaseTbMsgPushNodeConfigura
|
|||||||
case ATTRIBUTES_UPDATED, POST_ATTRIBUTES -> {
|
case ATTRIBUTES_UPDATED, POST_ATTRIBUTES -> {
|
||||||
entityBody.put("kv", dataJson);
|
entityBody.put("kv", dataJson);
|
||||||
entityBody.put("ts", msg.getMetaDataTs());
|
entityBody.put("ts", msg.getMetaDataTs());
|
||||||
entityBody.put(SCOPE, getScope(metadata));
|
entityBody.put(DataConstants.SCOPE, getScope(metadata));
|
||||||
if (EdgeEventActionType.POST_ATTRIBUTES.equals(actionType)) {
|
if (EdgeEventActionType.POST_ATTRIBUTES.equals(actionType)) {
|
||||||
entityBody.put("isPostAttributes", true);
|
entityBody.put("isPostAttributes", true);
|
||||||
}
|
}
|
||||||
@ -99,7 +97,7 @@ public abstract class AbstractTbMsgPushNode<T extends BaseTbMsgPushNodeConfigura
|
|||||||
List<String> keys = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() {
|
List<String> keys = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() {
|
||||||
});
|
});
|
||||||
entityBody.put("keys", keys);
|
entityBody.put("keys", keys);
|
||||||
entityBody.put(SCOPE, getScope(metadata));
|
entityBody.put(DataConstants.SCOPE, getScope(metadata));
|
||||||
}
|
}
|
||||||
case TIMESERIES_UPDATED -> {
|
case TIMESERIES_UPDATED -> {
|
||||||
entityBody.put("data", dataJson);
|
entityBody.put("data", dataJson);
|
||||||
@ -146,7 +144,7 @@ public abstract class AbstractTbMsgPushNode<T extends BaseTbMsgPushNodeConfigura
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected String getScope(Map<String, String> metadata) {
|
protected String getScope(Map<String, String> metadata) {
|
||||||
String scope = metadata.get(SCOPE);
|
String scope = metadata.get(DataConstants.SCOPE);
|
||||||
if (StringUtils.isEmpty(scope)) {
|
if (StringUtils.isEmpty(scope)) {
|
||||||
scope = config.getScope();
|
scope = config.getScope();
|
||||||
}
|
}
|
||||||
@ -164,7 +162,7 @@ public abstract class AbstractTbMsgPushNode<T extends BaseTbMsgPushNodeConfigura
|
|||||||
} else if (msg.isTypeOf(ATTRIBUTES_DELETED)) {
|
} else if (msg.isTypeOf(ATTRIBUTES_DELETED)) {
|
||||||
actionType = EdgeEventActionType.ATTRIBUTES_DELETED;
|
actionType = EdgeEventActionType.ATTRIBUTES_DELETED;
|
||||||
} else if (msg.isTypeOneOf(CONNECT_EVENT, DISCONNECT_EVENT, ACTIVITY_EVENT, INACTIVITY_EVENT)) {
|
} else if (msg.isTypeOneOf(CONNECT_EVENT, DISCONNECT_EVENT, ACTIVITY_EVENT, INACTIVITY_EVENT)) {
|
||||||
String scope = msg.getMetaData().getValue(SCOPE);
|
String scope = msg.getMetaData().getValue(DataConstants.SCOPE);
|
||||||
actionType = StringUtils.isEmpty(scope) ?
|
actionType = StringUtils.isEmpty(scope) ?
|
||||||
EdgeEventActionType.TIMESERIES_UPDATED : EdgeEventActionType.ATTRIBUTES_UPDATED;
|
EdgeEventActionType.TIMESERIES_UPDATED : EdgeEventActionType.ATTRIBUTES_UPDATED;
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user