BaseTelemetryProcessor - use scope from msg, and not hardcoded CLIENT_SCOPE

This commit is contained in:
Volodymyr Babak 2025-01-31 16:58:12 +02:00
parent 5f793b11f9
commit ce4c654e21
2 changed files with 9 additions and 9 deletions

View File

@ -130,11 +130,12 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor {
CustomerId customerId = pair.getValue(); CustomerId customerId = pair.getValue();
metaData.putValue(DataConstants.MSG_SOURCE_KEY, getMsgSourceKey()); metaData.putValue(DataConstants.MSG_SOURCE_KEY, getMsgSourceKey());
if (entityData.hasPostAttributesMsg()) { if (entityData.hasPostAttributesMsg()) {
metaData.putValue(DataConstants.SCOPE, entityData.getPostAttributeScope());
long ts = entityData.hasAttributeTs() ? entityData.getAttributeTs() : System.currentTimeMillis(); long ts = entityData.hasAttributeTs() ? entityData.getAttributeTs() : System.currentTimeMillis();
result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData, ts)); result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData, ts));
} }
if (entityData.hasAttributesUpdatedMsg()) { if (entityData.hasAttributesUpdatedMsg()) {
metaData.putValue("scope", entityData.getPostAttributeScope()); metaData.putValue(DataConstants.SCOPE, entityData.getPostAttributeScope());
long ts = entityData.hasAttributeTs() ? entityData.getAttributeTs() : System.currentTimeMillis(); long ts = entityData.hasAttributeTs() ? entityData.getAttributeTs() : System.currentTimeMillis();
result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData, ts)); result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData, ts));
} }
@ -268,8 +269,9 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor {
TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData, long ts) throws Exception { TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData, long ts) throws Exception {
SettableFuture<Void> futureToSet = SettableFuture.create(); SettableFuture<Void> futureToSet = SettableFuture.create();
JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
AttributeScope scope = AttributeScope.valueOf(metaData.getValue(DataConstants.SCOPE));
List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts)); List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts));
ListenableFuture<List<AttributeKvEntry>> future = filterAttributesByTs(tenantId, entityId, AttributeScope.CLIENT_SCOPE, attributes); ListenableFuture<List<AttributeKvEntry>> future = filterAttributesByTs(tenantId, entityId, scope, attributes);
Futures.addCallback(future, new FutureCallback<>() { Futures.addCallback(future, new FutureCallback<>() {
@Override @Override
public void onSuccess(List<AttributeKvEntry> attributesToSave) { public void onSuccess(List<AttributeKvEntry> attributesToSave) {
@ -315,7 +317,7 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor {
long ts) { long ts) {
SettableFuture<Void> futureToSet = SettableFuture.create(); SettableFuture<Void> futureToSet = SettableFuture.create();
JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
AttributeScope scope = AttributeScope.valueOf(metaData.getValue("scope")); AttributeScope scope = AttributeScope.valueOf(metaData.getValue(DataConstants.SCOPE));
List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts)); List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts));
ListenableFuture<List<AttributeKvEntry>> future = filterAttributesByTs(tenantId, entityId, scope, attributes); ListenableFuture<List<AttributeKvEntry>> future = filterAttributesByTs(tenantId, entityId, scope, attributes);
Futures.addCallback(future, new FutureCallback<>() { Futures.addCallback(future, new FutureCallback<>() {

View File

@ -54,8 +54,6 @@ public abstract class AbstractTbMsgPushNode<T extends BaseTbMsgPushNodeConfigura
protected T config; protected T config;
private static final String SCOPE = "scope";
@Override @Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, getConfigClazz()); this.config = TbNodeUtils.convert(configuration, getConfigClazz());
@ -90,7 +88,7 @@ public abstract class AbstractTbMsgPushNode<T extends BaseTbMsgPushNodeConfigura
case ATTRIBUTES_UPDATED, POST_ATTRIBUTES -> { case ATTRIBUTES_UPDATED, POST_ATTRIBUTES -> {
entityBody.put("kv", dataJson); entityBody.put("kv", dataJson);
entityBody.put("ts", msg.getMetaDataTs()); entityBody.put("ts", msg.getMetaDataTs());
entityBody.put(SCOPE, getScope(metadata)); entityBody.put(DataConstants.SCOPE, getScope(metadata));
if (EdgeEventActionType.POST_ATTRIBUTES.equals(actionType)) { if (EdgeEventActionType.POST_ATTRIBUTES.equals(actionType)) {
entityBody.put("isPostAttributes", true); entityBody.put("isPostAttributes", true);
} }
@ -99,7 +97,7 @@ public abstract class AbstractTbMsgPushNode<T extends BaseTbMsgPushNodeConfigura
List<String> keys = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() { List<String> keys = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() {
}); });
entityBody.put("keys", keys); entityBody.put("keys", keys);
entityBody.put(SCOPE, getScope(metadata)); entityBody.put(DataConstants.SCOPE, getScope(metadata));
} }
case TIMESERIES_UPDATED -> { case TIMESERIES_UPDATED -> {
entityBody.put("data", dataJson); entityBody.put("data", dataJson);
@ -146,7 +144,7 @@ public abstract class AbstractTbMsgPushNode<T extends BaseTbMsgPushNodeConfigura
} }
protected String getScope(Map<String, String> metadata) { protected String getScope(Map<String, String> metadata) {
String scope = metadata.get(SCOPE); String scope = metadata.get(DataConstants.SCOPE);
if (StringUtils.isEmpty(scope)) { if (StringUtils.isEmpty(scope)) {
scope = config.getScope(); scope = config.getScope();
} }
@ -164,7 +162,7 @@ public abstract class AbstractTbMsgPushNode<T extends BaseTbMsgPushNodeConfigura
} else if (msg.isTypeOf(ATTRIBUTES_DELETED)) { } else if (msg.isTypeOf(ATTRIBUTES_DELETED)) {
actionType = EdgeEventActionType.ATTRIBUTES_DELETED; actionType = EdgeEventActionType.ATTRIBUTES_DELETED;
} else if (msg.isTypeOneOf(CONNECT_EVENT, DISCONNECT_EVENT, ACTIVITY_EVENT, INACTIVITY_EVENT)) { } else if (msg.isTypeOneOf(CONNECT_EVENT, DISCONNECT_EVENT, ACTIVITY_EVENT, INACTIVITY_EVENT)) {
String scope = msg.getMetaData().getValue(SCOPE); String scope = msg.getMetaData().getValue(DataConstants.SCOPE);
actionType = StringUtils.isEmpty(scope) ? actionType = StringUtils.isEmpty(scope) ?
EdgeEventActionType.TIMESERIES_UPDATED : EdgeEventActionType.ATTRIBUTES_UPDATED; EdgeEventActionType.TIMESERIES_UPDATED : EdgeEventActionType.ATTRIBUTES_UPDATED;
} else { } else {