Added support for Tenant Entity
This commit is contained in:
parent
635086986b
commit
869a492c2c
@ -298,6 +298,12 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
case DASHBOARD:
|
||||
entityId = new DashboardId(edgeEvent.getEntityId());
|
||||
break;
|
||||
case TENANT:
|
||||
entityId = new TenantId(edgeEvent.getEntityId());
|
||||
break;
|
||||
case CUSTOMER:
|
||||
entityId = new CustomerId(edgeEvent.getEntityId());
|
||||
break;
|
||||
}
|
||||
if (entityId != null) {
|
||||
log.debug("Sending telemetry data msg, entityId [{}], body [{}]", edgeEvent.getEntityId(), edgeEvent.getEntityBody());
|
||||
@ -748,6 +754,10 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
return new EntityViewId(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB()));
|
||||
case DASHBOARD:
|
||||
return new DashboardId(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB()));
|
||||
case TENANT:
|
||||
return new TenantId(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB()));
|
||||
case CUSTOMER:
|
||||
return new CustomerId(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB()));
|
||||
default:
|
||||
log.warn("Unsupported entity type [{}] during construct of entity id. EntityDataProto [{}]", entityData.getEntityType(), entityData);
|
||||
return null;
|
||||
|
||||
@ -39,6 +39,10 @@ public final class EdgeUtils {
|
||||
return EdgeEventType.USER;
|
||||
case ALARM:
|
||||
return EdgeEventType.ALARM;
|
||||
case TENANT:
|
||||
return EdgeEventType.TENANT;
|
||||
case CUSTOMER:
|
||||
return EdgeEventType.CUSTOMER;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
|
||||
@ -26,5 +26,6 @@ public enum EdgeEventType {
|
||||
EDGE,
|
||||
USER,
|
||||
CUSTOMER,
|
||||
RELATION
|
||||
RELATION,
|
||||
TENANT
|
||||
}
|
||||
|
||||
@ -33,11 +33,15 @@ import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.EdgeUtils;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.audit.ActionType;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
|
||||
import org.thingsboard.server.common.data.id.EdgeId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.IdBased;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.TextPageData;
|
||||
import org.thingsboard.server.common.data.page.TextPageLink;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentType;
|
||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
|
||||
@ -46,10 +50,12 @@ import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.session.SessionMsgType;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
|
||||
|
||||
@ -84,34 +90,38 @@ public class TbMsgPushToEdgeNode implements TbNode {
|
||||
}
|
||||
if (isSupportedOriginator(msg.getOriginator().getEntityType())) {
|
||||
if (isSupportedMsgType(msg.getType())) {
|
||||
ListenableFuture<EdgeId> getEdgeIdFuture = getEdgeIdByOriginatorId(ctx, ctx.getTenantId(), msg.getOriginator());
|
||||
Futures.addCallback(getEdgeIdFuture, new FutureCallback<EdgeId>() {
|
||||
ListenableFuture<List<EdgeId>> getEdgeIdsFuture = getEdgeIdsByOriginatorId(ctx, ctx.getTenantId(), msg.getOriginator());
|
||||
Futures.addCallback(getEdgeIdsFuture, new FutureCallback<List<EdgeId>>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable EdgeId edgeId) {
|
||||
try {
|
||||
EdgeEvent edgeEvent = buildEdgeEvent(msg, ctx);
|
||||
if (edgeEvent == null) {
|
||||
log.debug("Edge event type is null. Entity Type {}", msg.getOriginator().getEntityType());
|
||||
ctx.tellFailure(msg, new RuntimeException("Edge event type is null. Entity Type '" + msg.getOriginator().getEntityType() + "'"));
|
||||
} else {
|
||||
edgeEvent.setEdgeId(edgeId);
|
||||
ListenableFuture<EdgeEvent> saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent);
|
||||
Futures.addCallback(saveFuture, new FutureCallback<EdgeEvent>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable EdgeEvent event) {
|
||||
ctx.tellNext(msg, SUCCESS);
|
||||
}
|
||||
public void onSuccess(@Nullable List<EdgeId> edgeIds) {
|
||||
if (edgeIds != null && !edgeIds.isEmpty()) {
|
||||
for (EdgeId edgeId : edgeIds) {
|
||||
try {
|
||||
EdgeEvent edgeEvent = buildEdgeEvent(msg, ctx);
|
||||
if (edgeEvent == null) {
|
||||
log.debug("Edge event type is null. Entity Type {}", msg.getOriginator().getEntityType());
|
||||
ctx.tellFailure(msg, new RuntimeException("Edge event type is null. Entity Type '" + msg.getOriginator().getEntityType() + "'"));
|
||||
} else {
|
||||
edgeEvent.setEdgeId(edgeId);
|
||||
ListenableFuture<EdgeEvent> saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent);
|
||||
Futures.addCallback(saveFuture, new FutureCallback<EdgeEvent>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable EdgeEvent event) {
|
||||
ctx.tellNext(msg, SUCCESS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable th) {
|
||||
log.error("Could not save edge event", th);
|
||||
ctx.tellFailure(msg, th);
|
||||
@Override
|
||||
public void onFailure(Throwable th) {
|
||||
log.error("Could not save edge event", th);
|
||||
ctx.tellFailure(msg, th);
|
||||
}
|
||||
}, ctx.getDbCallbackExecutor());
|
||||
}
|
||||
}, ctx.getDbCallbackExecutor());
|
||||
} catch (JsonProcessingException e) {
|
||||
log.error("Failed to build edge event", e);
|
||||
ctx.tellFailure(msg, e);
|
||||
}
|
||||
}
|
||||
} catch (JsonProcessingException e) {
|
||||
log.error("Failed to build edge event", e);
|
||||
ctx.tellFailure(msg, e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -201,6 +211,8 @@ public class TbMsgPushToEdgeNode implements TbNode {
|
||||
case ASSET:
|
||||
case ENTITY_VIEW:
|
||||
case DASHBOARD:
|
||||
case TENANT:
|
||||
case CUSTOMER:
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
@ -215,15 +227,20 @@ public class TbMsgPushToEdgeNode implements TbNode {
|
||||
|| DataConstants.ALARM.equals(msgType);
|
||||
}
|
||||
|
||||
private ListenableFuture<EdgeId> getEdgeIdByOriginatorId(TbContext ctx, TenantId tenantId, EntityId originatorId) {
|
||||
ListenableFuture<List<EntityRelation>> future = ctx.getRelationService().findByToAndTypeAsync(tenantId, originatorId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE);
|
||||
return Futures.transform(future, relations -> {
|
||||
if (relations != null && relations.size() > 0) {
|
||||
return new EdgeId(relations.get(0).getFrom().getId());
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}, ctx.getDbCallbackExecutor());
|
||||
private ListenableFuture<List<EdgeId>> getEdgeIdsByOriginatorId(TbContext ctx, TenantId tenantId, EntityId originatorId) {
|
||||
if (EntityType.TENANT.equals(originatorId.getEntityType())) {
|
||||
TextPageData<Edge> edgesByTenantId = ctx.getEdgeService().findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE));
|
||||
return Futures.immediateFuture(edgesByTenantId.getData().stream().map(IdBased::getId).collect(Collectors.toList()));
|
||||
} else {
|
||||
ListenableFuture<List<EntityRelation>> future = ctx.getRelationService().findByToAndTypeAsync(tenantId, originatorId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE);
|
||||
return Futures.transform(future, relations -> {
|
||||
List<EdgeId> result = new ArrayList<>();
|
||||
if (relations != null && relations.size() > 0) {
|
||||
result.add(new EdgeId(relations.get(0).getFrom().getId()));
|
||||
}
|
||||
return result;
|
||||
}, ctx.getDbCallbackExecutor());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user