diff --git a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java index 0f1c5e5b71..7eaca9747e 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java @@ -441,27 +441,41 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { private void processAlarm(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); ListenableFuture alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId); - Futures.transform(alarmFuture, alarm -> { - if (alarm != null) { - EdgeEventType type = getEdgeQueueTypeByEntityType(alarm.getOriginator().getEntityType()); - if (type != null) { - ListenableFuture> relatedEdgeIdsByEntityIdFuture = edgeService.findRelatedEdgeIdsByEntityId(tenantId, alarm.getOriginator()); - Futures.transform(relatedEdgeIdsByEntityIdFuture, relatedEdgeIdsByEntityId -> { - if (relatedEdgeIdsByEntityId != null) { - for (EdgeId edgeId : relatedEdgeIdsByEntityId) { - saveEdgeEvent(tenantId, - edgeId, - EdgeEventType.ALARM, - EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()), - alarmId, - null); + Futures.addCallback(alarmFuture, new FutureCallback() { + @Override + public void onSuccess(@Nullable Alarm alarm) { + if (alarm != null) { + EdgeEventType type = getEdgeQueueTypeByEntityType(alarm.getOriginator().getEntityType()); + if (type != null) { + ListenableFuture> relatedEdgeIdsByEntityIdFuture = edgeService.findRelatedEdgeIdsByEntityId(tenantId, alarm.getOriginator()); + Futures.addCallback(relatedEdgeIdsByEntityIdFuture, new FutureCallback>() { + @Override + public void onSuccess(@Nullable List relatedEdgeIdsByEntityId) { + if (relatedEdgeIdsByEntityId != null) { + for (EdgeId edgeId : relatedEdgeIdsByEntityId) { + saveEdgeEvent(tenantId, + edgeId, + EdgeEventType.ALARM, + EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()), + alarmId, + null); + } + } } - } - return null; - }, dbCallbackExecutorService); + + @Override + public void onFailure(Throwable t) { + log.warn("[{}] can't find related edge ids by entity id [{}]", tenantId.getId(), alarm.getOriginator(), t); + } + }, dbCallbackExecutorService); + } } } - return null; + + @Override + public void onFailure(Throwable t) { + log.warn("[{}] can't find alarm by id [{}]", tenantId.getId(), alarmId.getId(), t); + } }, dbCallbackExecutorService); } @@ -473,26 +487,34 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { futures.add(edgeService.findRelatedEdgeIdsByEntityId(tenantId, relation.getTo())); futures.add(edgeService.findRelatedEdgeIdsByEntityId(tenantId, relation.getFrom())); ListenableFuture>> combinedFuture = Futures.allAsList(futures); - Futures.transform(combinedFuture, listOfListsEdgeIds -> { - Set uniqueEdgeIds = new HashSet<>(); - if (listOfListsEdgeIds != null && !listOfListsEdgeIds.isEmpty()) { - for (List listOfListsEdgeId : listOfListsEdgeIds) { - if (listOfListsEdgeId != null) { - uniqueEdgeIds.addAll(listOfListsEdgeId); + Futures.addCallback(combinedFuture, new FutureCallback>>() { + @Override + public void onSuccess(@Nullable List> listOfListsEdgeIds) { + Set uniqueEdgeIds = new HashSet<>(); + if (listOfListsEdgeIds != null && !listOfListsEdgeIds.isEmpty()) { + for (List listOfListsEdgeId : listOfListsEdgeIds) { + if (listOfListsEdgeId != null) { + uniqueEdgeIds.addAll(listOfListsEdgeId); + } + } + } + if (!uniqueEdgeIds.isEmpty()) { + for (EdgeId edgeId : uniqueEdgeIds) { + saveEdgeEvent(tenantId, + edgeId, + EdgeEventType.RELATION, + EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()), + null, + mapper.valueToTree(relation)); } } } - if (!uniqueEdgeIds.isEmpty()) { - for (EdgeId edgeId : uniqueEdgeIds) { - saveEdgeEvent(tenantId, - edgeId, - EdgeEventType.RELATION, - EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()), - null, - mapper.valueToTree(relation)); - } + + @Override + public void onFailure(Throwable t) { + log.warn("[{}] can't find related edge ids by relation to id [{}] and relation from id [{}]" , + tenantId.getId(), relation.getTo().getId(), relation.getFrom().getId(), t); } - return null; }, dbCallbackExecutorService); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 20eaf7c72c..bfbf547ce5 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -26,6 +26,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.EdgeId; @@ -48,10 +49,12 @@ import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @Service @@ -62,6 +65,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private final ConcurrentMap sessions = new ConcurrentHashMap<>(); private final ConcurrentMap sessionNewEvents = new ConcurrentHashMap<>(); + private final ConcurrentMap> sessionEdgeEventChecks = new ConcurrentHashMap<>(); private static final ObjectMapper mapper = new ObjectMapper(); @Value("${edges.rpc.port}") @@ -77,6 +81,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i @Value("${edges.rpc.client_max_keep_alive_time_sec}") private int clientMaxKeepAliveTimeSec; + @Value("${edges.scheduler_pool_size}") + private int schedulerPoolSize; + @Autowired private EdgeContextComponent ctx; @@ -85,7 +92,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private Server server; - private ExecutorService executor; + private ScheduledExecutorService scheduler; @PostConstruct public void init() { @@ -112,8 +119,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i throw new RuntimeException("Failed to start Edge RPC server!"); } log.info("Edge RPC service initialized!"); - executor = Executors.newSingleThreadExecutor(); - processHandleMessages(); + this.scheduler = Executors.newScheduledThreadPool(schedulerPoolSize, ThingsBoardThreadFactory.forName("edge-scheduler")); } @PreDestroy @@ -121,8 +127,16 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i if (server != null) { server.shutdownNow(); } - if (executor != null) { - executor.shutdownNow(); + for (Map.Entry> entry : sessionEdgeEventChecks.entrySet()) { + EdgeId edgeId = entry.getKey(); + ScheduledFuture sessionEdgeEventCheck = entry.getValue(); + if (sessionEdgeEventCheck != null && !sessionEdgeEventCheck.isCancelled() && !sessionEdgeEventCheck.isDone()) { + sessionEdgeEventCheck.cancel(true); + sessionEdgeEventChecks.remove(edgeId); + } + } + if (scheduler != null) { + scheduler.shutdownNow(); } } @@ -150,6 +164,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i session.close(); sessions.remove(edgeId); sessionNewEvents.remove(edgeId); + cancelScheduleEdgeEventsCheck(edgeId); } } @@ -168,6 +183,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i sessionNewEvents.put(edgeId, false); save(edgeId, DefaultDeviceStateService.ACTIVITY_STATE, true); save(edgeId, DefaultDeviceStateService.LAST_CONNECT_TIME, System.currentTimeMillis()); + scheduleEdgeEventsCheck(edgeGrpcSession); } public EdgeGrpcSession getEdgeGrpcSessionById(EdgeId edgeId) { @@ -179,38 +195,38 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i } } - private void processHandleMessages() { - executor.submit(() -> { - while (!Thread.interrupted()) { + private void scheduleEdgeEventsCheck(EdgeGrpcSession session) { + EdgeId edgeId = session.getEdge().getId(); + UUID tenantId = session.getEdge().getTenantId().getId(); + if (sessions.containsKey(edgeId)) { + ScheduledFuture schedule = scheduler.schedule(() -> { try { - if (sessions.size() > 0) { - for (Map.Entry entry : sessions.entrySet()) { - EdgeId edgeId = entry.getKey(); - EdgeGrpcSession session = entry.getValue(); - if (sessionNewEvents.get(edgeId)) { - log.trace("[{}] set session new events flag to false", edgeId.getId()); - sessionNewEvents.put(edgeId, false); - // TODO: voba - at the moment all edge events are processed in a single thread. Maybe this should be updated? - session.processHandleMessages(); - } - } - } else { - log.trace("No sessions available"); - } - log.trace("Sleep for the next run"); - try { - Thread.sleep(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval()); - } catch (InterruptedException ignore) { + if (sessionNewEvents.get(edgeId)) { + log.trace("[{}] Set session new events flag to false", edgeId.getId()); + sessionNewEvents.put(edgeId, false); + session.processEdgeEvents(); } } catch (Exception e) { - log.warn("Failed to process messages handling!", e); - try { - Thread.sleep(1000); - } catch (InterruptedException ignore) { - } + log.warn("[{}] Failed to process edge events for edge [{}]!", tenantId, session.getEdge().getId().getId(), e); } + scheduleEdgeEventsCheck(session); + }, ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval(), TimeUnit.MILLISECONDS); + sessionEdgeEventChecks.put(edgeId, schedule); + log.trace("[{}] Check edge event was scheduler for edge [{}]", tenantId, edgeId.getId()); + } else { + log.debug("[{}] Session was removed and edge event check schedule must not be started [{}]", + tenantId, edgeId.getId()); + } + } + + private void cancelScheduleEdgeEventsCheck(EdgeId edgeId) { + if (sessionEdgeEventChecks.containsKey(edgeId)) { + ScheduledFuture sessionEdgeEventCheck = sessionEdgeEventChecks.get(edgeId); + if (sessionEdgeEventCheck != null && !sessionEdgeEventCheck.isCancelled() && !sessionEdgeEventCheck.isDone()) { + sessionEdgeEventCheck.cancel(true); + sessionEdgeEventChecks.remove(edgeId); } - }); + } } private void onEdgeDisconnect(EdgeId edgeId) { @@ -219,6 +235,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i sessionNewEvents.remove(edgeId); save(edgeId, DefaultDeviceStateService.ACTIVITY_STATE, false); save(edgeId, DefaultDeviceStateService.LAST_DISCONNECT_TIME, System.currentTimeMillis()); + cancelScheduleEdgeEventsCheck(edgeId); } private void save(EdgeId edgeId, String key, long value) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 2c1c2c879c..fba3e0ccac 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -258,7 +258,7 @@ public final class EdgeGrpcSession implements Closeable { } } - void processHandleMessages() throws ExecutionException, InterruptedException { + void processEdgeEvents() throws ExecutionException, InterruptedException { log.trace("[{}] processHandleMessages started", this.sessionId); if (isConnected()) { Long queueStartTs = getQueueStartTs().get(); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java index 5cd5dafefb..41ca2813e6 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java @@ -545,34 +545,47 @@ public class DefaultSyncEdgeService implements SyncEdgeService { futures.add(findRelationByQuery(edge, entityId, EntitySearchDirection.FROM)); futures.add(findRelationByQuery(edge, entityId, EntitySearchDirection.TO)); ListenableFuture>> relationsListFuture = Futures.allAsList(futures); - return Futures.transform(relationsListFuture, relationsList -> { - try { - if (relationsList != null && !relationsList.isEmpty()) { - for (List entityRelations : relationsList) { - log.trace("[{}] [{}] [{}] relation(s) are going to be pushed to edge.", edge.getId(), entityId, entityRelations.size()); - for (EntityRelation relation : entityRelations) { - try { - if (!relation.getFrom().getEntityType().equals(EntityType.EDGE) && - !relation.getTo().getEntityType().equals(EntityType.EDGE)) { - saveEdgeEvent(edge.getTenantId(), - edge.getId(), - EdgeEventType.RELATION, - EdgeEventActionType.ADDED, - null, - mapper.valueToTree(relation)); + SettableFuture futureToSet = SettableFuture.create(); + Futures.addCallback(relationsListFuture, new FutureCallback>>() { + @Override + public void onSuccess(@Nullable List> relationsList) { + try { + if (relationsList != null && !relationsList.isEmpty()) { + for (List entityRelations : relationsList) { + log.trace("[{}] [{}] [{}] relation(s) are going to be pushed to edge.", edge.getId(), entityId, entityRelations.size()); + for (EntityRelation relation : entityRelations) { + try { + if (!relation.getFrom().getEntityType().equals(EntityType.EDGE) && + !relation.getTo().getEntityType().equals(EntityType.EDGE)) { + saveEdgeEvent(edge.getTenantId(), + edge.getId(), + EdgeEventType.RELATION, + EdgeEventActionType.ADDED, + null, + mapper.valueToTree(relation)); + } + } catch (Exception e) { + log.error("Exception during loading relation [{}] to edge on sync!", relation, e); + futureToSet.setException(e); + return; + } + } } - } catch (Exception e) { - log.error("Exception during loading relation [{}] to edge on sync!", relation, e); } + futureToSet.set(null); + } catch (Exception e) { + log.error("Exception during loading relation(s) to edge on sync!", e); + futureToSet.setException(e); } } - } - } catch (Exception e) { - log.error("Exception during loading relation(s) to edge on sync!", e); - throw new RuntimeException("Exception during loading relation(s) to edge on sync!", e); - } - return null; - }, dbCallbackExecutorService); + + @Override + public void onFailure(Throwable t) { + log.error("[{}] Can't find relation by query. Entity id [{}]", edge.getTenantId(), entityId, t); + futureToSet.setException(t); + } + }, dbCallbackExecutorService); + return futureToSet; } private ListenableFuture> findRelationByQuery(Edge edge, EntityId entityId, EntitySearchDirection direction) { diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 85bea384e5..11c14aa3ba 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -602,6 +602,7 @@ edges: max_read_records_count: "${EDGES_RPC_STORAGE_MAX_READ_RECORDS_COUNT:50}" no_read_records_sleep: "${EDGES_RPC_NO_READ_RECORDS_SLEEP:1000}" sleep_between_batches: "${EDGES_RPC_SLEEP_BETWEEN_BATCHES:1000}" + scheduler_pool_size: "${EDGES_SCHEDULER_POOL_SIZE:4}" edge_events_ttl: "${EDGES_EDGE_EVENTS_TTL:0}" state: persistToTelemetry: "${EDGES_PERSIST_STATE_TO_TELEMETRY:false}" diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java index a1ea2296a6..58e8b7a8cd 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java @@ -16,6 +16,7 @@ package org.thingsboard.server.dao.edge; import com.google.common.base.Function; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; @@ -324,13 +325,20 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic public void assignDefaultRuleChainsToEdge(TenantId tenantId, EdgeId edgeId) { log.trace("Executing assignDefaultRuleChainsToEdge, tenantId [{}], edgeId [{}]", tenantId, edgeId); ListenableFuture> future = ruleChainService.findDefaultEdgeRuleChainsByTenantId(tenantId); - Futures.transform(future, ruleChains -> { - if (ruleChains != null && !ruleChains.isEmpty()) { - for (RuleChain ruleChain : ruleChains) { - ruleChainService.assignRuleChainToEdge(tenantId, ruleChain.getId(), edgeId); + Futures.addCallback(future, new FutureCallback>() { + @Override + public void onSuccess(List ruleChains) { + if (ruleChains != null && !ruleChains.isEmpty()) { + for (RuleChain ruleChain : ruleChains) { + ruleChainService.assignRuleChainToEdge(tenantId, ruleChain.getId(), edgeId); + } } } - return null; + + @Override + public void onFailure(Throwable t) { + log.warn("[{}] can't find default edge rule chains [{}]", tenantId.getId(), edgeId.getId(), t); + } }, MoreExecutors.directExecutor()); }