BaseTelemetryProcessor - use async instead of .get()
This commit is contained in:
parent
1223347587
commit
5f793b11f9
@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.JsonObject;
|
||||
@ -81,6 +82,7 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -262,13 +264,16 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor {
|
||||
return new ImmutablePair<>(queueName, ruleChainId);
|
||||
}
|
||||
|
||||
private ListenableFuture<Void> processPostAttributes(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData, long ts) throws Exception {
|
||||
private ListenableFuture<Void> processPostAttributes(TenantId tenantId, CustomerId customerId, EntityId entityId,
|
||||
TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData, long ts) throws Exception {
|
||||
SettableFuture<Void> futureToSet = SettableFuture.create();
|
||||
JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
|
||||
|
||||
List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts));
|
||||
filterAttributesByTs(tenantId, entityId, AttributeScope.CLIENT_SCOPE, attributes, json);
|
||||
|
||||
ListenableFuture<List<AttributeKvEntry>> future = filterAttributesByTs(tenantId, entityId, AttributeScope.CLIENT_SCOPE, attributes);
|
||||
Futures.addCallback(future, new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(List<AttributeKvEntry> attributesToSave) {
|
||||
JsonObject jsonToSave = filterAttributesFromJson(json, attributesToSave);
|
||||
var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId);
|
||||
TbMsg tbMsg = TbMsg.newMsg()
|
||||
.queueName(defaultQueueAndRuleChain.getKey())
|
||||
@ -276,7 +281,7 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor {
|
||||
.originator(entityId)
|
||||
.customerId(customerId)
|
||||
.copyMetaData(metaData)
|
||||
.data(gson.toJson(json))
|
||||
.data(gson.toJson(jsonToSave))
|
||||
.ruleChainId(defaultQueueAndRuleChain.getValue())
|
||||
.build();
|
||||
edgeCtx.getClusterService().pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
|
||||
@ -291,6 +296,14 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor {
|
||||
futureToSet.setException(t);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
log.error("[{}] Can't process post attributes [{}]", tenantId, msg, t);
|
||||
futureToSet.setException(t);
|
||||
}
|
||||
}, dbCallbackExecutorService);
|
||||
return futureToSet;
|
||||
}
|
||||
|
||||
@ -299,12 +312,16 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor {
|
||||
EntityId entityId,
|
||||
TransportProtos.PostAttributeMsg msg,
|
||||
TbMsgMetaData metaData,
|
||||
long ts) throws Exception {
|
||||
long ts) {
|
||||
SettableFuture<Void> futureToSet = SettableFuture.create();
|
||||
JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
|
||||
AttributeScope scope = AttributeScope.valueOf(metaData.getValue("scope"));
|
||||
List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts));
|
||||
List<AttributeKvEntry> attributesToSave = filterAttributesByTs(tenantId, entityId, scope, attributes, json);
|
||||
ListenableFuture<List<AttributeKvEntry>> future = filterAttributesByTs(tenantId, entityId, scope, attributes);
|
||||
Futures.addCallback(future, new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(List<AttributeKvEntry> attributesToSave) {
|
||||
JsonObject jsonToSave = filterAttributesFromJson(json, attributesToSave);
|
||||
tsSubService.saveAttributes(AttributesSaveRequest.builder()
|
||||
.tenantId(tenantId)
|
||||
.entityId(entityId)
|
||||
@ -320,7 +337,7 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor {
|
||||
.originator(entityId)
|
||||
.customerId(customerId)
|
||||
.copyMetaData(metaData)
|
||||
.data(gson.toJson(json))
|
||||
.data(gson.toJson(jsonToSave))
|
||||
.ruleChainId(defaultQueueAndRuleChain.getValue())
|
||||
.build();
|
||||
edgeCtx.getClusterService().pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
|
||||
@ -344,12 +361,26 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor {
|
||||
}
|
||||
})
|
||||
.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
log.error("[{}] Can't process attributes update [{}]", tenantId, msg, t);
|
||||
futureToSet.setException(t);
|
||||
}
|
||||
}, dbCallbackExecutorService);
|
||||
return futureToSet;
|
||||
}
|
||||
|
||||
private ListenableFuture<Void> processAttributeDeleteMsg(TenantId tenantId, EntityId entityId, AttributeDeleteMsg attributeDeleteMsg,
|
||||
String entityType) {
|
||||
private JsonObject filterAttributesFromJson(JsonObject json, List<AttributeKvEntry> attributesToSave) {
|
||||
Set<String> keysToSave = attributesToSave.stream()
|
||||
.map(KvEntry::getKey)
|
||||
.collect(Collectors.toSet());
|
||||
json.keySet().removeIf(key -> !keysToSave.contains(key));
|
||||
return json;
|
||||
}
|
||||
|
||||
private ListenableFuture<Void> processAttributeDeleteMsg(TenantId tenantId, EntityId entityId, AttributeDeleteMsg attributeDeleteMsg, String entityType) {
|
||||
String scope = attributeDeleteMsg.getScope();
|
||||
List<String> attributeKeys = attributeDeleteMsg.getAttributeNamesList();
|
||||
ListenableFuture<List<String>> removeAllFuture = edgeCtx.getAttributesService().removeAll(tenantId, entityId, AttributeScope.valueOf(scope), attributeKeys);
|
||||
@ -401,21 +432,19 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor {
|
||||
entityDataMsgConstructor.constructEntityDataMsg(tenantId, entityId, actionType, JsonParser.parseString(bodyJackson));
|
||||
}
|
||||
|
||||
private List<AttributeKvEntry> filterAttributesByTs(TenantId tenantId, EntityId entityId, AttributeScope scope, List<AttributeKvEntry> attributes, JsonObject jsonObject) throws Exception {
|
||||
private ListenableFuture<List<AttributeKvEntry>> filterAttributesByTs(TenantId tenantId, EntityId entityId, AttributeScope scope,
|
||||
List<AttributeKvEntry> attributes) {
|
||||
List<String> keys = attributes.stream().map(KvEntry::getKey).toList();
|
||||
Map<String, Long> existingAttributesTs = edgeCtx.getAttributesService().find(tenantId, entityId, scope, keys).get()
|
||||
.stream().collect(Collectors.toMap(KvEntry::getKey, AttributeKvEntry::getLastUpdateTs));
|
||||
ListenableFuture<List<AttributeKvEntry>> future = edgeCtx.getAttributesService().find(tenantId, entityId, scope, keys);
|
||||
return Futures.transform(future, input -> {
|
||||
Map<String, Long> existingAttributesTs = input.stream().collect(Collectors.toMap(KvEntry::getKey, AttributeKvEntry::getLastUpdateTs));
|
||||
return attributes.stream()
|
||||
.filter(attribute -> {
|
||||
String key = attribute.getKey();
|
||||
long incomingTs = attribute.getLastUpdateTs();
|
||||
if (incomingTs > existingAttributesTs.getOrDefault(key, 0L)) {
|
||||
return true;
|
||||
} else {
|
||||
jsonObject.remove(key);
|
||||
return false;
|
||||
}
|
||||
return incomingTs > existingAttributesTs.getOrDefault(key, 0L);
|
||||
}).toList();
|
||||
}, dbCallbackExecutorService);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user