diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java index 341cc95d28..306d1bf5f9 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java @@ -15,11 +15,13 @@ */ package org.thingsboard.server.service.edge.rpc.init; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; +import org.checkerframework.checker.nullness.qual.Nullable; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.Dashboard; @@ -131,10 +133,16 @@ public class DefaultSyncEdgeService implements SyncEdgeService { futures.add(syncAssets(ctx, edge, pushedEntityIds, outputStream)); futures.add(syncEntityViews(ctx, edge, pushedEntityIds, outputStream)); futures.add(syncDashboards(ctx, edge, pushedEntityIds, outputStream)); - ListenableFuture> joinFuture = Futures.allAsList(futures); - Futures.transform(joinFuture, result -> { - syncRelations(ctx, edge, pushedEntityIds, outputStream); - return null; + Futures.addCallback(Futures.allAsList(futures), new FutureCallback>() { + @Override + public void onSuccess(@Nullable List result) { + syncRelations(ctx, edge, pushedEntityIds, outputStream); + } + + @Override + public void onFailure(Throwable t) { + log.warn("Exception during sync entities", t); + } }, MoreExecutors.directExecutor()); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java index a8ace962bf..7e5abd9da8 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java @@ -33,7 +33,7 @@ public class TbCoreConsumerStats { private final AtomicInteger claimDeviceCounter = new AtomicInteger(0); private final AtomicInteger deviceStateCounter = new AtomicInteger(0); - private final AtomicInteger edgeNotificationCounter = new AtomicInteger(0); + private final AtomicInteger edgeNotificationMsgCounter = new AtomicInteger(0); private final AtomicInteger subscriptionMsgCounter = new AtomicInteger(0); private final AtomicInteger toCoreNotificationsCounter = new AtomicInteger(0); @@ -69,7 +69,7 @@ public class TbCoreConsumerStats { public void log(TransportProtos.EdgeNotificationMsgProto msg) { totalCounter.incrementAndGet(); - edgeNotificationCounter.incrementAndGet(); + edgeNotificationMsgCounter.incrementAndGet(); } public void log(TransportProtos.SubscriptionMgrMsgProto msg) { @@ -86,12 +86,13 @@ public class TbCoreConsumerStats { int total = totalCounter.getAndSet(0); if (total > 0) { log.info("Total [{}] sessionEvents [{}] getAttr [{}] subToAttr [{}] subToRpc [{}] toDevRpc [{}] subInfo [{}] claimDevice [{}]" + - " deviceState [{}] subMgr [{}] coreNfs [{}]", + " deviceState [{}] subMgr [{}] coreNfs [{}] edgeNfs [{}]", total, sessionEventCounter.getAndSet(0), getAttributesCounter.getAndSet(0), subscribeToAttributesCounter.getAndSet(0), subscribeToRPCCounter.getAndSet(0), toDeviceRPCCallResponseCounter.getAndSet(0), subscriptionInfoCounter.getAndSet(0), claimDeviceCounter.getAndSet(0) - , deviceStateCounter.getAndSet(0), subscriptionMsgCounter.getAndSet(0), toCoreNotificationsCounter.getAndSet(0)); + , deviceStateCounter.getAndSet(0), subscriptionMsgCounter.getAndSet(0), toCoreNotificationsCounter.getAndSet(0), + edgeNotificationMsgCounter.getAndSet(0)); } }