From 3e42f00cf789797737e21d9504d8fc14fd91e631 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 18 Aug 2020 17:52:11 +0300 Subject: [PATCH] Ack cloud messages --- .../service/edge/rpc/EdgeGrpcSession.java | 674 +++++++++--------- .../edge/rpc/init/DefaultSyncEdgeService.java | 142 ++-- .../edge/rpc/init/SyncEdgeService.java | 11 +- .../thingsboard/edge/rpc/EdgeGrpcClient.java | 38 +- .../thingsboard/edge/rpc/EdgeRpcClient.java | 7 +- common/edge-api/src/main/proto/edge.proto | 43 +- 6 files changed, 474 insertions(+), 441 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 519f4529f1..26849faeec 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; @@ -81,6 +82,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.common.transport.util.JsonUtils; +import org.thingsboard.server.gen.edge.AdminSettingsUpdateMsg; import org.thingsboard.server.gen.edge.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.AssetUpdateMsg; import org.thingsboard.server.gen.edge.AttributesRequestMsg; @@ -93,11 +95,12 @@ import org.thingsboard.server.gen.edge.DeviceCredentialsRequestMsg; import org.thingsboard.server.gen.edge.DeviceCredentialsUpdateMsg; import org.thingsboard.server.gen.edge.DeviceUpdateMsg; import org.thingsboard.server.gen.edge.DownlinkMsg; +import org.thingsboard.server.gen.edge.DownlinkResponseMsg; import org.thingsboard.server.gen.edge.EdgeConfiguration; import org.thingsboard.server.gen.edge.EntityDataProto; -import org.thingsboard.server.gen.edge.EntityUpdateMsg; import org.thingsboard.server.gen.edge.EntityViewUpdateMsg; import org.thingsboard.server.gen.edge.RelationRequestMsg; +import org.thingsboard.server.gen.edge.RelationUpdateMsg; import org.thingsboard.server.gen.edge.RequestMsg; import org.thingsboard.server.gen.edge.RequestMsgType; import org.thingsboard.server.gen.edge.ResponseMsg; @@ -120,11 +123,14 @@ import org.thingsboard.server.service.edge.EdgeContextComponent; import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -137,6 +143,8 @@ public final class EdgeGrpcSession implements Closeable { private static final ReentrantLock deviceCreationLock = new ReentrantLock(); + private static final ReentrantLock responseMsgLock = new ReentrantLock(); + private final Gson gson = new Gson(); private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs"; @@ -152,6 +160,8 @@ public final class EdgeGrpcSession implements Closeable { private StreamObserver outputStream; private boolean connected; + private CountDownLatch latch; + private TbQueueProducer> ruleEngineMsgProducer; EdgeGrpcSession(EdgeContextComponent ctx, StreamObserver outputStream, BiConsumer sessionOpenListener, @@ -172,7 +182,7 @@ public final class EdgeGrpcSession implements Closeable { public void onNext(RequestMsg requestMsg) { if (!connected && requestMsg.getMsgType().equals(RequestMsgType.CONNECT_RPC_MESSAGE)) { ConnectResponseMsg responseMsg = processConnect(requestMsg.getConnectRequestMsg()); - outputStream.onNext(ResponseMsg.newBuilder() + sendResponseMsg(ResponseMsg.newBuilder() .setConnectResponseMsg(responseMsg) .build()); if (ConnectResponseCode.ACCEPTED != responseMsg.getResponseCode()) { @@ -184,9 +194,10 @@ public final class EdgeGrpcSession implements Closeable { } if (connected) { if (requestMsg.getMsgType().equals(RequestMsgType.UPLINK_RPC_MESSAGE) && requestMsg.hasUplinkMsg()) { - outputStream.onNext(ResponseMsg.newBuilder() - .setUplinkResponseMsg(processUplinkMsg(requestMsg.getUplinkMsg())) - .build()); + onUplinkMsg(requestMsg.getUplinkMsg()); + } + if (requestMsg.getMsgType().equals(RequestMsgType.UPLINK_RPC_MESSAGE) && requestMsg.hasDownlinkResponseMsg()) { + onDownlinkResponse(requestMsg.getDownlinkResponseMsg()); } } } @@ -204,11 +215,54 @@ public final class EdgeGrpcSession implements Closeable { }; } + private void onUplinkMsg(UplinkMsg uplinkMsg) { + ListenableFuture> future = processUplinkMsg(uplinkMsg); + Futures.addCallback(future, new FutureCallback>() { + @Override + public void onSuccess(@Nullable List result) { + UplinkResponseMsg uplinkResponseMsg = UplinkResponseMsg.newBuilder().setSuccess(true).build(); + sendResponseMsg(ResponseMsg.newBuilder() + .setUplinkResponseMsg(uplinkResponseMsg) + .build()); + } + + @Override + public void onFailure(Throwable t) { + UplinkResponseMsg uplinkResponseMsg = UplinkResponseMsg.newBuilder().setSuccess(false).setErrorMsg(t.getMessage()).build(); + sendResponseMsg(ResponseMsg.newBuilder() + .setUplinkResponseMsg(uplinkResponseMsg) + .build()); + } + }, MoreExecutors.directExecutor()); + } + + private void onDownlinkResponse(DownlinkResponseMsg msg) { + try { + if (msg.getSuccess()) { + log.debug("[{}] Msg has been processed successfully! {}", edge.getRoutingKey(), msg); + } else { + log.error("[{}] Msg processing failed! Error msg: {}", edge.getRoutingKey(), msg.getErrorMsg()); + } + latch.countDown(); + } catch (Exception e) { + log.error("Can't process downlink response message [{}]", msg, e); + } + } + + private void sendResponseMsg(ResponseMsg responseMsg) { + try { + responseMsgLock.lock(); + outputStream.onNext(responseMsg); + } finally { + responseMsgLock.unlock(); + } + } + void onConfigurationUpdate(Edge edge) { try { this.edge = edge; // TODO: voba - push edge configuration update to edge -// outputStream.onNext(org.thingsboard.server.gen.integration.ResponseMsg.newBuilder() +// sendResponseMsg(org.thingsboard.server.gen.integration.ResponseMsg.newBuilder() // .setIntegrationUpdateMsg(IntegrationUpdateMsg.newBuilder() // .setConfiguration(constructIntegrationConfigProto(configuration, defaultConverterProto, downLinkConverterProto)) // .build()) @@ -223,48 +277,39 @@ public final class EdgeGrpcSession implements Closeable { TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), queueStartTs, null, true); TimePageData pageData; UUID ifOffset = null; + boolean success = true; do { pageData = ctx.getEdgeNotificationService().findEdgeEvents(edge.getTenantId(), edge.getId(), pageLink); if (isConnected() && !pageData.getData().isEmpty()) { log.trace("[{}] [{}] event(s) are going to be processed.", this.sessionId, pageData.getData().size()); - for (EdgeEvent edgeEvent : pageData.getData()) { - log.trace("[{}] Processing edge event [{}]", this.sessionId, edgeEvent); - try { - ActionType edgeEventAction = ActionType.valueOf(edgeEvent.getEdgeEventAction()); - switch (edgeEventAction) { - case UPDATED: - case ADDED: - case ASSIGNED_TO_EDGE: - case DELETED: - case UNASSIGNED_FROM_EDGE: - case ALARM_ACK: - case ALARM_CLEAR: - case CREDENTIALS_UPDATED: - case RELATION_ADD_OR_UPDATE: - case RELATION_DELETED: - processEntityMessage(edgeEvent, edgeEventAction); - break; - case ATTRIBUTES_UPDATED: - case ATTRIBUTES_DELETED: - case TIMESERIES_UPDATED: - processTelemetryMessage(edgeEvent); - break; - } - } catch (Exception e) { - log.error("Exception during processing records from queue", e); - } - ifOffset = edgeEvent.getUuidId(); + List downlinkMsgsPack = convertToDownlinkMsgsPack(pageData.getData()); + log.trace("[{}] downlink msg(s) are going to be send.", downlinkMsgsPack.size()); + + latch = new CountDownLatch(downlinkMsgsPack.size()); + for (DownlinkMsg downlinkMsg : downlinkMsgsPack) { + sendResponseMsg(ResponseMsg.newBuilder() + .setDownlinkMsg(downlinkMsg) + .build()); + } + + ifOffset = pageData.getData().get(pageData.getData().size() - 1).getUuidId(); + + success = latch.await(10, TimeUnit.SECONDS); + if (!success) { + log.warn("Failed to deliver the batch: {}", downlinkMsgsPack); } } - if (isConnected() && pageData.hasNext()) { - pageLink = pageData.getNextPageLink(); + if (isConnected() && (!success || pageData.hasNext())) { try { Thread.sleep(ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches()); } catch (InterruptedException e) { log.error("Error during sleep between batches", e); } + if (success) { + pageLink = pageData.getNextPageLink(); + } } - } while (isConnected() && pageData.hasNext()); + } while (isConnected() && (!success || pageData.hasNext())); if (ifOffset != null) { Long newStartTs = UUIDs.unixTimestamp(ifOffset); @@ -277,6 +322,42 @@ public final class EdgeGrpcSession implements Closeable { } } + private List convertToDownlinkMsgsPack(List edgeEvents) { + List result = new ArrayList<>(); + for (EdgeEvent edgeEvent : edgeEvents) { + log.trace("Processing edge event [{}]", edgeEvent); + try { + DownlinkMsg downlinkMsg = null; + ActionType edgeEventAction = ActionType.valueOf(edgeEvent.getEdgeEventAction()); + switch (edgeEventAction) { + case UPDATED: + case ADDED: + case ASSIGNED_TO_EDGE: + case DELETED: + case UNASSIGNED_FROM_EDGE: + case ALARM_ACK: + case ALARM_CLEAR: + case CREDENTIALS_UPDATED: + case RELATION_ADD_OR_UPDATE: + case RELATION_DELETED: + downlinkMsg = processEntityMessage(edgeEvent, edgeEventAction); + break; + case ATTRIBUTES_UPDATED: + case ATTRIBUTES_DELETED: + case TIMESERIES_UPDATED: + downlinkMsg = processTelemetryMessage(edgeEvent); + break; + } + if (downlinkMsg != null) { + result.add(downlinkMsg); + } + } catch (Exception e) { + log.error("Exception during processing records from queue", e); + } + } + return result; + } + private ListenableFuture getQueueStartTs() { ListenableFuture> future = ctx.getAttributesService().find(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, QUEUE_START_TS_ATTR_KEY); @@ -296,7 +377,7 @@ public final class EdgeGrpcSession implements Closeable { ctx.getAttributesService().save(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, attributes); } - private void processTelemetryMessage(EdgeEvent edgeEvent) throws IOException { + private DownlinkMsg processTelemetryMessage(EdgeEvent edgeEvent) throws IOException { log.trace("Executing processTelemetryMessage, edgeEvent [{}]", edgeEvent); EntityId entityId = null; switch (edgeEvent.getEdgeEventType()) { @@ -319,74 +400,61 @@ public final class EdgeGrpcSession implements Closeable { entityId = new CustomerId(edgeEvent.getEntityId()); break; } + DownlinkMsg downlinkMsg = null; if (entityId != null) { log.debug("Sending telemetry data msg, entityId [{}], body [{}]", edgeEvent.getEntityId(), edgeEvent.getEntityBody()); - DownlinkMsg downlinkMsg; try { ActionType actionType = ActionType.valueOf(edgeEvent.getEdgeEventAction()); downlinkMsg = constructEntityDataProtoMsg(entityId, actionType, JsonUtils.parse(mapper.writeValueAsString(edgeEvent.getEntityBody()))); - outputStream.onNext(ResponseMsg.newBuilder() - .setDownlinkMsg(downlinkMsg) - .build()); } catch (Exception e) { log.warn("Can't send telemetry data msg, entityId [{}], body [{}]", edgeEvent.getEntityId(), edgeEvent.getEntityBody(), e); } - } + return downlinkMsg; } - private void processEntityMessage(EdgeEvent edgeEvent, ActionType edgeEventAction) { + private DownlinkMsg processEntityMessage(EdgeEvent edgeEvent, ActionType edgeEventAction) { UpdateMsgType msgType = getResponseMsgType(ActionType.valueOf(edgeEvent.getEdgeEventAction())); log.trace("Executing processEntityMessage, edgeEvent [{}], edgeEventAction [{}], msgType [{}]", edgeEvent, edgeEventAction, msgType); switch (edgeEvent.getEdgeEventType()) { case EDGE: // TODO: voba - add edge update logic - break; + return null; case DEVICE: - processDevice(edgeEvent, msgType, edgeEventAction); - break; + return processDevice(edgeEvent, msgType, edgeEventAction); case ASSET: - processAsset(edgeEvent, msgType, edgeEventAction); - break; + return processAsset(edgeEvent, msgType, edgeEventAction); case ENTITY_VIEW: - processEntityView(edgeEvent, msgType, edgeEventAction); - break; + return processEntityView(edgeEvent, msgType, edgeEventAction); case DASHBOARD: - processDashboard(edgeEvent, msgType, edgeEventAction); - break; + return processDashboard(edgeEvent, msgType, edgeEventAction); case CUSTOMER: - processCustomer(edgeEvent, msgType, edgeEventAction); - break; + return processCustomer(edgeEvent, msgType, edgeEventAction); case RULE_CHAIN: - processRuleChain(edgeEvent, msgType, edgeEventAction); - break; + return processRuleChain(edgeEvent, msgType, edgeEventAction); case RULE_CHAIN_METADATA: - processRuleChainMetadata(edgeEvent, msgType); - break; + return processRuleChainMetadata(edgeEvent, msgType); case ALARM: - processAlarm(edgeEvent, msgType); - break; + return processAlarm(edgeEvent, msgType); case USER: - processUser(edgeEvent, msgType, edgeEventAction); - break; + return processUser(edgeEvent, msgType, edgeEventAction); case RELATION: - processRelation(edgeEvent, msgType); - break; + return processRelation(edgeEvent, msgType); case WIDGETS_BUNDLE: - processWidgetsBundle(edgeEvent, msgType, edgeEventAction); - break; + return processWidgetsBundle(edgeEvent, msgType, edgeEventAction); case WIDGET_TYPE: - processWidgetType(edgeEvent, msgType, edgeEventAction); - break; + return processWidgetType(edgeEvent, msgType, edgeEventAction); case ADMIN_SETTINGS: - processAdminSettings(edgeEvent); - break; + return processAdminSettings(edgeEvent); + default: + log.warn("Unsupported edge event type [{}]", edgeEvent); + return null; } } - private void processDevice(EdgeEvent edgeEvent, UpdateMsgType msgType, ActionType edgeActionType) { + private DownlinkMsg processDevice(EdgeEvent edgeEvent, UpdateMsgType msgType, ActionType edgeActionType) { DeviceId deviceId = new DeviceId(edgeEvent.getEntityId()); - EntityUpdateMsg entityUpdateMsg = null; + DownlinkMsg downlinkMsg = null; switch (edgeActionType) { case ADDED: case UPDATED: @@ -395,8 +463,8 @@ public final class EdgeGrpcSession implements Closeable { if (device != null) { DeviceUpdateMsg deviceUpdateMsg = ctx.getDeviceUpdateMsgConstructor().constructDeviceUpdatedMsg(msgType, device); - entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setDeviceUpdateMsg(deviceUpdateMsg) + downlinkMsg = DownlinkMsg.newBuilder() + .addAllDeviceUpdateMsg(Collections.singletonList(deviceUpdateMsg)) .build(); } break; @@ -404,8 +472,8 @@ public final class EdgeGrpcSession implements Closeable { case UNASSIGNED_FROM_EDGE: DeviceUpdateMsg deviceUpdateMsg = ctx.getDeviceUpdateMsgConstructor().constructDeviceDeleteMsg(deviceId); - entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setDeviceUpdateMsg(deviceUpdateMsg) + downlinkMsg = DownlinkMsg.newBuilder() + .addAllDeviceUpdateMsg(Collections.singletonList(deviceUpdateMsg)) .build(); break; case CREDENTIALS_UPDATED: @@ -413,22 +481,18 @@ public final class EdgeGrpcSession implements Closeable { if (deviceCredentials != null) { DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg = ctx.getDeviceUpdateMsgConstructor().constructDeviceCredentialsUpdatedMsg(deviceCredentials); - entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setDeviceCredentialsUpdateMsg(deviceCredentialsUpdateMsg) + downlinkMsg = DownlinkMsg.newBuilder() + .addAllDeviceCredentialsUpdateMsg(Collections.singletonList(deviceCredentialsUpdateMsg)) .build(); } break; } - if (entityUpdateMsg != null) { - outputStream.onNext(ResponseMsg.newBuilder() - .setEntityUpdateMsg(entityUpdateMsg) - .build()); - } + return downlinkMsg; } - private void processAsset(EdgeEvent edgeEvent, UpdateMsgType msgType, ActionType edgeEventAction) { + private DownlinkMsg processAsset(EdgeEvent edgeEvent, UpdateMsgType msgType, ActionType edgeEventAction) { AssetId assetId = new AssetId(edgeEvent.getEntityId()); - EntityUpdateMsg entityUpdateMsg = null; + DownlinkMsg downlinkMsg = null; switch (edgeEventAction) { case ADDED: case UPDATED: @@ -437,8 +501,8 @@ public final class EdgeGrpcSession implements Closeable { if (asset != null) { AssetUpdateMsg assetUpdateMsg = ctx.getAssetUpdateMsgConstructor().constructAssetUpdatedMsg(msgType, asset); - entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setAssetUpdateMsg(assetUpdateMsg) + downlinkMsg = DownlinkMsg.newBuilder() + .addAllAssetUpdateMsg(Collections.singletonList(assetUpdateMsg)) .build(); } break; @@ -446,21 +510,17 @@ public final class EdgeGrpcSession implements Closeable { case UNASSIGNED_FROM_EDGE: AssetUpdateMsg assetUpdateMsg = ctx.getAssetUpdateMsgConstructor().constructAssetDeleteMsg(assetId); - entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setAssetUpdateMsg(assetUpdateMsg) + downlinkMsg = DownlinkMsg.newBuilder() + .addAllAssetUpdateMsg(Collections.singletonList(assetUpdateMsg)) .build(); break; } - if (entityUpdateMsg != null) { - outputStream.onNext(ResponseMsg.newBuilder() - .setEntityUpdateMsg(entityUpdateMsg) - .build()); - } + return downlinkMsg; } - private void processEntityView(EdgeEvent edgeEvent, UpdateMsgType msgType, ActionType edgeEventAction) { + private DownlinkMsg processEntityView(EdgeEvent edgeEvent, UpdateMsgType msgType, ActionType edgeEventAction) { EntityViewId entityViewId = new EntityViewId(edgeEvent.getEntityId()); - EntityUpdateMsg entityUpdateMsg = null; + DownlinkMsg downlinkMsg = null; switch (edgeEventAction) { case ADDED: case UPDATED: @@ -469,8 +529,8 @@ public final class EdgeGrpcSession implements Closeable { if (entityView != null) { EntityViewUpdateMsg entityViewUpdateMsg = ctx.getEntityViewUpdateMsgConstructor().constructEntityViewUpdatedMsg(msgType, entityView); - entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setEntityViewUpdateMsg(entityViewUpdateMsg) + downlinkMsg = DownlinkMsg.newBuilder() + .addAllEntityViewUpdateMsg(Collections.singletonList(entityViewUpdateMsg)) .build(); } break; @@ -478,21 +538,17 @@ public final class EdgeGrpcSession implements Closeable { case UNASSIGNED_FROM_EDGE: EntityViewUpdateMsg entityViewUpdateMsg = ctx.getEntityViewUpdateMsgConstructor().constructEntityViewDeleteMsg(entityViewId); - entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setEntityViewUpdateMsg(entityViewUpdateMsg) + downlinkMsg = DownlinkMsg.newBuilder() + .addAllEntityViewUpdateMsg(Collections.singletonList(entityViewUpdateMsg)) .build(); break; } - if (entityUpdateMsg != null) { - outputStream.onNext(ResponseMsg.newBuilder() - .setEntityUpdateMsg(entityUpdateMsg) - .build()); - } + return downlinkMsg; } - private void processDashboard(EdgeEvent edgeEvent, UpdateMsgType msgType, ActionType edgeEventAction) { + private DownlinkMsg processDashboard(EdgeEvent edgeEvent, UpdateMsgType msgType, ActionType edgeEventAction) { DashboardId dashboardId = new DashboardId(edgeEvent.getEntityId()); - EntityUpdateMsg entityUpdateMsg = null; + DownlinkMsg downlinkMsg = null; switch (edgeEventAction) { case ADDED: case UPDATED: @@ -501,8 +557,8 @@ public final class EdgeGrpcSession implements Closeable { if (dashboard != null) { DashboardUpdateMsg dashboardUpdateMsg = ctx.getDashboardUpdateMsgConstructor().constructDashboardUpdatedMsg(msgType, dashboard); - entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setDashboardUpdateMsg(dashboardUpdateMsg) + downlinkMsg = DownlinkMsg.newBuilder() + .addAllDashboardUpdateMsg(Collections.singletonList(dashboardUpdateMsg)) .build(); } break; @@ -510,21 +566,17 @@ public final class EdgeGrpcSession implements Closeable { case UNASSIGNED_FROM_EDGE: DashboardUpdateMsg dashboardUpdateMsg = ctx.getDashboardUpdateMsgConstructor().constructDashboardDeleteMsg(dashboardId); - entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setDashboardUpdateMsg(dashboardUpdateMsg) + downlinkMsg = DownlinkMsg.newBuilder() + .addAllDashboardUpdateMsg(Collections.singletonList(dashboardUpdateMsg)) .build(); break; } - if (entityUpdateMsg != null) { - outputStream.onNext(ResponseMsg.newBuilder() - .setEntityUpdateMsg(entityUpdateMsg) - .build()); - } + return downlinkMsg; } - private void processCustomer(EdgeEvent edgeEvent, UpdateMsgType msgType, ActionType edgeEventAction) { + private DownlinkMsg processCustomer(EdgeEvent edgeEvent, UpdateMsgType msgType, ActionType edgeEventAction) { CustomerId customerId = new CustomerId(edgeEvent.getEntityId()); - EntityUpdateMsg entityUpdateMsg = null; + DownlinkMsg downlinkMsg = null; switch (edgeEventAction) { case ADDED: case UPDATED: @@ -532,29 +584,25 @@ public final class EdgeGrpcSession implements Closeable { if (customer != null) { CustomerUpdateMsg customerUpdateMsg = ctx.getCustomerUpdateMsgConstructor().constructCustomerUpdatedMsg(msgType, customer); - entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setCustomerUpdateMsg(customerUpdateMsg) + downlinkMsg = DownlinkMsg.newBuilder() + .addAllCustomerUpdateMsg(Collections.singletonList(customerUpdateMsg)) .build(); } break; case DELETED: CustomerUpdateMsg customerUpdateMsg = ctx.getCustomerUpdateMsgConstructor().constructCustomerDeleteMsg(customerId); - entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setCustomerUpdateMsg(customerUpdateMsg) + downlinkMsg = DownlinkMsg.newBuilder() + .addAllCustomerUpdateMsg(Collections.singletonList(customerUpdateMsg)) .build(); break; } - if (entityUpdateMsg != null) { - outputStream.onNext(ResponseMsg.newBuilder() - .setEntityUpdateMsg(entityUpdateMsg) - .build()); - } + return downlinkMsg; } - private void processRuleChain(EdgeEvent edgeEvent, UpdateMsgType msgType, ActionType edgeEventAction) { + private DownlinkMsg processRuleChain(EdgeEvent edgeEvent, UpdateMsgType msgType, ActionType edgeEventAction) { RuleChainId ruleChainId = new RuleChainId(edgeEvent.getEntityId()); - EntityUpdateMsg entityUpdateMsg = null; + DownlinkMsg downlinkMsg = null; switch (edgeEventAction) { case ADDED: case UPDATED: @@ -563,46 +611,41 @@ public final class EdgeGrpcSession implements Closeable { if (ruleChain != null) { RuleChainUpdateMsg ruleChainUpdateMsg = ctx.getRuleChainUpdateMsgConstructor().constructRuleChainUpdatedMsg(edge.getRootRuleChainId(), msgType, ruleChain); - entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setRuleChainUpdateMsg(ruleChainUpdateMsg) + downlinkMsg = DownlinkMsg.newBuilder() + .addAllRuleChainUpdateMsg(Collections.singletonList(ruleChainUpdateMsg)) .build(); } break; case DELETED: case UNASSIGNED_FROM_EDGE: - entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setRuleChainUpdateMsg(ctx.getRuleChainUpdateMsgConstructor().constructRuleChainDeleteMsg(ruleChainId)) + downlinkMsg = DownlinkMsg.newBuilder() + .addAllRuleChainUpdateMsg(Collections.singletonList(ctx.getRuleChainUpdateMsgConstructor().constructRuleChainDeleteMsg(ruleChainId))) .build(); break; } - if (entityUpdateMsg != null) { - outputStream.onNext(ResponseMsg.newBuilder() - .setEntityUpdateMsg(entityUpdateMsg) - .build()); - } + return downlinkMsg; } - private void processRuleChainMetadata(EdgeEvent edgeEvent, UpdateMsgType msgType) { + private DownlinkMsg processRuleChainMetadata(EdgeEvent edgeEvent, UpdateMsgType msgType) { RuleChainId ruleChainId = new RuleChainId(edgeEvent.getEntityId()); RuleChain ruleChain = ctx.getRuleChainService().findRuleChainById(edgeEvent.getTenantId(), ruleChainId); + DownlinkMsg downlinkMsg = null; if (ruleChain != null) { RuleChainMetaData ruleChainMetaData = ctx.getRuleChainService().loadRuleChainMetaData(edgeEvent.getTenantId(), ruleChainId); RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = ctx.getRuleChainUpdateMsgConstructor().constructRuleChainMetadataUpdatedMsg(msgType, ruleChainMetaData); if (ruleChainMetadataUpdateMsg != null) { - EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setRuleChainMetadataUpdateMsg(ruleChainMetadataUpdateMsg) + downlinkMsg = DownlinkMsg.newBuilder() + .addAllRuleChainMetadataUpdateMsg(Collections.singletonList(ruleChainMetadataUpdateMsg)) .build(); - outputStream.onNext(ResponseMsg.newBuilder() - .setEntityUpdateMsg(entityUpdateMsg) - .build()); } } + return downlinkMsg; } - private void processUser(EdgeEvent edgeEvent, UpdateMsgType msgType, ActionType edgeActionType) { + private DownlinkMsg processUser(EdgeEvent edgeEvent, UpdateMsgType msgType, ActionType edgeActionType) { UserId userId = new UserId(edgeEvent.getEntityId()); - EntityUpdateMsg entityUpdateMsg = null; + DownlinkMsg downlinkMsg = null; switch (edgeActionType) { case ADDED: case UPDATED: @@ -612,15 +655,15 @@ public final class EdgeGrpcSession implements Closeable { boolean fullAccess = Authority.TENANT_ADMIN.equals(user.getAuthority()); setFullAccess(user, fullAccess); - entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setUserUpdateMsg(ctx.getUserUpdateMsgConstructor().constructUserUpdatedMsg(msgType, user)) + downlinkMsg = DownlinkMsg.newBuilder() + .addAllUserUpdateMsg(Collections.singletonList(ctx.getUserUpdateMsgConstructor().constructUserUpdatedMsg(msgType, user))) .build(); } break; case DELETED: case UNASSIGNED_FROM_EDGE: - entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setUserUpdateMsg(ctx.getUserUpdateMsgConstructor().constructUserDeleteMsg(userId)) + downlinkMsg = DownlinkMsg.newBuilder() + .addAllUserUpdateMsg(Collections.singletonList(ctx.getUserUpdateMsgConstructor().constructUserDeleteMsg(userId))) .build(); break; case CREDENTIALS_UPDATED: @@ -628,16 +671,12 @@ public final class EdgeGrpcSession implements Closeable { if (userCredentialsByUserId != null && userCredentialsByUserId.isEnabled()) { UserCredentialsUpdateMsg userCredentialsUpdateMsg = ctx.getUserUpdateMsgConstructor().constructUserCredentialsUpdatedMsg(userCredentialsByUserId); - entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setUserCredentialsUpdateMsg(userCredentialsUpdateMsg) + downlinkMsg = DownlinkMsg.newBuilder() + .addAllUserCredentialsUpdateMsg(Collections.singletonList(userCredentialsUpdateMsg)) .build(); } } - if (entityUpdateMsg != null) { - outputStream.onNext(ResponseMsg.newBuilder() - .setEntityUpdateMsg(entityUpdateMsg) - .build()); - } + return downlinkMsg; } private void setFullAccess(User user, boolean isFullAccess) { @@ -649,36 +688,33 @@ public final class EdgeGrpcSession implements Closeable { user.setAdditionalInfo(additionalInfo); } - private void processRelation(EdgeEvent edgeEvent, UpdateMsgType msgType) { + private DownlinkMsg processRelation(EdgeEvent edgeEvent, UpdateMsgType msgType) { EntityRelation entityRelation = mapper.convertValue(edgeEvent.getEntityBody(), EntityRelation.class); - EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setRelationUpdateMsg(ctx.getRelationUpdateMsgConstructor().constructRelationUpdatedMsg(msgType, entityRelation)) + RelationUpdateMsg r = ctx.getRelationUpdateMsgConstructor().constructRelationUpdatedMsg(msgType, entityRelation); + return DownlinkMsg.newBuilder() + .addAllRelationUpdateMsg(Collections.singletonList(r)) .build(); - outputStream.onNext(ResponseMsg.newBuilder() - .setEntityUpdateMsg(entityUpdateMsg) - .build()); } - private void processAlarm(EdgeEvent edgeEvent, UpdateMsgType msgType) { + private DownlinkMsg processAlarm(EdgeEvent edgeEvent, UpdateMsgType msgType) { + DownlinkMsg downlinkMsg = null; try { AlarmId alarmId = new AlarmId(edgeEvent.getEntityId()); Alarm alarm = ctx.getAlarmService().findAlarmByIdAsync(edgeEvent.getTenantId(), alarmId).get(); if (alarm != null) { - EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setAlarmUpdateMsg(ctx.getAlarmUpdateMsgConstructor().constructAlarmUpdatedMsg(edge.getTenantId(), msgType, alarm)) + downlinkMsg = DownlinkMsg.newBuilder() + .addAllAlarmUpdateMsg(Collections.singletonList(ctx.getAlarmUpdateMsgConstructor().constructAlarmUpdatedMsg(edge.getTenantId(), msgType, alarm))) .build(); - outputStream.onNext(ResponseMsg.newBuilder() - .setEntityUpdateMsg(entityUpdateMsg) - .build()); } } catch (Exception e) { log.error("Can't process alarm msg [{}] [{}]", edgeEvent, msgType, e); } + return downlinkMsg; } - private void processWidgetsBundle(EdgeEvent edgeEvent, UpdateMsgType msgType, ActionType edgeActionType) { + private DownlinkMsg processWidgetsBundle(EdgeEvent edgeEvent, UpdateMsgType msgType, ActionType edgeActionType) { WidgetsBundleId widgetsBundleId = new WidgetsBundleId(edgeEvent.getEntityId()); - EntityUpdateMsg entityUpdateMsg = null; + DownlinkMsg downlinkMsg = null; switch (edgeActionType) { case ADDED: case UPDATED: @@ -686,29 +722,25 @@ public final class EdgeGrpcSession implements Closeable { if (widgetsBundle != null) { WidgetsBundleUpdateMsg widgetsBundleUpdateMsg = ctx.getWidgetsBundleUpdateMsgConstructor().constructWidgetsBundleUpdateMsg(msgType, widgetsBundle); - entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setWidgetsBundleUpdateMsg(widgetsBundleUpdateMsg) + downlinkMsg = DownlinkMsg.newBuilder() + .addAllWidgetsBundleUpdateMsg(Collections.singletonList(widgetsBundleUpdateMsg)) .build(); } break; case DELETED: WidgetsBundleUpdateMsg widgetsBundleUpdateMsg = ctx.getWidgetsBundleUpdateMsgConstructor().constructWidgetsBundleDeleteMsg(widgetsBundleId); - entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setWidgetsBundleUpdateMsg(widgetsBundleUpdateMsg) + downlinkMsg = DownlinkMsg.newBuilder() + .addAllWidgetsBundleUpdateMsg(Collections.singletonList(widgetsBundleUpdateMsg)) .build(); break; } - if (entityUpdateMsg != null) { - outputStream.onNext(ResponseMsg.newBuilder() - .setEntityUpdateMsg(entityUpdateMsg) - .build()); - } + return downlinkMsg; } - private void processWidgetType(EdgeEvent edgeEvent, UpdateMsgType msgType, ActionType edgeActionType) { + private DownlinkMsg processWidgetType(EdgeEvent edgeEvent, UpdateMsgType msgType, ActionType edgeActionType) { WidgetTypeId widgetTypeId = new WidgetTypeId(edgeEvent.getEntityId()); - EntityUpdateMsg entityUpdateMsg = null; + DownlinkMsg downlinkMsg = null; switch (edgeActionType) { case ADDED: case UPDATED: @@ -716,34 +748,28 @@ public final class EdgeGrpcSession implements Closeable { if (widgetType != null) { WidgetTypeUpdateMsg widgetTypeUpdateMsg = ctx.getWidgetTypeUpdateMsgConstructor().constructWidgetTypeUpdateMsg(msgType, widgetType); - entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setWidgetTypeUpdateMsg(widgetTypeUpdateMsg) + downlinkMsg = DownlinkMsg.newBuilder() + .addAllWidgetTypeUpdateMsg(Collections.singletonList(widgetTypeUpdateMsg)) .build(); } break; case DELETED: WidgetTypeUpdateMsg widgetTypeUpdateMsg = ctx.getWidgetTypeUpdateMsgConstructor().constructWidgetTypeDeleteMsg(widgetTypeId); - entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setWidgetTypeUpdateMsg(widgetTypeUpdateMsg) + downlinkMsg = DownlinkMsg.newBuilder() + .addAllWidgetTypeUpdateMsg(Collections.singletonList(widgetTypeUpdateMsg)) .build(); break; } - if (entityUpdateMsg != null) { - outputStream.onNext(ResponseMsg.newBuilder() - .setEntityUpdateMsg(entityUpdateMsg) - .build()); - } + return downlinkMsg; } - private void processAdminSettings(EdgeEvent edgeEvent) { + private DownlinkMsg processAdminSettings(EdgeEvent edgeEvent) { AdminSettings adminSettings = mapper.convertValue(edgeEvent.getEntityBody(), AdminSettings.class); - EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setAdminSettingsUpdateMsg(ctx.getAdminSettingsUpdateMsgConstructor().constructAdminSettingsUpdateMsg(adminSettings)) + AdminSettingsUpdateMsg t = ctx.getAdminSettingsUpdateMsgConstructor().constructAdminSettingsUpdateMsg(adminSettings); + return DownlinkMsg.newBuilder() + .addAllAdminSettingsUpdateMsg(Collections.singletonList(t)) .build(); - outputStream.onNext(ResponseMsg.newBuilder() - .setEntityUpdateMsg(entityUpdateMsg) - .build()); } private UpdateMsgType getResponseMsgType(ActionType actionType) { @@ -775,112 +801,101 @@ public final class EdgeGrpcSession implements Closeable { return builder.build(); } - private UplinkResponseMsg processUplinkMsg(UplinkMsg uplinkMsg) { + private ListenableFuture> processUplinkMsg(UplinkMsg uplinkMsg) { + List> result = new ArrayList<>(); try { if (uplinkMsg.getEntityDataList() != null && !uplinkMsg.getEntityDataList().isEmpty()) { for (EntityDataProto entityData : uplinkMsg.getEntityDataList()) { EntityId entityId = constructEntityId(entityData); if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg()) && entityId != null) { - ListenableFuture metaDataFuture = constructBaseMsgMetadata(entityId); - Futures.transform(metaDataFuture, metaData -> { - if (metaData != null) { - metaData.putValue(DataConstants.MSG_SOURCE_KEY, DataConstants.EDGE_MSG_SOURCE); - if (entityData.hasPostAttributesMsg()) { - processPostAttributes(entityId, entityData.getPostAttributesMsg(), metaData); - } - if (entityData.hasPostTelemetryMsg()) { - processPostTelemetry(entityId, entityData.getPostTelemetryMsg(), metaData); - } - } - return null; - }, ctx.getDbCallbackExecutor()); + TbMsgMetaData metaData = constructBaseMsgMetadata(entityId); + metaData.putValue(DataConstants.MSG_SOURCE_KEY, DataConstants.EDGE_MSG_SOURCE); + if (entityData.hasPostAttributesMsg()) { + metaData.putValue("scope", entityData.getPostAttributeScope()); + result.add(processPostAttributes(entityId, entityData.getPostAttributesMsg(), metaData)); + } + if (entityData.hasPostTelemetryMsg()) { + result.add(processPostTelemetry(entityId, entityData.getPostTelemetryMsg(), metaData)); + } } } } if (uplinkMsg.getDeviceUpdateMsgList() != null && !uplinkMsg.getDeviceUpdateMsgList().isEmpty()) { for (DeviceUpdateMsg deviceUpdateMsg : uplinkMsg.getDeviceUpdateMsgList()) { - onDeviceUpdate(deviceUpdateMsg); + result.add(onDeviceUpdate(deviceUpdateMsg)); } } if (uplinkMsg.getDeviceCredentialsUpdateMsgList() != null && !uplinkMsg.getDeviceCredentialsUpdateMsgList().isEmpty()) { for (DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg : uplinkMsg.getDeviceCredentialsUpdateMsgList()) { - onDeviceCredentialsUpdate(deviceCredentialsUpdateMsg); + result.add(onDeviceCredentialsUpdate(deviceCredentialsUpdateMsg)); } } if (uplinkMsg.getAlarmUpdateMsgList() != null && !uplinkMsg.getAlarmUpdateMsgList().isEmpty()) { for (AlarmUpdateMsg alarmUpdateMsg : uplinkMsg.getAlarmUpdateMsgList()) { - onAlarmUpdate(alarmUpdateMsg); + result.add(onAlarmUpdate(alarmUpdateMsg)); } } if (uplinkMsg.getRuleChainMetadataRequestMsgList() != null && !uplinkMsg.getRuleChainMetadataRequestMsgList().isEmpty()) { for (RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg : uplinkMsg.getRuleChainMetadataRequestMsgList()) { - ctx.getSyncEdgeService().processRuleChainMetadataRequestMsg(edge, ruleChainMetadataRequestMsg); + result.add(ctx.getSyncEdgeService().processRuleChainMetadataRequestMsg(edge, ruleChainMetadataRequestMsg)); } } if (uplinkMsg.getAttributesRequestMsgList() != null && !uplinkMsg.getAttributesRequestMsgList().isEmpty()) { for (AttributesRequestMsg attributesRequestMsg : uplinkMsg.getAttributesRequestMsgList()) { - ctx.getSyncEdgeService().processAttributesRequestMsg(edge, attributesRequestMsg); + result.add(ctx.getSyncEdgeService().processAttributesRequestMsg(edge, attributesRequestMsg)); } } if (uplinkMsg.getRelationRequestMsgList() != null && !uplinkMsg.getRelationRequestMsgList().isEmpty()) { for (RelationRequestMsg relationRequestMsg : uplinkMsg.getRelationRequestMsgList()) { - ctx.getSyncEdgeService().processRelationRequestMsg(edge, relationRequestMsg); + result.add(ctx.getSyncEdgeService().processRelationRequestMsg(edge, relationRequestMsg)); } } if (uplinkMsg.getUserCredentialsRequestMsgList() != null && !uplinkMsg.getUserCredentialsRequestMsgList().isEmpty()) { for (UserCredentialsRequestMsg userCredentialsRequestMsg : uplinkMsg.getUserCredentialsRequestMsgList()) { - ctx.getSyncEdgeService().processUserCredentialsRequestMsg(edge, userCredentialsRequestMsg); + result.add(ctx.getSyncEdgeService().processUserCredentialsRequestMsg(edge, userCredentialsRequestMsg)); } } if (uplinkMsg.getDeviceCredentialsRequestMsgList() != null && !uplinkMsg.getDeviceCredentialsRequestMsgList().isEmpty()) { for (DeviceCredentialsRequestMsg deviceCredentialsRequestMsg : uplinkMsg.getDeviceCredentialsRequestMsgList()) { - ctx.getSyncEdgeService().processDeviceCredentialsRequestMsg(edge, deviceCredentialsRequestMsg); + result.add(ctx.getSyncEdgeService().processDeviceCredentialsRequestMsg(edge, deviceCredentialsRequestMsg)); } } } catch (Exception e) { - return UplinkResponseMsg.newBuilder().setSuccess(false).setErrorMsg(e.getMessage()).build(); + log.error("Can't process uplink msg [{}]", uplinkMsg, e); } - - return UplinkResponseMsg.newBuilder().setSuccess(true).build(); + return Futures.allAsList(result); } - private ListenableFuture constructBaseMsgMetadata(EntityId entityId) { + private TbMsgMetaData constructBaseMsgMetadata(EntityId entityId) { + TbMsgMetaData metaData = new TbMsgMetaData(); switch (entityId.getEntityType()) { case DEVICE: - ListenableFuture deviceFuture = ctx.getDeviceService().findDeviceByIdAsync(edge.getTenantId(), new DeviceId(entityId.getId())); - return Futures.transform(deviceFuture, device -> { - TbMsgMetaData metaData = new TbMsgMetaData(); - if (device != null) { - metaData.putValue("deviceName", device.getName()); - metaData.putValue("deviceType", device.getType()); - } - return metaData; - }, ctx.getDbCallbackExecutor()); + Device device = ctx.getDeviceService().findDeviceById(edge.getTenantId(), new DeviceId(entityId.getId())); + if (device != null) { + metaData.putValue("deviceName", device.getName()); + metaData.putValue("deviceType", device.getType()); + } + break; case ASSET: - ListenableFuture assetFuture = ctx.getAssetService().findAssetByIdAsync(edge.getTenantId(), new AssetId(entityId.getId())); - return Futures.transform(assetFuture, asset -> { - TbMsgMetaData metaData = new TbMsgMetaData(); - if (asset != null) { - metaData.putValue("assetName", asset.getName()); - metaData.putValue("assetType", asset.getType()); - } - return metaData; - }, ctx.getDbCallbackExecutor()); + Asset asset = ctx.getAssetService().findAssetById(edge.getTenantId(), new AssetId(entityId.getId())); + if (asset != null) { + metaData.putValue("assetName", asset.getName()); + metaData.putValue("assetType", asset.getType()); + } + break; case ENTITY_VIEW: - ListenableFuture entityViewFuture = ctx.getEntityViewService().findEntityViewByIdAsync(edge.getTenantId(), new EntityViewId(entityId.getId())); - return Futures.transform(entityViewFuture, entityView -> { - TbMsgMetaData metaData = new TbMsgMetaData(); - if (entityView != null) { - metaData.putValue("entityViewName", entityView.getName()); - metaData.putValue("entityViewType", entityView.getType()); - } - return metaData; - }, ctx.getDbCallbackExecutor()); + EntityView entityView = ctx.getEntityViewService().findEntityViewById(edge.getTenantId(), new EntityViewId(entityId.getId())); + if (entityView != null) { + metaData.putValue("entityViewName", entityView.getName()); + metaData.putValue("entityViewType", entityView.getType()); + } + break; default: - log.debug("Constructing empty metadata for entityId [{}]", entityId); - return Futures.immediateFuture(new TbMsgMetaData()); + log.debug("Using empty metadata for entityId [{}]", entityId); + break; } + return metaData; } private EntityId constructEntityId(EntityDataProto entityData) { @@ -904,7 +919,7 @@ public final class EdgeGrpcSession implements Closeable { } } - private void processPostTelemetry(EntityId entityId, TransportProtos.PostTelemetryMsg msg, TbMsgMetaData metaData) { + private ListenableFuture processPostTelemetry(EntityId entityId, TransportProtos.PostTelemetryMsg msg, TbMsgMetaData metaData) { for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) { JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList()); metaData.putValue("ts", tsKv.getTs() + ""); @@ -912,17 +927,18 @@ public final class EdgeGrpcSession implements Closeable { // TODO: voba - verify that null callback is OK ctx.getTbClusterService().pushMsgToRuleEngine(edge.getTenantId(), tbMsg.getOriginator(), tbMsg, null); } + return Futures.immediateFuture(null); } - private void processPostAttributes(EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { + private ListenableFuture processPostAttributes(EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), entityId, metaData, gson.toJson(json)); // TODO: voba - verify that null callback is OK ctx.getTbClusterService().pushMsgToRuleEngine(edge.getTenantId(), tbMsg.getOriginator(), tbMsg, null); + return Futures.immediateFuture(null); } - private void onDeviceUpdate(DeviceUpdateMsg deviceUpdateMsg) { - log.info("onDeviceUpdate {}", deviceUpdateMsg); + private ListenableFuture onDeviceUpdate(DeviceUpdateMsg deviceUpdateMsg) { DeviceId edgeDeviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB())); switch (deviceUpdateMsg.getMsgType()) { case ENTITY_CREATED_RPC_MESSAGE: @@ -931,11 +947,12 @@ public final class EdgeGrpcSession implements Closeable { if (device != null) { // device with this name already exists on the cloud - update ID on the edge if (!device.getId().equals(edgeDeviceId)) { - EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setDeviceUpdateMsg(ctx.getDeviceUpdateMsgConstructor().constructDeviceUpdatedMsg(UpdateMsgType.DEVICE_CONFLICT_RPC_MESSAGE, device)) + DeviceUpdateMsg d = ctx.getDeviceUpdateMsgConstructor().constructDeviceUpdatedMsg(UpdateMsgType.DEVICE_CONFLICT_RPC_MESSAGE, device); + DownlinkMsg downlinkMsg = DownlinkMsg.newBuilder() + .addAllDeviceUpdateMsg(Collections.singletonList(d)) .build(); - outputStream.onNext(ResponseMsg.newBuilder() - .setEntityUpdateMsg(entityUpdateMsg) + sendResponseMsg(ResponseMsg.newBuilder() + .setDownlinkMsg(downlinkMsg) .build()); } } else { @@ -943,11 +960,12 @@ public final class EdgeGrpcSession implements Closeable { if (deviceById != null) { // this ID already used by other device - create new device and update ID on the edge device = createDevice(deviceUpdateMsg); - EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setDeviceUpdateMsg(ctx.getDeviceUpdateMsgConstructor().constructDeviceUpdatedMsg(UpdateMsgType.DEVICE_CONFLICT_RPC_MESSAGE, device)) + DeviceUpdateMsg d = ctx.getDeviceUpdateMsgConstructor().constructDeviceUpdatedMsg(UpdateMsgType.DEVICE_CONFLICT_RPC_MESSAGE, device); + DownlinkMsg downlinkMsg = DownlinkMsg.newBuilder() + .addAllDeviceUpdateMsg(Collections.singletonList(d)) .build(); - outputStream.onNext(ResponseMsg.newBuilder() - .setEntityUpdateMsg(entityUpdateMsg) + sendResponseMsg(ResponseMsg.newBuilder() + .setDownlinkMsg(downlinkMsg) .build()); } else { device = createDevice(deviceUpdateMsg); @@ -966,8 +984,10 @@ public final class EdgeGrpcSession implements Closeable { } break; case UNRECOGNIZED: - log.error("Unsupported msg type"); + log.error("Unsupported msg type {}", deviceUpdateMsg.getMsgType()); + return Futures.immediateFailedFuture(new RuntimeException("Unsupported msg type " + deviceUpdateMsg.getMsgType())); } + return Futures.immediateFuture(null); } private void updateDevice(DeviceUpdateMsg deviceUpdateMsg) { @@ -981,33 +1001,26 @@ public final class EdgeGrpcSession implements Closeable { requestDeviceCredentialsFromEdge(device); } - private void onDeviceCredentialsUpdate(DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) { + private ListenableFuture onDeviceCredentialsUpdate(DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) { log.debug("Executing onDeviceCredentialsUpdate, deviceCredentialsUpdateMsg [{}]", deviceCredentialsUpdateMsg); DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsUpdateMsg.getDeviceIdMSB(), deviceCredentialsUpdateMsg.getDeviceIdLSB())); ListenableFuture deviceFuture = ctx.getDeviceService().findDeviceByIdAsync(edge.getTenantId(), deviceId); - - Futures.addCallback(deviceFuture, new FutureCallback() { - @Override - public void onSuccess(@Nullable Device device) { - if (device != null) { - log.debug("Updating device credentials for device [{}]. New device credentials Id [{}], value [{}]", - device.getName(), deviceCredentialsUpdateMsg.getCredentialsId(), deviceCredentialsUpdateMsg.getCredentialsValue()); - try { - DeviceCredentials deviceCredentials = ctx.getDeviceCredentialsService().findDeviceCredentialsByDeviceId(edge.getTenantId(), device.getId()); - deviceCredentials.setCredentialsType(DeviceCredentialsType.valueOf(deviceCredentialsUpdateMsg.getCredentialsType())); - deviceCredentials.setCredentialsId(deviceCredentialsUpdateMsg.getCredentialsId()); - deviceCredentials.setCredentialsValue(deviceCredentialsUpdateMsg.getCredentialsValue()); - ctx.getDeviceCredentialsService().updateDeviceCredentials(edge.getTenantId(), deviceCredentials); - } catch (Exception e) { - log.error("Can't update device credentials for device [{}], deviceCredentialsUpdateMsg [{}]", device.getName(), deviceCredentialsUpdateMsg, e); - } + return Futures.transform(deviceFuture, device -> { + if (device != null) { + log.debug("Updating device credentials for device [{}]. New device credentials Id [{}], value [{}]", + device.getName(), deviceCredentialsUpdateMsg.getCredentialsId(), deviceCredentialsUpdateMsg.getCredentialsValue()); + try { + DeviceCredentials deviceCredentials = ctx.getDeviceCredentialsService().findDeviceCredentialsByDeviceId(edge.getTenantId(), device.getId()); + deviceCredentials.setCredentialsType(DeviceCredentialsType.valueOf(deviceCredentialsUpdateMsg.getCredentialsType())); + deviceCredentials.setCredentialsId(deviceCredentialsUpdateMsg.getCredentialsId()); + deviceCredentials.setCredentialsValue(deviceCredentialsUpdateMsg.getCredentialsValue()); + ctx.getDeviceCredentialsService().updateDeviceCredentials(edge.getTenantId(), deviceCredentials); + } catch (Exception e) { + log.error("Can't update device credentials for device [{}], deviceCredentialsUpdateMsg [{}]", device.getName(), deviceCredentialsUpdateMsg, e); + throw new RuntimeException(e); } } - - @Override - public void onFailure(Throwable t) { - log.error("Can't update device credentials for deviceCredentialsUpdateMsg [{}]", deviceCredentialsUpdateMsg, t); - } + return null; }, ctx.getDbCallbackExecutor()); } @@ -1015,7 +1028,7 @@ public final class EdgeGrpcSession implements Closeable { log.debug("Executing requestDeviceCredentialsFromEdge device [{}]", device); DownlinkMsg downlinkMsg = constructDeviceCredentialsRequestMsg(device.getId()); - outputStream.onNext(ResponseMsg.newBuilder() + sendResponseMsg(ResponseMsg.newBuilder() .setDownlinkMsg(downlinkMsg) .build()); } @@ -1118,49 +1131,52 @@ public final class EdgeGrpcSession implements Closeable { } } - private void onAlarmUpdate(AlarmUpdateMsg alarmUpdateMsg) { + private ListenableFuture onAlarmUpdate(AlarmUpdateMsg alarmUpdateMsg) { EntityId originatorId = getAlarmOriginator(alarmUpdateMsg.getOriginatorName(), org.thingsboard.server.common.data.EntityType.valueOf(alarmUpdateMsg.getOriginatorType())); - if (originatorId != null) { - try { - Alarm existentAlarm = ctx.getAlarmService().findLatestByOriginatorAndType(edge.getTenantId(), originatorId, alarmUpdateMsg.getType()).get(); - switch (alarmUpdateMsg.getMsgType()) { - case ENTITY_CREATED_RPC_MESSAGE: - case ENTITY_UPDATED_RPC_MESSAGE: - if (existentAlarm == null || existentAlarm.getStatus().isCleared()) { - existentAlarm = new Alarm(); - existentAlarm.setTenantId(edge.getTenantId()); - existentAlarm.setType(alarmUpdateMsg.getName()); - existentAlarm.setOriginator(originatorId); - existentAlarm.setSeverity(AlarmSeverity.valueOf(alarmUpdateMsg.getSeverity())); - existentAlarm.setStartTs(alarmUpdateMsg.getStartTs()); - existentAlarm.setClearTs(alarmUpdateMsg.getClearTs()); - existentAlarm.setPropagate(alarmUpdateMsg.getPropagate()); - } - existentAlarm.setStatus(AlarmStatus.valueOf(alarmUpdateMsg.getStatus())); - existentAlarm.setAckTs(alarmUpdateMsg.getAckTs()); - existentAlarm.setEndTs(alarmUpdateMsg.getEndTs()); - existentAlarm.setDetails(mapper.readTree(alarmUpdateMsg.getDetails())); - ctx.getAlarmService().createOrUpdateAlarm(existentAlarm); - break; - case ALARM_ACK_RPC_MESSAGE: - if (existentAlarm != null) { - ctx.getAlarmService().ackAlarm(edge.getTenantId(), existentAlarm.getId(), alarmUpdateMsg.getAckTs()); - } - break; - case ALARM_CLEAR_RPC_MESSAGE: - if (existentAlarm != null) { - ctx.getAlarmService().clearAlarm(edge.getTenantId(), existentAlarm.getId(), mapper.readTree(alarmUpdateMsg.getDetails()), alarmUpdateMsg.getAckTs()); - } - break; - case ENTITY_DELETED_RPC_MESSAGE: - if (existentAlarm != null) { - ctx.getAlarmService().deleteAlarm(edge.getTenantId(), existentAlarm.getId()); - } - break; - } - } catch (Exception e) { - log.error("Error during finding existent alarm", e); + if (originatorId == null) { + return Futures.immediateFuture(null); + } + try { + Alarm existentAlarm = ctx.getAlarmService().findLatestByOriginatorAndType(edge.getTenantId(), originatorId, alarmUpdateMsg.getType()).get(); + switch (alarmUpdateMsg.getMsgType()) { + case ENTITY_CREATED_RPC_MESSAGE: + case ENTITY_UPDATED_RPC_MESSAGE: + if (existentAlarm == null || existentAlarm.getStatus().isCleared()) { + existentAlarm = new Alarm(); + existentAlarm.setTenantId(edge.getTenantId()); + existentAlarm.setType(alarmUpdateMsg.getName()); + existentAlarm.setOriginator(originatorId); + existentAlarm.setSeverity(AlarmSeverity.valueOf(alarmUpdateMsg.getSeverity())); + existentAlarm.setStartTs(alarmUpdateMsg.getStartTs()); + existentAlarm.setClearTs(alarmUpdateMsg.getClearTs()); + existentAlarm.setPropagate(alarmUpdateMsg.getPropagate()); + } + existentAlarm.setStatus(AlarmStatus.valueOf(alarmUpdateMsg.getStatus())); + existentAlarm.setAckTs(alarmUpdateMsg.getAckTs()); + existentAlarm.setEndTs(alarmUpdateMsg.getEndTs()); + existentAlarm.setDetails(mapper.readTree(alarmUpdateMsg.getDetails())); + ctx.getAlarmService().createOrUpdateAlarm(existentAlarm); + break; + case ALARM_ACK_RPC_MESSAGE: + if (existentAlarm != null) { + ctx.getAlarmService().ackAlarm(edge.getTenantId(), existentAlarm.getId(), alarmUpdateMsg.getAckTs()); + } + break; + case ALARM_CLEAR_RPC_MESSAGE: + if (existentAlarm != null) { + ctx.getAlarmService().clearAlarm(edge.getTenantId(), existentAlarm.getId(), mapper.readTree(alarmUpdateMsg.getDetails()), alarmUpdateMsg.getAckTs()); + } + break; + case ENTITY_DELETED_RPC_MESSAGE: + if (existentAlarm != null) { + ctx.getAlarmService().deleteAlarm(edge.getTenantId(), existentAlarm.getId()); + } + break; } + return Futures.immediateFuture(null); + } catch (Exception e) { + log.error("Error during finding existent alarm", e); + return Futures.immediateFailedFuture(new RuntimeException("Error during finding existent alarm", e)); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java index 9ee5879996..4d688f6999 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java @@ -316,64 +316,62 @@ public class DefaultSyncEdgeService implements SyncEdgeService { } @Override - public void processRuleChainMetadataRequestMsg(Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg) { + public ListenableFuture processRuleChainMetadataRequestMsg(Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg) { if (ruleChainMetadataRequestMsg.getRuleChainIdMSB() != 0 && ruleChainMetadataRequestMsg.getRuleChainIdLSB() != 0) { RuleChainId ruleChainId = new RuleChainId(new UUID(ruleChainMetadataRequestMsg.getRuleChainIdMSB(), ruleChainMetadataRequestMsg.getRuleChainIdLSB())); - saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.RULE_CHAIN_METADATA, ActionType.ADDED, ruleChainId, null); + ListenableFuture future = saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.RULE_CHAIN_METADATA, ActionType.ADDED, ruleChainId, null); + return Futures.transform(future, edgeEvent -> null, dbCallbackExecutorService); } + return Futures.immediateFuture(null); } @Override - public void processAttributesRequestMsg(Edge edge, AttributesRequestMsg attributesRequestMsg) { + public ListenableFuture processAttributesRequestMsg(Edge edge, AttributesRequestMsg attributesRequestMsg) { EntityId entityId = EntityIdFactory.getByTypeAndUuid( EntityType.valueOf(attributesRequestMsg.getEntityType()), new UUID(attributesRequestMsg.getEntityIdMSB(), attributesRequestMsg.getEntityIdLSB())); final EdgeEventType edgeEventType = getEdgeQueueTypeByEntityType(entityId.getEntityType()); if (edgeEventType != null) { ListenableFuture> ssAttrFuture = attributesService.findAll(edge.getTenantId(), entityId, DataConstants.SERVER_SCOPE); - Futures.addCallback(ssAttrFuture, new FutureCallback>() { - @Override - public void onSuccess(@Nullable List ssAttributes) { - if (ssAttributes != null && !ssAttributes.isEmpty()) { - try { - Map entityData = new HashMap<>(); - ObjectNode attributes = mapper.createObjectNode(); - for (AttributeKvEntry attr : ssAttributes) { - if (attr.getDataType() == DataType.BOOLEAN && attr.getBooleanValue().isPresent()) { - attributes.put(attr.getKey(), attr.getBooleanValue().get()); - } else if (attr.getDataType() == DataType.DOUBLE && attr.getDoubleValue().isPresent()) { - attributes.put(attr.getKey(), attr.getDoubleValue().get()); - } else if (attr.getDataType() == DataType.LONG && attr.getLongValue().isPresent()) { - attributes.put(attr.getKey(), attr.getLongValue().get()); - } else { - attributes.put(attr.getKey(), attr.getValueAsString()); - } + return Futures.transform(ssAttrFuture, ssAttributes -> { + if (ssAttributes != null && !ssAttributes.isEmpty()) { + try { + Map entityData = new HashMap<>(); + ObjectNode attributes = mapper.createObjectNode(); + for (AttributeKvEntry attr : ssAttributes) { + if (attr.getDataType() == DataType.BOOLEAN && attr.getBooleanValue().isPresent()) { + attributes.put(attr.getKey(), attr.getBooleanValue().get()); + } else if (attr.getDataType() == DataType.DOUBLE && attr.getDoubleValue().isPresent()) { + attributes.put(attr.getKey(), attr.getDoubleValue().get()); + } else if (attr.getDataType() == DataType.LONG && attr.getLongValue().isPresent()) { + attributes.put(attr.getKey(), attr.getLongValue().get()); + } else { + attributes.put(attr.getKey(), attr.getValueAsString()); } - entityData.put("kv", attributes); - entityData.put("scope", DataConstants.SERVER_SCOPE); - JsonNode entityBody = mapper.valueToTree(entityData); - log.debug("Sending attributes data msg, entityId [{}], attributes [{}]", entityId, entityBody); - saveEdgeEvent(edge.getTenantId(), - edge.getId(), - edgeEventType, - ActionType.ATTRIBUTES_UPDATED, - entityId, - entityBody); - } catch (Exception e) { - log.error("[{}] Failed to send attribute updates to the edge", edge.getName(), e); } + entityData.put("kv", attributes); + entityData.put("scope", DataConstants.SERVER_SCOPE); + JsonNode entityBody = mapper.valueToTree(entityData); + log.debug("Sending attributes data msg, entityId [{}], attributes [{}]", entityId, entityBody); + saveEdgeEvent(edge.getTenantId(), + edge.getId(), + edgeEventType, + ActionType.ATTRIBUTES_UPDATED, + entityId, + entityBody); + } catch (Exception e) { + log.error("[{}] Failed to send attribute updates to the edge", edge.getName(), e); + throw new RuntimeException("[" + edge.getName() + "] Failed to send attribute updates to the edge", e); } } - - @Override - public void onFailure(Throwable t) { - - } + return null; }, dbCallbackExecutorService); // TODO: voba - push shared attributes to edge? - ListenableFuture> shAttrFuture = attributesService.findAll(edge.getTenantId(), entityId, DataConstants.SHARED_SCOPE); - ListenableFuture> clAttrFuture = attributesService.findAll(edge.getTenantId(), entityId, DataConstants.CLIENT_SCOPE); + // ListenableFuture> shAttrFuture = attributesService.findAll(edge.getTenantId(), entityId, DataConstants.SHARED_SCOPE); + // ListenableFuture> clAttrFuture = attributesService.findAll(edge.getTenantId(), entityId, DataConstants.CLIENT_SCOPE); + } else { + return Futures.immediateFuture(null); } } @@ -391,7 +389,7 @@ public class DefaultSyncEdgeService implements SyncEdgeService { } @Override - public void processRelationRequestMsg(Edge edge, RelationRequestMsg relationRequestMsg) { + public ListenableFuture processRelationRequestMsg(Edge edge, RelationRequestMsg relationRequestMsg) { EntityId entityId = EntityIdFactory.getByTypeAndUuid( EntityType.valueOf(relationRequestMsg.getEntityType()), new UUID(relationRequestMsg.getEntityIdMSB(), relationRequestMsg.getEntityIdLSB())); @@ -400,39 +398,33 @@ public class DefaultSyncEdgeService implements SyncEdgeService { futures.add(findRelationByQuery(edge, entityId, EntitySearchDirection.FROM)); futures.add(findRelationByQuery(edge, entityId, EntitySearchDirection.TO)); ListenableFuture>> relationsListFuture = Futures.allAsList(futures); - Futures.addCallback(relationsListFuture, new FutureCallback>>() { - @Override - public void onSuccess(@Nullable List> relationsList) { - try { - if (!relationsList.isEmpty()) { - for (List entityRelations : relationsList) { - log.trace("[{}] [{}] [{}] relation(s) are going to be pushed to edge.", edge.getId(), entityId, entityRelations.size()); - for (EntityRelation relation : entityRelations) { - try { - if (!relation.getFrom().getEntityType().equals(EntityType.EDGE) && - !relation.getTo().getEntityType().equals(EntityType.EDGE)) { - saveEdgeEvent(edge.getTenantId(), - edge.getId(), - EdgeEventType.RELATION, - ActionType.ADDED, - null, - mapper.valueToTree(relation)); - } - } catch (Exception e) { - log.error("Exception during loading relation [{}] to edge on sync!", relation, e); + return Futures.transform(relationsListFuture, relationsList -> { + try { + if (relationsList != null && !relationsList.isEmpty()) { + for (List entityRelations : relationsList) { + log.trace("[{}] [{}] [{}] relation(s) are going to be pushed to edge.", edge.getId(), entityId, entityRelations.size()); + for (EntityRelation relation : entityRelations) { + try { + if (!relation.getFrom().getEntityType().equals(EntityType.EDGE) && + !relation.getTo().getEntityType().equals(EntityType.EDGE)) { + saveEdgeEvent(edge.getTenantId(), + edge.getId(), + EdgeEventType.RELATION, + ActionType.ADDED, + null, + mapper.valueToTree(relation)); } + } catch (Exception e) { + log.error("Exception during loading relation [{}] to edge on sync!", relation, e); } } } - } catch (Exception e) { - log.error("Exception during loading relation(s) to edge on sync!", e); } + } catch (Exception e) { + log.error("Exception during loading relation(s) to edge on sync!", e); + throw new RuntimeException("Exception during loading relation(s) to edge on sync!", e); } - - @Override - public void onFailure(Throwable t) { - log.error("Exception during loading relation(s) to edge on sync!", t); - } + return null; }, dbCallbackExecutorService); } @@ -443,22 +435,26 @@ public class DefaultSyncEdgeService implements SyncEdgeService { } @Override - public void processDeviceCredentialsRequestMsg(Edge edge, DeviceCredentialsRequestMsg deviceCredentialsRequestMsg) { + public ListenableFuture processDeviceCredentialsRequestMsg(Edge edge, DeviceCredentialsRequestMsg deviceCredentialsRequestMsg) { if (deviceCredentialsRequestMsg.getDeviceIdMSB() != 0 && deviceCredentialsRequestMsg.getDeviceIdLSB() != 0) { DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsRequestMsg.getDeviceIdMSB(), deviceCredentialsRequestMsg.getDeviceIdLSB())); - saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.DEVICE, ActionType.CREDENTIALS_UPDATED, deviceId, null); + ListenableFuture future = saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.DEVICE, ActionType.CREDENTIALS_UPDATED, deviceId, null); + return Futures.transform(future, edgeEvent -> null, dbCallbackExecutorService); } + return Futures.immediateFuture(null); } @Override - public void processUserCredentialsRequestMsg(Edge edge, UserCredentialsRequestMsg userCredentialsRequestMsg) { + public ListenableFuture processUserCredentialsRequestMsg(Edge edge, UserCredentialsRequestMsg userCredentialsRequestMsg) { if (userCredentialsRequestMsg.getUserIdMSB() != 0 && userCredentialsRequestMsg.getUserIdLSB() != 0) { UserId userId = new UserId(new UUID(userCredentialsRequestMsg.getUserIdMSB(), userCredentialsRequestMsg.getUserIdLSB())); - saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.USER, ActionType.CREDENTIALS_UPDATED, userId, null); + ListenableFuture future = saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.USER, ActionType.CREDENTIALS_UPDATED, userId, null); + return Futures.transform(future, edgeEvent -> null, dbCallbackExecutorService); } + return Futures.immediateFuture(null); } - private void saveEdgeEvent(TenantId tenantId, + private ListenableFuture saveEdgeEvent(TenantId tenantId, EdgeId edgeId, EdgeEventType edgeEventType, ActionType edgeEventAction, @@ -476,6 +472,6 @@ public class DefaultSyncEdgeService implements SyncEdgeService { edgeEvent.setEntityId(entityId.getId()); } edgeEvent.setEntityBody(entityBody); - edgeEventService.saveAsync(edgeEvent); + return edgeEventService.saveAsync(edgeEvent); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/SyncEdgeService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/SyncEdgeService.java index 051af4f6d2..a4df4e227c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/SyncEdgeService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/SyncEdgeService.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.edge.rpc.init; +import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.gen.edge.AttributesRequestMsg; import org.thingsboard.server.gen.edge.DeviceCredentialsRequestMsg; @@ -26,13 +27,13 @@ public interface SyncEdgeService { void sync(Edge edge); - void processRuleChainMetadataRequestMsg(Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg); + ListenableFuture processRuleChainMetadataRequestMsg(Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg); - void processAttributesRequestMsg(Edge edge, AttributesRequestMsg attributesRequestMsg); + ListenableFuture processAttributesRequestMsg(Edge edge, AttributesRequestMsg attributesRequestMsg); - void processRelationRequestMsg(Edge edge, RelationRequestMsg relationRequestMsg); + ListenableFuture processRelationRequestMsg(Edge edge, RelationRequestMsg relationRequestMsg); - void processDeviceCredentialsRequestMsg(Edge edge, DeviceCredentialsRequestMsg deviceCredentialsRequestMsg); + ListenableFuture processDeviceCredentialsRequestMsg(Edge edge, DeviceCredentialsRequestMsg deviceCredentialsRequestMsg); - void processUserCredentialsRequestMsg(Edge edge, UserCredentialsRequestMsg userCredentialsRequestMsg); + ListenableFuture processUserCredentialsRequestMsg(Edge edge, UserCredentialsRequestMsg userCredentialsRequestMsg); } diff --git a/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java b/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java index 917cc3fa02..45201d3229 100644 --- a/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java +++ b/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java @@ -28,9 +28,9 @@ import org.thingsboard.server.gen.edge.ConnectRequestMsg; import org.thingsboard.server.gen.edge.ConnectResponseCode; import org.thingsboard.server.gen.edge.ConnectResponseMsg; import org.thingsboard.server.gen.edge.DownlinkMsg; +import org.thingsboard.server.gen.edge.DownlinkResponseMsg; import org.thingsboard.server.gen.edge.EdgeConfiguration; import org.thingsboard.server.gen.edge.EdgeRpcServiceGrpc; -import org.thingsboard.server.gen.edge.EntityUpdateMsg; import org.thingsboard.server.gen.edge.RequestMsg; import org.thingsboard.server.gen.edge.RequestMsgType; import org.thingsboard.server.gen.edge.ResponseMsg; @@ -41,6 +41,7 @@ import javax.net.ssl.SSLException; import java.io.File; import java.net.URISyntaxException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; @Service @@ -62,12 +63,13 @@ public class EdgeGrpcClient implements EdgeRpcClient { private StreamObserver inputStream; + private static final ReentrantLock uplinkMsgLock = new ReentrantLock(); + @Override public void connect(String edgeKey, String edgeSecret, Consumer onUplinkResponse, Consumer onEdgeUpdate, - Consumer onEntityUpdate, Consumer onDownlink, Consumer onError) { NettyChannelBuilder builder = NettyChannelBuilder.forAddress(rpcHost, rpcPort).usePlaintext(); @@ -83,7 +85,7 @@ public class EdgeGrpcClient implements EdgeRpcClient { channel = builder.build(); EdgeRpcServiceGrpc.EdgeRpcServiceStub stub = EdgeRpcServiceGrpc.newStub(channel); log.info("[{}] Sending a connect request to the TB!", edgeKey); - this.inputStream = stub.handleMsgs(initOutputStream(edgeKey, onUplinkResponse, onEdgeUpdate, onEntityUpdate, onDownlink, onError)); + this.inputStream = stub.handleMsgs(initOutputStream(edgeKey, onUplinkResponse, onEdgeUpdate, onDownlink, onError)); this.inputStream.onNext(RequestMsg.newBuilder() .setMsgType(RequestMsgType.CONNECT_RPC_MESSAGE) .setConnectRequestMsg(ConnectRequestMsg.newBuilder().setEdgeRoutingKey(edgeKey).setEdgeSecret(edgeSecret).build()) @@ -110,16 +112,33 @@ public class EdgeGrpcClient implements EdgeRpcClient { @Override public void sendUplinkMsg(UplinkMsg msg) { - this.inputStream.onNext(RequestMsg.newBuilder() - .setMsgType(RequestMsgType.UPLINK_RPC_MESSAGE) - .setUplinkMsg(msg) - .build()); + try { + uplinkMsgLock.lock(); + this.inputStream.onNext(RequestMsg.newBuilder() + .setMsgType(RequestMsgType.UPLINK_RPC_MESSAGE) + .setUplinkMsg(msg) + .build()); + } finally { + uplinkMsgLock.unlock(); + } + } + + @Override + public void sendDownlinkResponseMsg(DownlinkResponseMsg downlinkResponseMsg) { + try { + uplinkMsgLock.lock(); + this.inputStream.onNext(RequestMsg.newBuilder() + .setMsgType(RequestMsgType.UPLINK_RPC_MESSAGE) + .setDownlinkResponseMsg(downlinkResponseMsg) + .build()); + } finally { + uplinkMsgLock.unlock(); + } } private StreamObserver initOutputStream(String edgeKey, Consumer onUplinkResponse, Consumer onEdgeUpdate, - Consumer onEntityUpdate, Consumer onDownlink, Consumer onError) { return new StreamObserver() { @@ -137,9 +156,6 @@ public class EdgeGrpcClient implements EdgeRpcClient { } else if (responseMsg.hasUplinkResponseMsg()) { log.debug("[{}] Uplink response message received {}", edgeKey, responseMsg.getUplinkResponseMsg()); onUplinkResponse.accept(responseMsg.getUplinkResponseMsg()); - } else if (responseMsg.hasEntityUpdateMsg()) { - log.debug("[{}] Entity update message received {}", edgeKey, responseMsg.getEntityUpdateMsg()); - onEntityUpdate.accept(responseMsg.getEntityUpdateMsg()); } else if (responseMsg.hasDownlinkMsg()) { log.debug("[{}] Downlink message received {}", edgeKey, responseMsg.getDownlinkMsg()); onDownlink.accept(responseMsg.getDownlinkMsg()); diff --git a/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeRpcClient.java b/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeRpcClient.java index d2ab91dfaf..c268b2a9e9 100644 --- a/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeRpcClient.java +++ b/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeRpcClient.java @@ -16,8 +16,8 @@ package org.thingsboard.edge.rpc; import org.thingsboard.server.gen.edge.DownlinkMsg; +import org.thingsboard.server.gen.edge.DownlinkResponseMsg; import org.thingsboard.server.gen.edge.EdgeConfiguration; -import org.thingsboard.server.gen.edge.EntityUpdateMsg; import org.thingsboard.server.gen.edge.UplinkMsg; import org.thingsboard.server.gen.edge.UplinkResponseMsg; @@ -29,11 +29,12 @@ public interface EdgeRpcClient { String integrationSecret, Consumer onUplinkResponse, Consumer onEdgeUpdate, - Consumer onEntityUpdate, Consumer onDownlink, Consumer onError); void disconnect() throws InterruptedException; - void sendUplinkMsg(UplinkMsg uplinkMsg) throws InterruptedException; + void sendUplinkMsg(UplinkMsg uplinkMsg); + + void sendDownlinkResponseMsg(DownlinkResponseMsg downlinkResponseMsg); } diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index 8be4be9956..9f5754ba31 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -37,31 +37,13 @@ message RequestMsg { RequestMsgType msgType = 1; ConnectRequestMsg connectRequestMsg = 2; UplinkMsg uplinkMsg = 3; + DownlinkResponseMsg downlinkResponseMsg = 4; } message ResponseMsg { ConnectResponseMsg connectResponseMsg = 1; UplinkResponseMsg uplinkResponseMsg = 2; - EntityUpdateMsg entityUpdateMsg = 3; - DownlinkMsg downlinkMsg = 4; -} - -message EntityUpdateMsg { - DeviceUpdateMsg deviceUpdateMsg = 1; - DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg = 2; - RuleChainUpdateMsg ruleChainUpdateMsg = 3; - RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = 4; - DashboardUpdateMsg dashboardUpdateMsg = 5; - AssetUpdateMsg assetUpdateMsg = 6; - EntityViewUpdateMsg entityViewUpdateMsg = 7; - AlarmUpdateMsg alarmUpdateMsg = 8; - UserUpdateMsg userUpdateMsg = 9; - UserCredentialsUpdateMsg userCredentialsUpdateMsg = 10; - CustomerUpdateMsg customerUpdateMsg = 11; - RelationUpdateMsg relationUpdateMsg = 12; - WidgetsBundleUpdateMsg widgetsBundleUpdateMsg = 13; - WidgetTypeUpdateMsg widgetTypeUpdateMsg = 14; - AdminSettingsUpdateMsg adminSettingsUpdateMsg = 15; + DownlinkMsg downlinkMsg = 3; } enum RequestMsgType { @@ -360,9 +342,30 @@ message UplinkResponseMsg { string errorMsg = 2; } +message DownlinkResponseMsg { + bool success = 1; + string errorMsg = 2; +} + message DownlinkMsg { int32 downlinkMsgId = 1; repeated EntityDataProto entityData = 2; repeated DeviceCredentialsRequestMsg deviceCredentialsRequestMsg = 3; + repeated DeviceUpdateMsg deviceUpdateMsg = 4; + repeated DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg = 5; + repeated RuleChainUpdateMsg ruleChainUpdateMsg = 6; + repeated RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = 7; + repeated DashboardUpdateMsg dashboardUpdateMsg = 8; + repeated AssetUpdateMsg assetUpdateMsg = 9; + repeated EntityViewUpdateMsg entityViewUpdateMsg = 10; + repeated AlarmUpdateMsg alarmUpdateMsg = 11; + repeated UserUpdateMsg userUpdateMsg = 12; + repeated UserCredentialsUpdateMsg userCredentialsUpdateMsg = 13; + repeated CustomerUpdateMsg customerUpdateMsg = 14; + repeated RelationUpdateMsg relationUpdateMsg = 15; + repeated WidgetsBundleUpdateMsg widgetsBundleUpdateMsg = 16; + repeated WidgetTypeUpdateMsg widgetTypeUpdateMsg = 17; + repeated AdminSettingsUpdateMsg adminSettingsUpdateMsg = 18; + }