fixed bug + some refractoring

This commit is contained in:
Bohdan Smetaniuk 2020-06-24 12:39:58 +03:00
parent 3d8db0dbc8
commit cd70b6fb54
2 changed files with 40 additions and 27 deletions

View File

@ -20,7 +20,6 @@ import lombok.Data;
import org.thingsboard.server.common.data.BaseData;
import org.thingsboard.server.common.data.id.EdgeEventId;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import java.util.UUID;

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.rule.engine.edge;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@ -77,33 +78,35 @@ public class TbMsgPushToEdgeNode implements TbNode {
if (isSupportedOriginator(msg.getOriginator().getEntityType())) {
if (isSupportedMsgType(msg.getType())) {
ListenableFuture<EdgeId> getEdgeIdFuture = getEdgeIdByOriginatorId(ctx, ctx.getTenantId(), msg.getOriginator());
Futures.transform(getEdgeIdFuture, edgeId -> {
EdgeEventType edgeEventTypeByEntityType = ctx.getEdgeEventService().getEdgeEventTypeByEntityType(msg.getOriginator().getEntityType());
if (edgeEventTypeByEntityType == null) {
log.debug("Edge event type is null. Entity Type {}", msg.getOriginator().getEntityType());
ctx.tellFailure(msg, new RuntimeException("Edge event type is null. Entity Type '" + msg.getOriginator().getEntityType() + "'"));
Futures.addCallback(getEdgeIdFuture, new FutureCallback<EdgeId>() {
@Override
public void onSuccess(@org.checkerframework.checker.nullness.qual.Nullable EdgeId edgeId) {
EdgeEventType edgeEventTypeByEntityType = ctx.getEdgeEventService().getEdgeEventTypeByEntityType(msg.getOriginator().getEntityType());
if (edgeEventTypeByEntityType == null) {
log.debug("Edge event type is null. Entity Type {}", msg.getOriginator().getEntityType());
ctx.tellFailure(msg, new RuntimeException("Edge event type is null. Entity Type '" + msg.getOriginator().getEntityType() + "'"));
}
EdgeEvent edgeEvent = null;
try {
edgeEvent = buildEdgeEvent(ctx, msg, edgeId, edgeEventTypeByEntityType);
} catch (JsonProcessingException e) {
log.error("Failed to build edge event", e);
}
ListenableFuture<EdgeEvent> saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent);
Futures.addCallback(saveFuture, new FutureCallback<EdgeEvent>() {
@Override
public void onSuccess(@Nullable EdgeEvent event) {
ctx.tellNext(msg, SUCCESS);
} @Override
public void onFailure(Throwable th) {
log.error("Could not save edge event", th);
ctx.tellFailure(msg, th);
}
}, ctx.getDbCallbackExecutor());
} @Override
public void onFailure(Throwable t) {
ctx.tellFailure(msg, t);
}
EdgeEvent edgeEvent = new EdgeEvent();
edgeEvent.setTenantId(ctx.getTenantId());
edgeEvent.setEdgeId(edgeId);
edgeEvent.setEdgeEventAction(getActionTypeByMsgType(msg.getType()).name());
edgeEvent.setEntityId(msg.getOriginator().getId());
edgeEvent.setEdgeEventType(edgeEventTypeByEntityType);
edgeEvent.setEntityBody(json.valueToTree(msg.getData()));
ListenableFuture<EdgeEvent> saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent);
Futures.addCallback(saveFuture, new FutureCallback<EdgeEvent>() {
@Override
public void onSuccess(@Nullable EdgeEvent event) {
ctx.tellNext(msg, SUCCESS);
}
@Override
public void onFailure(Throwable th) {
log.error("Could not save edge event", th);
ctx.tellFailure(msg, th);
}
}, ctx.getDbCallbackExecutor());
return null;
}, ctx.getDbCallbackExecutor());
} else {
log.debug("Unsupported msg type {}", msg.getType());
@ -115,6 +118,17 @@ public class TbMsgPushToEdgeNode implements TbNode {
}
}
private EdgeEvent buildEdgeEvent(TbContext ctx, TbMsg msg, EdgeId edgeId, EdgeEventType edgeEventTypeByEntityType) throws JsonProcessingException {
EdgeEvent edgeEvent = new EdgeEvent();
edgeEvent.setTenantId(ctx.getTenantId());
edgeEvent.setEdgeId(edgeId);
edgeEvent.setEdgeEventAction(getActionTypeByMsgType(msg.getType()).name());
edgeEvent.setEntityId(msg.getOriginator().getId());
edgeEvent.setEdgeEventType(edgeEventTypeByEntityType);
edgeEvent.setEntityBody(json.readTree(msg.getData()));
return edgeEvent;
}
private ActionType getActionTypeByMsgType(String msgType) {
ActionType actionType;
if (SessionMsgType.POST_TELEMETRY_REQUEST.name().equals(msgType)) {