From 1d5f5c5d8037c5cc7dfa6ee70cb15ca554d0c327 Mon Sep 17 00:00:00 2001 From: Bohdan Smetaniuk Date: Tue, 25 Aug 2020 14:04:47 +0300 Subject: [PATCH] code fixes --- .../edge/DefaultEdgeNotificationService.java | 8 ++-- .../service/edge/rpc/EdgeGrpcSession.java | 48 +++++++++++++++---- .../server/dao/edge/EdgeService.java | 3 +- .../server/dao/edge/EdgeServiceImpl.java | 15 +++--- .../rule/engine/edge/TbMsgPushToEdgeNode.java | 2 +- 5 files changed, 53 insertions(+), 23 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java index 27489c98cc..fbc5bcf376 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java @@ -242,7 +242,7 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { } } } else { - ListenableFuture> edgeIdsFuture = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, dbCallbackExecutorService); + ListenableFuture> edgeIdsFuture = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId); Futures.transform(edgeIdsFuture, edgeIds -> { if (edgeIds != null && !edgeIds.isEmpty()) { for (EdgeId edgeId : edgeIds) { @@ -321,7 +321,7 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { if (alarm != null) { EdgeEventType edgeEventType = getEdgeQueueTypeByEntityType(alarm.getOriginator().getEntityType()); if (edgeEventType != null) { - ListenableFuture> relatedEdgeIdsByEntityIdFuture = edgeService.findRelatedEdgeIdsByEntityId(tenantId, alarm.getOriginator(), dbCallbackExecutorService); + ListenableFuture> relatedEdgeIdsByEntityIdFuture = edgeService.findRelatedEdgeIdsByEntityId(tenantId, alarm.getOriginator()); Futures.transform(relatedEdgeIdsByEntityIdFuture, relatedEdgeIdsByEntityId -> { if (relatedEdgeIdsByEntityId != null) { for (EdgeId edgeId : relatedEdgeIdsByEntityId) { @@ -346,8 +346,8 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { if (!relation.getFrom().getEntityType().equals(EntityType.EDGE) && !relation.getTo().getEntityType().equals(EntityType.EDGE)) { List>> futures = new ArrayList<>(); - futures.add(edgeService.findRelatedEdgeIdsByEntityId(tenantId, relation.getTo(), dbCallbackExecutorService)); - futures.add(edgeService.findRelatedEdgeIdsByEntityId(tenantId, relation.getFrom(), dbCallbackExecutorService)); + futures.add(edgeService.findRelatedEdgeIdsByEntityId(tenantId, relation.getTo())); + futures.add(edgeService.findRelatedEdgeIdsByEntityId(tenantId, relation.getFrom())); ListenableFuture>> combinedFuture = Futures.allAsList(futures); Futures.transform(combinedFuture, listOfListsEdgeIds -> { Set uniqueEdgeIds = new HashSet<>(); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 86c45c78ba..99ffe79d9e 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; @@ -937,25 +938,46 @@ public final class EdgeGrpcSession implements Closeable { } private ListenableFuture processPostTelemetry(EntityId entityId, TransportProtos.PostTelemetryMsg msg, TbMsgMetaData metaData) { + SettableFuture futureToSet = SettableFuture.create(); for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) { JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList()); metaData.putValue("ts", tsKv.getTs() + ""); TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), entityId, metaData, gson.toJson(json)); - // TODO: voba - verify that null callback is OK - ctx.getTbClusterService().pushMsgToRuleEngine(edge.getTenantId(), tbMsg.getOriginator(), tbMsg, null); + ctx.getTbClusterService().pushMsgToRuleEngine(edge.getTenantId(), tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + futureToSet.set(null); + } + + @Override + public void onFailure(Throwable t) { + futureToSet.setException(t); + } + }); } - return Futures.immediateFuture(null); + return futureToSet; } private ListenableFuture processPostAttributes(EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { + SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), entityId, metaData, gson.toJson(json)); - // TODO: voba - verify that null callback is OK - ctx.getTbClusterService().pushMsgToRuleEngine(edge.getTenantId(), tbMsg.getOriginator(), tbMsg, null); - return Futures.immediateFuture(null); + ctx.getTbClusterService().pushMsgToRuleEngine(edge.getTenantId(), tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + futureToSet.set(null); + } + + @Override + public void onFailure(Throwable t) { + futureToSet.setException(t); + } + }); + return futureToSet; } private ListenableFuture processAttributeDeleteMsg(EntityId entityId, AttributeDeleteMsg attributeDeleteMsg, String entityType) { + SettableFuture futureToSet = SettableFuture.create(); try { String scope = attributeDeleteMsg.getScope(); List attributeNames = attributeDeleteMsg.getAttributeNamesList(); @@ -966,13 +988,23 @@ public final class EdgeGrpcSession implements Closeable { attributeKeys.add(new AttributeKey(scope, attributeName)); } ctx.getTbClusterService().pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete( - edge.getTenantId(), (DeviceId) entityId, attributeKeys), null); + edge.getTenantId(), (DeviceId) entityId, attributeKeys), new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + futureToSet.set(null); + } + + @Override + public void onFailure(Throwable t) { + futureToSet.setException(t); + } + }); } } catch (Exception e) { log.error("Can't process attribute delete msg [{}]", attributeDeleteMsg, e); return Futures.immediateFailedFuture(new RuntimeException("Can't process attribute delete msg " + attributeDeleteMsg, e)); } - return Futures.immediateFuture(null); + return futureToSet; } private ListenableFuture onDeviceUpdate(DeviceUpdateMsg deviceUpdateMsg) { diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java index 71c56068d1..13702a13ef 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java @@ -30,7 +30,6 @@ import org.thingsboard.server.common.data.page.TextPageLink; import java.util.List; import java.util.Optional; -import java.util.concurrent.Executor; public interface EdgeService { @@ -76,7 +75,7 @@ public interface EdgeService { ListenableFuture> findEdgesByTenantIdAndDashboardId(TenantId tenantId, DashboardId dashboardId); - ListenableFuture> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId, Executor executorService); + ListenableFuture> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java index 4ce86beea5..6d3aa3e901 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java @@ -67,7 +67,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Optional; -import java.util.concurrent.Executor; import java.util.stream.Collectors; import static org.thingsboard.server.common.data.CacheConstants.EDGE_CACHE; @@ -429,7 +428,7 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic }; @Override - public ListenableFuture> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId, Executor executorService) { + public ListenableFuture> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId) { switch (entityId.getEntityType()) { case DEVICE: case ASSET: @@ -442,11 +441,11 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic } else { return Collections.emptyList(); } - }, executorService); + }, MoreExecutors.directExecutor()); case DASHBOARD: - return convertToEdgeIds(findEdgesByTenantIdAndDashboardId(tenantId, new DashboardId(entityId.getId())), executorService); + return convertToEdgeIds(findEdgesByTenantIdAndDashboardId(tenantId, new DashboardId(entityId.getId()))); case RULE_CHAIN: - return convertToEdgeIds(findEdgesByTenantIdAndRuleChainId(tenantId, new RuleChainId(entityId.getId())), executorService); + return convertToEdgeIds(findEdgesByTenantIdAndRuleChainId(tenantId, new RuleChainId(entityId.getId()))); case USER: User userById = userService.findUserById(tenantId, new UserId(entityId.getId())); TextPageData edges; @@ -455,20 +454,20 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic } else { edges = findEdgesByTenantIdAndCustomerId(tenantId, new CustomerId(entityId.getId()), new TextPageLink(Integer.MAX_VALUE)); } - return convertToEdgeIds(Futures.immediateFuture(edges.getData()), executorService); + return convertToEdgeIds(Futures.immediateFuture(edges.getData())); default: return Futures.immediateFuture(Collections.emptyList()); } } - private ListenableFuture> convertToEdgeIds(ListenableFuture> future, Executor executorService) { + private ListenableFuture> convertToEdgeIds(ListenableFuture> future) { return Futures.transform(future, edges -> { if (edges != null && !edges.isEmpty()) { return edges.stream().map(IdBased::getId).collect(Collectors.toList()); } else { return Collections.emptyList(); } - }, executorService); + }, MoreExecutors.directExecutor()); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java index b531af7ed3..1c0e44050d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java @@ -232,7 +232,7 @@ public class TbMsgPushToEdgeNode implements TbNode { TextPageData edgesByTenantId = ctx.getEdgeService().findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE)); return Futures.immediateFuture(edgesByTenantId.getData().stream().map(IdBased::getId).collect(Collectors.toList())); } else { - return ctx.getEdgeService().findRelatedEdgeIdsByEntityId(tenantId, originatorId, ctx.getDbCallbackExecutor()); + return ctx.getEdgeService().findRelatedEdgeIdsByEntityId(tenantId, originatorId); } }