diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/AbstractEdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/AbstractEdgeGrpcSession.java index 0baa8a90a4..15ee44a66e 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/AbstractEdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/AbstractEdgeGrpcSession.java @@ -623,10 +623,8 @@ public abstract class AbstractEdgeGrpcSession { + ALARM_DELETE, CREDENTIALS_UPDATED, RELATION_ADD_OR_UPDATE, RELATION_DELETED, RPC_CALL, + ASSIGNED_TO_CUSTOMER, UNASSIGNED_FROM_CUSTOMER, ADDED_COMMENT, UPDATED_COMMENT, DELETED_COMMENT -> { downlinkMsg = convertEntityEventToDownlink(edgeEvent); if (downlinkMsg != null && downlinkMsg.getWidgetTypeUpdateMsgCount() > 0) { log.trace("[{}][{}] widgetTypeUpdateMsg message processed, downlinkMsgId = {}", tenantId, sessionId, downlinkMsg.getDownlinkMsgId()); @@ -636,8 +634,7 @@ public abstract class AbstractEdgeGrpcSession downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edge, edgeEvent); - default -> - log.warn("[{}][{}] Unsupported action type [{}]", tenantId, sessionId, edgeEvent.getAction()); + default -> log.warn("[{}][{}] Unsupported action type [{}]", tenantId, sessionId, edgeEvent.getAction()); } } catch (Exception e) { log.error("[{}][{}] Exception during converting edge event to downlink msg", tenantId, sessionId, e); @@ -763,40 +760,30 @@ public abstract class AbstractEdgeGrpcSession ctx.getEdgeProcessor().convertEdgeEventToDownlink(edgeEvent); case DEVICE -> ctx.getDeviceProcessor().convertDeviceEventToDownlink(edgeEvent, edgeVersion); - case DEVICE_PROFILE -> - ctx.getDeviceProfileProcessor().convertDeviceProfileEventToDownlink(edgeEvent, edgeVersion); - case ASSET_PROFILE -> - ctx.getAssetProfileProcessor().convertAssetProfileEventToDownlink(edgeEvent, edgeVersion); - case ASSET -> - ctx.getAssetProcessor().convertAssetEventToDownlink(edgeEvent, edgeVersion); + case DEVICE_PROFILE -> ctx.getDeviceProfileProcessor().convertDeviceProfileEventToDownlink(edgeEvent, edgeVersion); + case ASSET_PROFILE -> ctx.getAssetProfileProcessor().convertAssetProfileEventToDownlink(edgeEvent, edgeVersion); + case ASSET -> ctx.getAssetProcessor().convertAssetEventToDownlink(edgeEvent, edgeVersion); case ENTITY_VIEW -> ctx.getEntityViewProcessor().convertEntityViewEventToDownlink(edgeEvent, edgeVersion); case DASHBOARD -> ctx.getDashboardProcessor().convertDashboardEventToDownlink(edgeEvent, edgeVersion); case CUSTOMER -> ctx.getCustomerProcessor().convertCustomerEventToDownlink(edgeEvent, edgeVersion); case RULE_CHAIN -> ctx.getRuleChainProcessor().convertRuleChainEventToDownlink(edgeEvent, edgeVersion); - case RULE_CHAIN_METADATA -> - ctx.getRuleChainProcessor().convertRuleChainMetadataEventToDownlink(edgeEvent, edgeVersion); + case RULE_CHAIN_METADATA -> ctx.getRuleChainProcessor().convertRuleChainMetadataEventToDownlink(edgeEvent, edgeVersion); case ALARM -> ctx.getAlarmProcessor().convertAlarmEventToDownlink(edgeEvent, edgeVersion); case ALARM_COMMENT -> ctx.getAlarmProcessor().convertAlarmCommentEventToDownlink(edgeEvent, edgeVersion); case USER -> ctx.getUserProcessor().convertUserEventToDownlink(edgeEvent, edgeVersion); case RELATION -> ctx.getRelationProcessor().convertRelationEventToDownlink(edgeEvent, edgeVersion); - case WIDGETS_BUNDLE -> - ctx.getWidgetBundleProcessor().convertWidgetsBundleEventToDownlink(edgeEvent, edgeVersion); + case WIDGETS_BUNDLE -> ctx.getWidgetBundleProcessor().convertWidgetsBundleEventToDownlink(edgeEvent, edgeVersion); case WIDGET_TYPE -> ctx.getWidgetTypeProcessor().convertWidgetTypeEventToDownlink(edgeEvent, edgeVersion); - case ADMIN_SETTINGS -> - ctx.getAdminSettingsProcessor().convertAdminSettingsEventToDownlink(edgeEvent, edgeVersion); + case ADMIN_SETTINGS -> ctx.getAdminSettingsProcessor().convertAdminSettingsEventToDownlink(edgeEvent, edgeVersion); case OTA_PACKAGE -> ctx.getOtaPackageProcessor().convertOtaPackageEventToDownlink(edgeEvent, edgeVersion); case TB_RESOURCE -> ctx.getResourceProcessor().convertResourceEventToDownlink(edgeEvent, edgeVersion); case QUEUE -> ctx.getQueueProcessor().convertQueueEventToDownlink(edgeEvent, edgeVersion); case TENANT -> ctx.getTenantProcessor().convertTenantEventToDownlink(edgeEvent, edgeVersion); - case TENANT_PROFILE -> - ctx.getTenantProfileProcessor().convertTenantProfileEventToDownlink(edgeEvent, edgeVersion); + case TENANT_PROFILE -> ctx.getTenantProfileProcessor().convertTenantProfileEventToDownlink(edgeEvent, edgeVersion); case NOTIFICATION_RULE -> ctx.getNotificationEdgeProcessor().convertNotificationRuleToDownlink(edgeEvent); - case NOTIFICATION_TARGET -> - ctx.getNotificationEdgeProcessor().convertNotificationTargetToDownlink(edgeEvent); - case NOTIFICATION_TEMPLATE -> - ctx.getNotificationEdgeProcessor().convertNotificationTemplateToDownlink(edgeEvent); - case OAUTH2_CLIENT -> - ctx.getOAuth2EdgeProcessor().convertOAuth2ClientEventToDownlink(edgeEvent, edgeVersion); + case NOTIFICATION_TARGET -> ctx.getNotificationEdgeProcessor().convertNotificationTargetToDownlink(edgeEvent); + case NOTIFICATION_TEMPLATE -> ctx.getNotificationEdgeProcessor().convertNotificationTemplateToDownlink(edgeEvent); + case OAUTH2_CLIENT -> ctx.getOAuth2EdgeProcessor().convertOAuth2ClientEventToDownlink(edgeEvent, edgeVersion); case DOMAIN -> ctx.getOAuth2EdgeProcessor().convertOAuth2DomainEventToDownlink(edgeEvent, edgeVersion); default -> { log.warn("[{}] Unsupported edge event type [{}]", edgeEvent.getTenantId(), edgeEvent); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 416445cc74..a82ba3efb2 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -95,6 +95,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private final ConcurrentMap> sessionEdgeEventChecks = new ConcurrentHashMap<>(); private final ConcurrentMap> localSyncEdgeRequests = new ConcurrentHashMap<>(); private final ConcurrentMap edgeEventsProcessed = new ConcurrentHashMap<>(); + private final ConcurrentMap kafkaConsumerInit = new ConcurrentHashMap<>(); @Value("${edges.rpc.port}") private int rpcPort; @@ -336,6 +337,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i Boolean isChecked = edgeEventsProcessed.get(edgeId); if (Boolean.FALSE.equals(isChecked)) { scheduleEdgeEventsCheck(session); + } else { + initializeKafkaConsumer(session, tenantId, edgeId); } } if (edgeGrpcSession instanceof PostgresEdgeGrpcSession) { @@ -345,8 +348,11 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private void initializeKafkaConsumer(KafkaEdgeGrpcSession kafkaEdgeGrpcSession, TenantId tenantId, EdgeId edgeId) { TbQueueConsumer> consumer = tbCoreQueueFactory.createEdgeEventMsgConsumer(tenantId, edgeId); - kafkaEdgeGrpcSession.initConsumer(() -> consumer, schedulerPoolSize); - kafkaEdgeGrpcSession.startConsumers(); + if (!kafkaConsumerInit.getOrDefault(edgeId, Boolean.FALSE)) { + kafkaEdgeGrpcSession.initConsumer(() -> consumer, schedulerPoolSize); + kafkaEdgeGrpcSession.startConsumers(); + kafkaConsumerInit.put(edgeId, Boolean.TRUE); + } } private void startSyncProcess(TenantId tenantId, EdgeId edgeId, UUID requestId, String requestServiceId) { @@ -486,6 +492,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i } if (isKafkaSupported) { ((KafkaEdgeGrpcSession) toRemove).stopConsumer(); + kafkaConsumerInit.remove(edgeId); } TenantId tenantId = toRemove.getEdge().getTenantId(); save(tenantId, edgeId, DefaultDeviceStateService.ACTIVITY_STATE, false); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java index 920047898b..e16c89c23e 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java @@ -15,12 +15,9 @@ */ package org.thingsboard.server.service.edge.rpc; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import io.grpc.stub.StreamObserver; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; -import org.checkerframework.checker.nullness.qual.Nullable; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; @@ -40,6 +37,7 @@ import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Supplier; @@ -47,6 +45,7 @@ import java.util.function.Supplier; public class KafkaEdgeGrpcSession extends AbstractEdgeGrpcSession { private ExecutorService consumerExecutor; + private ScheduledExecutorService highPriorityExecutorService; private QueueConsumerManager> consumer; @@ -60,14 +59,16 @@ public class KafkaEdgeGrpcSession extends AbstractEdgeGrpcSession>> edgeEventsConsumer, long schedulerPoolSize) { this.consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edge-event-consumer")); + this.highPriorityExecutorService = Executors.newScheduledThreadPool((int) schedulerPoolSize, ThingsBoardThreadFactory.forName("edge-event-high-priority-scheduler")); this.consumer = QueueConsumerManager.>builder() .name("TB Edge events") .msgPackProcessor(this::processMsgs) - .pollInterval(schedulerPoolSize) + .pollInterval(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval()) .consumerCreator(edgeEventsConsumer) .consumerExecutor(consumerExecutor) .threadPrefix("edge-events") .build(); + scheduleCheckForHighPriorityEvent(); } private void processMsgs(List> msgs, TbQueueConsumer> consumer) { @@ -83,26 +84,36 @@ public class KafkaEdgeGrpcSession extends AbstractEdgeGrpcSession downlinkMsgsPack = convertToDownlinkMsgsPack(edgeEvents); - Futures.addCallback(sendDownlinkMsgsPack(downlinkMsgsPack), new FutureCallback<>() { - @Override - public void onSuccess(@Nullable Boolean isInterrupted) { - if (Boolean.TRUE.equals(isInterrupted)) { - log.debug("[{}][{}][{}] Send downlink messages task was interrupted", tenantId, edge.getId(), sessionId); - } else { - consumerExecutor.submit(consumer::commit); - } + try { + boolean isInterrupted = sendDownlinkMsgsPack(downlinkMsgsPack).get(); + if (isInterrupted) { + log.debug("[{}][{}][{}] Send downlink messages task was interrupted", tenantId, edge.getId(), sessionId); + } else { + consumer.commit(); } - @Override - public void onFailure(Throwable t) { - log.error("[{}] Failed to send downlink msgs pack", sessionId, t); - } - }, ctx.getGrpcCallbackExecutorService()); + } catch (Exception e) { + log.error("[{}] Failed to process all downlink messages", sessionId, e); + } } } else { log.trace("[{}][{}] edge is not connected or sync is not completed. Skipping iteration", tenantId, sessionId); } } + private void scheduleCheckForHighPriorityEvent() { + highPriorityExecutorService.scheduleAtFixedRate(() -> { + try { + if (isConnected() && isSyncCompleted()) { + if (!highPriorityQueue.isEmpty()) { + processHighPriorityEvents(); + } + } + } catch (Exception e) { + log.error("Error in processing high priority events", e); + } + }, 0, ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval() * 3, TimeUnit.MILLISECONDS); + } + public void startConsumers() { consumer.subscribe(); consumer.launch();