Merge pull request #51 from BohdanSmetanyuk/fix/attributes_saving

fixed attributes saving
This commit is contained in:
VoBa 2020-10-02 12:32:47 +03:00 committed by GitHub
commit 5d86bf6f9f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 78 additions and 34 deletions

View File

@ -18,7 +18,6 @@ package org.thingsboard.server.service.edge.rpc.constructor;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonNull;
import com.google.gson.JsonObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@ -27,6 +26,7 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.gen.edge.AttributeDeleteMsg;
import org.thingsboard.server.gen.edge.EntityDataProto;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.List;
@ -57,7 +57,12 @@ public class EntityDataMsgConstructor {
case ATTRIBUTES_UPDATED:
try {
JsonObject data = entityData.getAsJsonObject();
builder.setPostAttributesMsg(JsonConverter.convertToAttributesProto(data.getAsJsonObject("kv")));
TransportProtos.PostAttributeMsg postAttributeMsg = JsonConverter.convertToAttributesProto(data.getAsJsonObject("kv"));
if (data.has("isPostAttributes") && data.getAsJsonPrimitive("isPostAttributes").getAsBoolean()) {
builder.setPostAttributesMsg(postAttributeMsg);
} else {
builder.setAttributesUpdatedMsg(postAttributeMsg);
}
builder.setPostAttributeScope(data.getAsJsonPrimitive("scope").getAsString());
} catch (Exception e) {
log.warn("Can't convert to attributes proto, entityData [{}]", entityData, e);

View File

@ -15,6 +15,8 @@
*/
package org.thingsboard.server.service.edge.rpc.processor;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.gson.Gson;
@ -36,9 +38,11 @@ import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.kv.AttributeKey;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.util.JsonUtils;
import org.thingsboard.server.gen.edge.AttributeDeleteMsg;
import org.thingsboard.server.gen.edge.EntityDataProto;
@ -47,6 +51,7 @@ import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.util.TbCoreComponent;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@ -63,13 +68,16 @@ public class TelemetryProcessor extends BaseProcessor {
public List<ListenableFuture<Void>> onTelemetryUpdate(TenantId tenantId, EntityDataProto entityData) {
List<ListenableFuture<Void>> result = new ArrayList<>();
EntityId entityId = constructEntityId(entityData);
if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg()) && entityId != null) {
if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg() || entityData.hasAttributesUpdatedMsg()) && entityId != null) {
TbMsgMetaData metaData = constructBaseMsgMetadata(tenantId, entityId);
metaData.putValue(DataConstants.MSG_SOURCE_KEY, DataConstants.EDGE_MSG_SOURCE);
if (entityData.hasPostAttributesMsg()) {
metaData.putValue("scope", entityData.getPostAttributeScope());
result.add(processPostAttributes(tenantId, entityId, entityData.getPostAttributesMsg(), metaData));
}
if (entityData.hasAttributesUpdatedMsg()) {
metaData.putValue("scope", entityData.getPostAttributeScope());
result.add(processAttributesUpdate(tenantId, entityId, entityData.getAttributesUpdatedMsg(), metaData));
}
if (entityData.hasPostTelemetryMsg()) {
result.add(processPostTelemetry(tenantId, entityId, entityData.getPostTelemetryMsg(), metaData));
}
@ -152,6 +160,38 @@ public class TelemetryProcessor extends BaseProcessor {
return futureToSet;
}
private ListenableFuture<Void> processAttributesUpdate(TenantId tenantId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) {
SettableFuture<Void> futureToSet = SettableFuture.create();
JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(json);
ListenableFuture<List<Void>> future = attributesService.save(tenantId, entityId, metaData.getValue("scope"), new ArrayList<>(attributes));
Futures.addCallback(future, new FutureCallback<List<Void>>() {
@Override
public void onSuccess(@Nullable List<Void> voids) {
TbMsg tbMsg = TbMsg.newMsg(DataConstants.ATTRIBUTES_UPDATED, entityId, metaData, gson.toJson(json));
tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
futureToSet.set(null);
}
@Override
public void onFailure(Throwable t) {
log.error("Can't process attributes update [{}]", msg, t);
futureToSet.setException(t);
}
});
}
@Override
public void onFailure(Throwable t) {
log.error("Can't process attributes update [{}]", msg, t);
futureToSet.setException(t);
}
}, dbCallbackExecutorService);
return futureToSet;
}
private ListenableFuture<Void> processAttributeDeleteMsg(TenantId tenantId, EntityId entityId, AttributeDeleteMsg attributeDeleteMsg, String entityType) {
SettableFuture<Void> futureToSet = SettableFuture.create();
String scope = attributeDeleteMsg.getScope();

View File

@ -102,8 +102,9 @@ message EntityDataProto {
string entityType = 3;
transport.PostTelemetryMsg postTelemetryMsg = 4;
transport.PostAttributeMsg postAttributesMsg = 5;
string postAttributeScope = 6;
AttributeDeleteMsg attributeDeleteMsg = 7;
transport.PostAttributeMsg attributesUpdatedMsg = 6;
string postAttributeScope = 7;
AttributeDeleteMsg attributeDeleteMsg = 8;
// transport.ToDeviceRpcRequestMsg ???
}

View File

@ -138,16 +138,37 @@ public class TbMsgPushToEdgeNode implements TbNode {
}
private EdgeEvent buildEdgeEvent(TbMsg msg, TbContext ctx) throws JsonProcessingException {
if (DataConstants.ALARM.equals(msg.getType())) {
String msgType = msg.getType();
if (DataConstants.ALARM.equals(msgType)) {
return buildEdgeEvent(ctx.getTenantId(), ActionType.ADDED, getUUIDFromMsgData(msg), EdgeEventType.ALARM, null);
} else {
EdgeEventType edgeEventTypeByEntityType = EdgeUtils.getEdgeEventTypeByEntityType(msg.getOriginator().getEntityType());
if (edgeEventTypeByEntityType == null) {
return null;
}
ActionType actionType = getActionTypeByMsgType(msg.getType());
JsonNode entityBody = getEntityBody(actionType, msg.getData(), msg.getMetaData().getData());
return buildEdgeEvent(ctx.getTenantId(), actionType, msg.getOriginator().getId(), edgeEventTypeByEntityType, entityBody);
ActionType actionType = getActionTypeByMsgType(msgType);
Map<String, Object> entityBody = new HashMap<>();
Map<String, String> metadata = msg.getMetaData().getData();
JsonNode dataJson = json.readTree(msg.getData());
switch (actionType) {
case ATTRIBUTES_UPDATED:
entityBody.put("kv", dataJson);
entityBody.put("scope", metadata.get("scope"));
if (SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msgType)) {
entityBody.put("isPostAttributes", true);
}
break;
case ATTRIBUTES_DELETED:
List<String> keys = json.treeToValue(dataJson.get("attributes"), List.class);
entityBody.put("keys", keys);
entityBody.put("scope", metadata.get("scope"));
break;
case TIMESERIES_UPDATED:
entityBody.put("data", dataJson);
entityBody.put("ts", metadata.get("ts"));
break;
}
return buildEdgeEvent(ctx.getTenantId(), actionType, msg.getOriginator().getId(), edgeEventTypeByEntityType, json.valueToTree(entityBody));
}
}
@ -161,27 +182,6 @@ public class TbMsgPushToEdgeNode implements TbNode {
return edgeEvent;
}
private JsonNode getEntityBody(ActionType actionType, String data, Map<String, String> metadata) throws JsonProcessingException {
Map<String, Object> entityBody = new HashMap<>();
JsonNode dataJson = json.readTree(data);
switch (actionType) {
case ATTRIBUTES_UPDATED:
entityBody.put("kv", dataJson);
entityBody.put("scope", metadata.get("scope"));
break;
case ATTRIBUTES_DELETED:
List<String> keys = json.treeToValue(dataJson.get("attributes"), List.class);
entityBody.put("keys", keys);
entityBody.put("scope", metadata.get("scope"));
break;
case TIMESERIES_UPDATED:
entityBody.put("data", dataJson);
entityBody.put("ts", metadata.get("ts"));
break;
}
return json.valueToTree(entityBody);
}
private UUID getUUIDFromMsgData(TbMsg msg) throws JsonProcessingException {
JsonNode data = json.readTree(msg.getData()).get("id");
String id = json.treeToValue(data.get("id"), String.class);

View File

@ -63,9 +63,7 @@ public class TbMsgAttributesNode implements TbNode {
}
String src = msg.getData();
Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(src));
if (StringUtils.isEmpty(msg.getMetaData().getValue(SCOPE))) {
msg.getMetaData().putValue(SCOPE, config.getScope());
}
msg.getMetaData().putValue(SCOPE, config.getScope());
ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getOriginator(), config.getScope(),
new ArrayList<>(attributes), new TelemetryNodeCallback(ctx, msg));
}