Add scheduler to read hpqueue for session
This commit is contained in:
parent
78f7dd3dcd
commit
f5c0ae68a6
@ -623,10 +623,8 @@ public abstract class AbstractEdgeGrpcSession<T extends AbstractEdgeGrpcSession<
|
||||
try {
|
||||
switch (edgeEvent.getAction()) {
|
||||
case UPDATED, ADDED, DELETED, ASSIGNED_TO_EDGE, UNASSIGNED_FROM_EDGE, ALARM_ACK, ALARM_CLEAR,
|
||||
ALARM_DELETE,
|
||||
CREDENTIALS_UPDATED, RELATION_ADD_OR_UPDATE, RELATION_DELETED, RPC_CALL,
|
||||
ASSIGNED_TO_CUSTOMER, UNASSIGNED_FROM_CUSTOMER, ADDED_COMMENT, UPDATED_COMMENT,
|
||||
DELETED_COMMENT -> {
|
||||
ALARM_DELETE, CREDENTIALS_UPDATED, RELATION_ADD_OR_UPDATE, RELATION_DELETED, RPC_CALL,
|
||||
ASSIGNED_TO_CUSTOMER, UNASSIGNED_FROM_CUSTOMER, ADDED_COMMENT, UPDATED_COMMENT, DELETED_COMMENT -> {
|
||||
downlinkMsg = convertEntityEventToDownlink(edgeEvent);
|
||||
if (downlinkMsg != null && downlinkMsg.getWidgetTypeUpdateMsgCount() > 0) {
|
||||
log.trace("[{}][{}] widgetTypeUpdateMsg message processed, downlinkMsgId = {}", tenantId, sessionId, downlinkMsg.getDownlinkMsgId());
|
||||
@ -636,8 +634,7 @@ public abstract class AbstractEdgeGrpcSession<T extends AbstractEdgeGrpcSession<
|
||||
}
|
||||
case ATTRIBUTES_UPDATED, POST_ATTRIBUTES, ATTRIBUTES_DELETED, TIMESERIES_UPDATED ->
|
||||
downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edge, edgeEvent);
|
||||
default ->
|
||||
log.warn("[{}][{}] Unsupported action type [{}]", tenantId, sessionId, edgeEvent.getAction());
|
||||
default -> log.warn("[{}][{}] Unsupported action type [{}]", tenantId, sessionId, edgeEvent.getAction());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[{}][{}] Exception during converting edge event to downlink msg", tenantId, sessionId, e);
|
||||
@ -763,40 +760,30 @@ public abstract class AbstractEdgeGrpcSession<T extends AbstractEdgeGrpcSession<
|
||||
return switch (edgeEvent.getType()) {
|
||||
case EDGE -> ctx.getEdgeProcessor().convertEdgeEventToDownlink(edgeEvent);
|
||||
case DEVICE -> ctx.getDeviceProcessor().convertDeviceEventToDownlink(edgeEvent, edgeVersion);
|
||||
case DEVICE_PROFILE ->
|
||||
ctx.getDeviceProfileProcessor().convertDeviceProfileEventToDownlink(edgeEvent, edgeVersion);
|
||||
case ASSET_PROFILE ->
|
||||
ctx.getAssetProfileProcessor().convertAssetProfileEventToDownlink(edgeEvent, edgeVersion);
|
||||
case ASSET ->
|
||||
ctx.getAssetProcessor().convertAssetEventToDownlink(edgeEvent, edgeVersion);
|
||||
case DEVICE_PROFILE -> ctx.getDeviceProfileProcessor().convertDeviceProfileEventToDownlink(edgeEvent, edgeVersion);
|
||||
case ASSET_PROFILE -> ctx.getAssetProfileProcessor().convertAssetProfileEventToDownlink(edgeEvent, edgeVersion);
|
||||
case ASSET -> ctx.getAssetProcessor().convertAssetEventToDownlink(edgeEvent, edgeVersion);
|
||||
case ENTITY_VIEW -> ctx.getEntityViewProcessor().convertEntityViewEventToDownlink(edgeEvent, edgeVersion);
|
||||
case DASHBOARD -> ctx.getDashboardProcessor().convertDashboardEventToDownlink(edgeEvent, edgeVersion);
|
||||
case CUSTOMER -> ctx.getCustomerProcessor().convertCustomerEventToDownlink(edgeEvent, edgeVersion);
|
||||
case RULE_CHAIN -> ctx.getRuleChainProcessor().convertRuleChainEventToDownlink(edgeEvent, edgeVersion);
|
||||
case RULE_CHAIN_METADATA ->
|
||||
ctx.getRuleChainProcessor().convertRuleChainMetadataEventToDownlink(edgeEvent, edgeVersion);
|
||||
case RULE_CHAIN_METADATA -> ctx.getRuleChainProcessor().convertRuleChainMetadataEventToDownlink(edgeEvent, edgeVersion);
|
||||
case ALARM -> ctx.getAlarmProcessor().convertAlarmEventToDownlink(edgeEvent, edgeVersion);
|
||||
case ALARM_COMMENT -> ctx.getAlarmProcessor().convertAlarmCommentEventToDownlink(edgeEvent, edgeVersion);
|
||||
case USER -> ctx.getUserProcessor().convertUserEventToDownlink(edgeEvent, edgeVersion);
|
||||
case RELATION -> ctx.getRelationProcessor().convertRelationEventToDownlink(edgeEvent, edgeVersion);
|
||||
case WIDGETS_BUNDLE ->
|
||||
ctx.getWidgetBundleProcessor().convertWidgetsBundleEventToDownlink(edgeEvent, edgeVersion);
|
||||
case WIDGETS_BUNDLE -> ctx.getWidgetBundleProcessor().convertWidgetsBundleEventToDownlink(edgeEvent, edgeVersion);
|
||||
case WIDGET_TYPE -> ctx.getWidgetTypeProcessor().convertWidgetTypeEventToDownlink(edgeEvent, edgeVersion);
|
||||
case ADMIN_SETTINGS ->
|
||||
ctx.getAdminSettingsProcessor().convertAdminSettingsEventToDownlink(edgeEvent, edgeVersion);
|
||||
case ADMIN_SETTINGS -> ctx.getAdminSettingsProcessor().convertAdminSettingsEventToDownlink(edgeEvent, edgeVersion);
|
||||
case OTA_PACKAGE -> ctx.getOtaPackageProcessor().convertOtaPackageEventToDownlink(edgeEvent, edgeVersion);
|
||||
case TB_RESOURCE -> ctx.getResourceProcessor().convertResourceEventToDownlink(edgeEvent, edgeVersion);
|
||||
case QUEUE -> ctx.getQueueProcessor().convertQueueEventToDownlink(edgeEvent, edgeVersion);
|
||||
case TENANT -> ctx.getTenantProcessor().convertTenantEventToDownlink(edgeEvent, edgeVersion);
|
||||
case TENANT_PROFILE ->
|
||||
ctx.getTenantProfileProcessor().convertTenantProfileEventToDownlink(edgeEvent, edgeVersion);
|
||||
case TENANT_PROFILE -> ctx.getTenantProfileProcessor().convertTenantProfileEventToDownlink(edgeEvent, edgeVersion);
|
||||
case NOTIFICATION_RULE -> ctx.getNotificationEdgeProcessor().convertNotificationRuleToDownlink(edgeEvent);
|
||||
case NOTIFICATION_TARGET ->
|
||||
ctx.getNotificationEdgeProcessor().convertNotificationTargetToDownlink(edgeEvent);
|
||||
case NOTIFICATION_TEMPLATE ->
|
||||
ctx.getNotificationEdgeProcessor().convertNotificationTemplateToDownlink(edgeEvent);
|
||||
case OAUTH2_CLIENT ->
|
||||
ctx.getOAuth2EdgeProcessor().convertOAuth2ClientEventToDownlink(edgeEvent, edgeVersion);
|
||||
case NOTIFICATION_TARGET -> ctx.getNotificationEdgeProcessor().convertNotificationTargetToDownlink(edgeEvent);
|
||||
case NOTIFICATION_TEMPLATE -> ctx.getNotificationEdgeProcessor().convertNotificationTemplateToDownlink(edgeEvent);
|
||||
case OAUTH2_CLIENT -> ctx.getOAuth2EdgeProcessor().convertOAuth2ClientEventToDownlink(edgeEvent, edgeVersion);
|
||||
case DOMAIN -> ctx.getOAuth2EdgeProcessor().convertOAuth2DomainEventToDownlink(edgeEvent, edgeVersion);
|
||||
default -> {
|
||||
log.warn("[{}] Unsupported edge event type [{}]", edgeEvent.getTenantId(), edgeEvent);
|
||||
|
||||
@ -95,6 +95,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
|
||||
private final ConcurrentMap<EdgeId, ScheduledFuture<?>> sessionEdgeEventChecks = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<UUID, Consumer<FromEdgeSyncResponse>> localSyncEdgeRequests = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<EdgeId, Boolean> edgeEventsProcessed = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<EdgeId, Boolean> kafkaConsumerInit = new ConcurrentHashMap<>();
|
||||
|
||||
@Value("${edges.rpc.port}")
|
||||
private int rpcPort;
|
||||
@ -336,6 +337,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
|
||||
Boolean isChecked = edgeEventsProcessed.get(edgeId);
|
||||
if (Boolean.FALSE.equals(isChecked)) {
|
||||
scheduleEdgeEventsCheck(session);
|
||||
} else {
|
||||
initializeKafkaConsumer(session, tenantId, edgeId);
|
||||
}
|
||||
}
|
||||
if (edgeGrpcSession instanceof PostgresEdgeGrpcSession) {
|
||||
@ -345,8 +348,11 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
|
||||
|
||||
private void initializeKafkaConsumer(KafkaEdgeGrpcSession kafkaEdgeGrpcSession, TenantId tenantId, EdgeId edgeId) {
|
||||
TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumer = tbCoreQueueFactory.createEdgeEventMsgConsumer(tenantId, edgeId);
|
||||
if (!kafkaConsumerInit.getOrDefault(edgeId, Boolean.FALSE)) {
|
||||
kafkaEdgeGrpcSession.initConsumer(() -> consumer, schedulerPoolSize);
|
||||
kafkaEdgeGrpcSession.startConsumers();
|
||||
kafkaConsumerInit.put(edgeId, Boolean.TRUE);
|
||||
}
|
||||
}
|
||||
|
||||
private void startSyncProcess(TenantId tenantId, EdgeId edgeId, UUID requestId, String requestServiceId) {
|
||||
@ -486,6 +492,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
|
||||
}
|
||||
if (isKafkaSupported) {
|
||||
((KafkaEdgeGrpcSession) toRemove).stopConsumer();
|
||||
kafkaConsumerInit.remove(edgeId);
|
||||
}
|
||||
TenantId tenantId = toRemove.getEdge().getTenantId();
|
||||
save(tenantId, edgeId, DefaultDeviceStateService.ACTIVITY_STATE, false);
|
||||
|
||||
@ -15,12 +15,9 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc;
|
||||
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
@ -40,6 +37,7 @@ import java.util.UUID;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
@ -47,6 +45,7 @@ import java.util.function.Supplier;
|
||||
public class KafkaEdgeGrpcSession extends AbstractEdgeGrpcSession<KafkaEdgeGrpcSession> {
|
||||
|
||||
private ExecutorService consumerExecutor;
|
||||
private ScheduledExecutorService highPriorityExecutorService;
|
||||
|
||||
private QueueConsumerManager<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumer;
|
||||
|
||||
@ -60,14 +59,16 @@ public class KafkaEdgeGrpcSession extends AbstractEdgeGrpcSession<KafkaEdgeGrpcS
|
||||
|
||||
protected void initConsumer(Supplier<TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>>> edgeEventsConsumer, long schedulerPoolSize) {
|
||||
this.consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edge-event-consumer"));
|
||||
this.highPriorityExecutorService = Executors.newScheduledThreadPool((int) schedulerPoolSize, ThingsBoardThreadFactory.forName("edge-event-high-priority-scheduler"));
|
||||
this.consumer = QueueConsumerManager.<TbProtoQueueMsg<ToEdgeEventNotificationMsg>>builder()
|
||||
.name("TB Edge events")
|
||||
.msgPackProcessor(this::processMsgs)
|
||||
.pollInterval(schedulerPoolSize)
|
||||
.pollInterval(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval())
|
||||
.consumerCreator(edgeEventsConsumer)
|
||||
.consumerExecutor(consumerExecutor)
|
||||
.threadPrefix("edge-events")
|
||||
.build();
|
||||
scheduleCheckForHighPriorityEvent();
|
||||
}
|
||||
|
||||
private void processMsgs(List<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumer) {
|
||||
@ -83,26 +84,36 @@ public class KafkaEdgeGrpcSession extends AbstractEdgeGrpcSession<KafkaEdgeGrpcS
|
||||
edgeEvents.add(edgeEvent);
|
||||
}
|
||||
List<DownlinkMsg> downlinkMsgsPack = convertToDownlinkMsgsPack(edgeEvents);
|
||||
Futures.addCallback(sendDownlinkMsgsPack(downlinkMsgsPack), new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable Boolean isInterrupted) {
|
||||
if (Boolean.TRUE.equals(isInterrupted)) {
|
||||
try {
|
||||
boolean isInterrupted = sendDownlinkMsgsPack(downlinkMsgsPack).get();
|
||||
if (isInterrupted) {
|
||||
log.debug("[{}][{}][{}] Send downlink messages task was interrupted", tenantId, edge.getId(), sessionId);
|
||||
} else {
|
||||
consumerExecutor.submit(consumer::commit);
|
||||
consumer.commit();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[{}] Failed to process all downlink messages", sessionId, e);
|
||||
}
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
log.error("[{}] Failed to send downlink msgs pack", sessionId, t);
|
||||
}
|
||||
}, ctx.getGrpcCallbackExecutorService());
|
||||
}
|
||||
} else {
|
||||
log.trace("[{}][{}] edge is not connected or sync is not completed. Skipping iteration", tenantId, sessionId);
|
||||
}
|
||||
}
|
||||
|
||||
private void scheduleCheckForHighPriorityEvent() {
|
||||
highPriorityExecutorService.scheduleAtFixedRate(() -> {
|
||||
try {
|
||||
if (isConnected() && isSyncCompleted()) {
|
||||
if (!highPriorityQueue.isEmpty()) {
|
||||
processHighPriorityEvents();
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Error in processing high priority events", e);
|
||||
}
|
||||
}, 0, ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval() * 3, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
public void startConsumers() {
|
||||
consumer.subscribe();
|
||||
consumer.launch();
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user