diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 72014cf1c9..7220075e77 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -298,6 +298,12 @@ public final class EdgeGrpcSession implements Closeable { case DASHBOARD: entityId = new DashboardId(edgeEvent.getEntityId()); break; + case TENANT: + entityId = new TenantId(edgeEvent.getEntityId()); + break; + case CUSTOMER: + entityId = new CustomerId(edgeEvent.getEntityId()); + break; } if (entityId != null) { log.debug("Sending telemetry data msg, entityId [{}], body [{}]", edgeEvent.getEntityId(), edgeEvent.getEntityBody()); @@ -748,6 +754,10 @@ public final class EdgeGrpcSession implements Closeable { return new EntityViewId(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB())); case DASHBOARD: return new DashboardId(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB())); + case TENANT: + return new TenantId(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB())); + case CUSTOMER: + return new CustomerId(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB())); default: log.warn("Unsupported entity type [{}] during construct of entity id. EntityDataProto [{}]", entityData.getEntityType(), entityData); return null; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/EdgeUtils.java b/common/data/src/main/java/org/thingsboard/server/common/data/EdgeUtils.java index 784763f99e..eff437a4a5 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/EdgeUtils.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/EdgeUtils.java @@ -39,6 +39,10 @@ public final class EdgeUtils { return EdgeEventType.USER; case ALARM: return EdgeEventType.ALARM; + case TENANT: + return EdgeEventType.TENANT; + case CUSTOMER: + return EdgeEventType.CUSTOMER; default: return null; } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java index 52259d4507..cc77b629bd 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java @@ -26,5 +26,6 @@ public enum EdgeEventType { EDGE, USER, CUSTOMER, - RELATION + RELATION, + TENANT } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java index f1134078c1..013ade7480 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java @@ -33,11 +33,15 @@ import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.audit.ActionType; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.IdBased; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.TextPageData; +import org.thingsboard.server.common.data.page.TextPageLink; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; @@ -46,10 +50,12 @@ import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.session.SessionMsgType; import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; @@ -84,34 +90,38 @@ public class TbMsgPushToEdgeNode implements TbNode { } if (isSupportedOriginator(msg.getOriginator().getEntityType())) { if (isSupportedMsgType(msg.getType())) { - ListenableFuture getEdgeIdFuture = getEdgeIdByOriginatorId(ctx, ctx.getTenantId(), msg.getOriginator()); - Futures.addCallback(getEdgeIdFuture, new FutureCallback() { + ListenableFuture> getEdgeIdsFuture = getEdgeIdsByOriginatorId(ctx, ctx.getTenantId(), msg.getOriginator()); + Futures.addCallback(getEdgeIdsFuture, new FutureCallback>() { @Override - public void onSuccess(@Nullable EdgeId edgeId) { - try { - EdgeEvent edgeEvent = buildEdgeEvent(msg, ctx); - if (edgeEvent == null) { - log.debug("Edge event type is null. Entity Type {}", msg.getOriginator().getEntityType()); - ctx.tellFailure(msg, new RuntimeException("Edge event type is null. Entity Type '" + msg.getOriginator().getEntityType() + "'")); - } else { - edgeEvent.setEdgeId(edgeId); - ListenableFuture saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent); - Futures.addCallback(saveFuture, new FutureCallback() { - @Override - public void onSuccess(@Nullable EdgeEvent event) { - ctx.tellNext(msg, SUCCESS); - } + public void onSuccess(@Nullable List edgeIds) { + if (edgeIds != null && !edgeIds.isEmpty()) { + for (EdgeId edgeId : edgeIds) { + try { + EdgeEvent edgeEvent = buildEdgeEvent(msg, ctx); + if (edgeEvent == null) { + log.debug("Edge event type is null. Entity Type {}", msg.getOriginator().getEntityType()); + ctx.tellFailure(msg, new RuntimeException("Edge event type is null. Entity Type '" + msg.getOriginator().getEntityType() + "'")); + } else { + edgeEvent.setEdgeId(edgeId); + ListenableFuture saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent); + Futures.addCallback(saveFuture, new FutureCallback() { + @Override + public void onSuccess(@Nullable EdgeEvent event) { + ctx.tellNext(msg, SUCCESS); + } - @Override - public void onFailure(Throwable th) { - log.error("Could not save edge event", th); - ctx.tellFailure(msg, th); + @Override + public void onFailure(Throwable th) { + log.error("Could not save edge event", th); + ctx.tellFailure(msg, th); + } + }, ctx.getDbCallbackExecutor()); } - }, ctx.getDbCallbackExecutor()); + } catch (JsonProcessingException e) { + log.error("Failed to build edge event", e); + ctx.tellFailure(msg, e); + } } - } catch (JsonProcessingException e) { - log.error("Failed to build edge event", e); - ctx.tellFailure(msg, e); } } @@ -201,6 +211,8 @@ public class TbMsgPushToEdgeNode implements TbNode { case ASSET: case ENTITY_VIEW: case DASHBOARD: + case TENANT: + case CUSTOMER: return true; default: return false; @@ -215,15 +227,20 @@ public class TbMsgPushToEdgeNode implements TbNode { || DataConstants.ALARM.equals(msgType); } - private ListenableFuture getEdgeIdByOriginatorId(TbContext ctx, TenantId tenantId, EntityId originatorId) { - ListenableFuture> future = ctx.getRelationService().findByToAndTypeAsync(tenantId, originatorId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE); - return Futures.transform(future, relations -> { - if (relations != null && relations.size() > 0) { - return new EdgeId(relations.get(0).getFrom().getId()); - } else { - return null; - } - }, ctx.getDbCallbackExecutor()); + private ListenableFuture> getEdgeIdsByOriginatorId(TbContext ctx, TenantId tenantId, EntityId originatorId) { + if (EntityType.TENANT.equals(originatorId.getEntityType())) { + TextPageData edgesByTenantId = ctx.getEdgeService().findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE)); + return Futures.immediateFuture(edgesByTenantId.getData().stream().map(IdBased::getId).collect(Collectors.toList())); + } else { + ListenableFuture> future = ctx.getRelationService().findByToAndTypeAsync(tenantId, originatorId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE); + return Futures.transform(future, relations -> { + List result = new ArrayList<>(); + if (relations != null && relations.size() > 0) { + result.add(new EdgeId(relations.get(0).getFrom().getId())); + } + return result; + }, ctx.getDbCallbackExecutor()); + } } @Override