From 64e60de9bc8ce8376ed99c702e97314d2e9248c7 Mon Sep 17 00:00:00 2001 From: Bohdan Smetaniuk Date: Tue, 29 Sep 2020 17:10:09 +0300 Subject: [PATCH] processing future + attributes node refactoring --- .../rpc/processor/TelemetryProcessor.java | 27 ++++++++++++++----- .../engine/telemetry/TbMsgAttributesNode.java | 4 +-- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java index fe3fd208f6..cc2cb15d4b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java @@ -15,12 +15,14 @@ */ package org.thingsboard.server.service.edge.rpc.processor; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.google.gson.Gson; import com.google.gson.JsonObject; -import com.google.gson.JsonParser; import lombok.extern.slf4j.Slf4j; +import org.checkerframework.checker.nullness.qual.Nullable; import org.springframework.stereotype.Component; import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; import org.thingsboard.server.common.data.DataConstants; @@ -140,12 +142,23 @@ public class TelemetryProcessor extends BaseProcessor { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); Set attributes = JsonConverter.convertToAttributes(json); - attributesService.save(tenantId, entityId, metaData.getValue("scope"), new ArrayList<>(attributes)); - TbMsg tbMsg = TbMsg.newMsg(DataConstants.ATTRIBUTES_UPDATED, entityId, metaData, gson.toJson(json)); - tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { + ListenableFuture> future = attributesService.save(tenantId, entityId, metaData.getValue("scope"), new ArrayList<>(attributes)); + Futures.addCallback(future, new FutureCallback>() { @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - futureToSet.set(null); + public void onSuccess(@Nullable List voids) { + TbMsg tbMsg = TbMsg.newMsg(DataConstants.ATTRIBUTES_UPDATED, entityId, metaData, gson.toJson(json)); + tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + futureToSet.set(null); + } + + @Override + public void onFailure(Throwable t) { + log.error("Can't process post attributes [{}]", msg, t); + futureToSet.setException(t); + } + }); } @Override @@ -153,7 +166,7 @@ public class TelemetryProcessor extends BaseProcessor { log.error("Can't process post attributes [{}]", msg, t); futureToSet.setException(t); } - }); + }, dbCallbackExecutorService); return futureToSet; } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java index f0f5d2418f..5fd06108e4 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java @@ -63,9 +63,7 @@ public class TbMsgAttributesNode implements TbNode { } String src = msg.getData(); Set attributes = JsonConverter.convertToAttributes(new JsonParser().parse(src)); - if (StringUtils.isEmpty(msg.getMetaData().getValue(SCOPE))) { - msg.getMetaData().putValue(SCOPE, config.getScope()); - } + msg.getMetaData().putValue(SCOPE, config.getScope()); ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getOriginator(), config.getScope(), new ArrayList<>(attributes), new TelemetryNodeCallback(ctx, msg)); }