diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index bfbf547ce5..ca3b578734 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -118,8 +118,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i log.error("Failed to start Edge RPC server!", e); throw new RuntimeException("Failed to start Edge RPC server!"); } - log.info("Edge RPC service initialized!"); this.scheduler = Executors.newScheduledThreadPool(schedulerPoolSize, ThingsBoardThreadFactory.forName("edge-scheduler")); + log.info("Edge RPC service initialized!"); } @PreDestroy diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index fba3e0ccac..e895442acb 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -16,7 +16,6 @@ package org.thingsboard.server.service.edge.rpc; import com.datastax.driver.core.utils.UUIDs; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -120,7 +119,7 @@ import java.util.function.Consumer; @Data public final class EdgeGrpcSession implements Closeable { - private static final ReentrantLock responseMsgLock = new ReentrantLock(); + private static final ReentrantLock downlinkMsgLock = new ReentrantLock(); private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs"; @@ -199,7 +198,7 @@ public final class EdgeGrpcSession implements Closeable { @Override public void onSuccess(@Nullable List result) { UplinkResponseMsg uplinkResponseMsg = UplinkResponseMsg.newBuilder().setSuccess(true).build(); - sendResponseMsg(ResponseMsg.newBuilder() + sendDownlinkMsg(ResponseMsg.newBuilder() .setUplinkResponseMsg(uplinkResponseMsg) .build()); } @@ -207,7 +206,7 @@ public final class EdgeGrpcSession implements Closeable { @Override public void onFailure(Throwable t) { UplinkResponseMsg uplinkResponseMsg = UplinkResponseMsg.newBuilder().setSuccess(false).setErrorMsg(t.getMessage()).build(); - sendResponseMsg(ResponseMsg.newBuilder() + sendDownlinkMsg(ResponseMsg.newBuilder() .setUplinkResponseMsg(uplinkResponseMsg) .build()); } @@ -227,35 +226,32 @@ public final class EdgeGrpcSession implements Closeable { } } - private void sendResponseMsg(ResponseMsg responseMsg) { - log.trace("[{}] Sending response msg [{}]", this.sessionId, responseMsg); + private void sendDownlinkMsg(ResponseMsg downlinkMsg) { + log.trace("[{}] Sending downlink msg [{}]", this.sessionId, downlinkMsg); if (isConnected()) { try { - responseMsgLock.lock(); - outputStream.onNext(responseMsg); + downlinkMsgLock.lock(); + outputStream.onNext(downlinkMsg); } catch (Exception e) { - log.error("[{}] Failed to send response message [{}]", this.sessionId, responseMsg, e); + log.error("[{}] Failed to send downlink message [{}]", this.sessionId, downlinkMsg, e); connected = false; sessionCloseListener.accept(edge.getId()); } finally { - responseMsgLock.unlock(); + downlinkMsgLock.unlock(); } - log.trace("[{}] Response msg successfully sent [{}]", this.sessionId, responseMsg); + log.trace("[{}] Response msg successfully sent [{}]", this.sessionId, downlinkMsg); } } void onConfigurationUpdate(Edge edge) { log.debug("[{}] onConfigurationUpdate [{}]", this.sessionId, edge); - try { - this.edge = edge; - EdgeUpdateMsg edgeConfig = EdgeUpdateMsg.newBuilder() - .setConfiguration(constructEdgeConfigProto(edge)).build(); - outputStream.onNext(ResponseMsg.newBuilder() - .setEdgeUpdateMsg(edgeConfig) - .build()); - } catch (Exception e) { - log.error("[{}] Failed to construct proto objects!", this.sessionId, e); - } + this.edge = edge; + EdgeUpdateMsg edgeConfig = EdgeUpdateMsg.newBuilder() + .setConfiguration(constructEdgeConfigProto(edge)).build(); + ResponseMsg edgeConfigMsg = ResponseMsg.newBuilder() + .setEdgeUpdateMsg(edgeConfig) + .build(); + sendDownlinkMsg(edgeConfigMsg); } void processEdgeEvents() throws ExecutionException, InterruptedException { @@ -276,7 +272,7 @@ public final class EdgeGrpcSession implements Closeable { latch = new CountDownLatch(downlinkMsgsPack.size()); for (DownlinkMsg downlinkMsg : downlinkMsgsPack) { - sendResponseMsg(ResponseMsg.newBuilder() + sendDownlinkMsg(ResponseMsg.newBuilder() .setDownlinkMsg(downlinkMsg) .build()); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java index 41ca2813e6..05b18e11ed 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java @@ -151,7 +151,7 @@ public class DefaultSyncEdgeService implements SyncEdgeService { syncWidgetsBundleAndWidgetTypes(edge); syncAdminSettings(edge); syncRuleChains(edge, new TimePageLink(DEFAULT_LIMIT)); - syncUsers(edge); + syncUsers(edge, new TextPageLink(DEFAULT_LIMIT)); syncDevices(edge, new TimePageLink(DEFAULT_LIMIT)); syncAssets(edge, new TimePageLink(DEFAULT_LIMIT)); syncEntityViews(edge, new TimePageLink(DEFAULT_LIMIT)); @@ -303,10 +303,9 @@ public class DefaultSyncEdgeService implements SyncEdgeService { } } - private void syncUsers(Edge edge) { + private void syncUsers(Edge edge, TextPageLink pageLink) { log.trace("[{}] syncUsers [{}]", edge.getTenantId(), edge.getName()); try { - TextPageLink pageLink = new TextPageLink(DEFAULT_LIMIT); TextPageData pageData; do { pageData = userService.findTenantAdmins(edge.getTenantId(), pageLink);