processing future + attributes node refactoring

This commit is contained in:
Bohdan Smetaniuk 2020-09-29 17:10:09 +03:00
parent 8059274002
commit 64e60de9bc
2 changed files with 21 additions and 10 deletions

View File

@ -15,12 +15,14 @@
*/ */
package org.thingsboard.server.service.edge.rpc.processor; package org.thingsboard.server.service.edge.rpc.processor;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.DataConstants;
@ -140,12 +142,23 @@ public class TelemetryProcessor extends BaseProcessor {
SettableFuture<Void> futureToSet = SettableFuture.create(); SettableFuture<Void> futureToSet = SettableFuture.create();
JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(json); Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(json);
attributesService.save(tenantId, entityId, metaData.getValue("scope"), new ArrayList<>(attributes)); ListenableFuture<List<Void>> future = attributesService.save(tenantId, entityId, metaData.getValue("scope"), new ArrayList<>(attributes));
TbMsg tbMsg = TbMsg.newMsg(DataConstants.ATTRIBUTES_UPDATED, entityId, metaData, gson.toJson(json)); Futures.addCallback(future, new FutureCallback<List<Void>>() {
tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
@Override @Override
public void onSuccess(TbQueueMsgMetadata metadata) { public void onSuccess(@Nullable List<Void> voids) {
futureToSet.set(null); TbMsg tbMsg = TbMsg.newMsg(DataConstants.ATTRIBUTES_UPDATED, entityId, metaData, gson.toJson(json));
tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
futureToSet.set(null);
}
@Override
public void onFailure(Throwable t) {
log.error("Can't process post attributes [{}]", msg, t);
futureToSet.setException(t);
}
});
} }
@Override @Override
@ -153,7 +166,7 @@ public class TelemetryProcessor extends BaseProcessor {
log.error("Can't process post attributes [{}]", msg, t); log.error("Can't process post attributes [{}]", msg, t);
futureToSet.setException(t); futureToSet.setException(t);
} }
}); }, dbCallbackExecutorService);
return futureToSet; return futureToSet;
} }

View File

@ -63,9 +63,7 @@ public class TbMsgAttributesNode implements TbNode {
} }
String src = msg.getData(); String src = msg.getData();
Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(src)); Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(src));
if (StringUtils.isEmpty(msg.getMetaData().getValue(SCOPE))) { msg.getMetaData().putValue(SCOPE, config.getScope());
msg.getMetaData().putValue(SCOPE, config.getScope());
}
ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getOriginator(), config.getScope(), ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getOriginator(), config.getScope(),
new ArrayList<>(attributes), new TelemetryNodeCallback(ctx, msg)); new ArrayList<>(attributes), new TelemetryNodeCallback(ctx, msg));
} }