Merge pull request #13487 from volodymyr-babak/edge-kafka-session-improvements
KafkaEdgeGrpcSession - Stability Improvements During Core Service Restarts or Network Timeouts
This commit is contained in:
		
						commit
						7a169178bc
					
				@ -69,7 +69,9 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
 | 
			
		||||
 | 
			
		||||
import java.io.IOException;
 | 
			
		||||
import java.io.InputStream;
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
@ -193,6 +195,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
 | 
			
		||||
        this.edgeEventProcessingExecutorService = ThingsBoardExecutors.newScheduledThreadPool(schedulerPoolSize, "edge-event-check-scheduler");
 | 
			
		||||
        this.sendDownlinkExecutorService = ThingsBoardExecutors.newScheduledThreadPool(sendSchedulerPoolSize, "edge-send-scheduler");
 | 
			
		||||
        this.executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edge-service");
 | 
			
		||||
        this.executorService.scheduleAtFixedRate(this::destroyKafkaSessionIfDisconnectedAndConsumerActive, 60, 60, TimeUnit.SECONDS);
 | 
			
		||||
        log.info("Edge RPC service initialized!");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -262,6 +265,10 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void updateEdge(TenantId tenantId, Edge edge) {
 | 
			
		||||
        if (edge == null) {
 | 
			
		||||
            log.warn("[{}] Edge is null - edge is removed and outdated notification is in process!", tenantId);
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
        EdgeGrpcSession session = sessions.get(edge.getId());
 | 
			
		||||
        if (session != null && session.isConnected()) {
 | 
			
		||||
            log.debug("[{}] Updating configuration for edge [{}] [{}]", tenantId, edge.getName(), edge.getId());
 | 
			
		||||
@ -459,11 +466,14 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
 | 
			
		||||
    private void processEdgeEventMigrationIfNeeded(EdgeGrpcSession session, EdgeId edgeId) throws Exception {
 | 
			
		||||
        boolean isMigrationProcessed = edgeEventsMigrationProcessed.getOrDefault(edgeId, Boolean.FALSE);
 | 
			
		||||
        if (!isMigrationProcessed) {
 | 
			
		||||
            log.info("Starting edge event migration for edge [{}]", edgeId.getId());
 | 
			
		||||
            Boolean eventsExist = session.migrateEdgeEvents().get();
 | 
			
		||||
            if (Boolean.TRUE.equals(eventsExist)) {
 | 
			
		||||
                log.info("Migration still in progress for edge [{}]", edgeId.getId());
 | 
			
		||||
                sessionNewEvents.put(edgeId, true);
 | 
			
		||||
                scheduleEdgeEventsCheck(session);
 | 
			
		||||
            } else if (Boolean.FALSE.equals(eventsExist)) {
 | 
			
		||||
                log.info("Migration completed for edge [{}]", edgeId.getId());
 | 
			
		||||
                edgeEventsMigrationProcessed.put(edgeId, true);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
@ -610,4 +620,27 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void destroyKafkaSessionIfDisconnectedAndConsumerActive() {
 | 
			
		||||
        try {
 | 
			
		||||
            List<EdgeId> toRemove = new ArrayList<>();
 | 
			
		||||
            for (EdgeGrpcSession session : sessions.values()) {
 | 
			
		||||
                if (session instanceof KafkaEdgeGrpcSession kafkaSession &&
 | 
			
		||||
                        !kafkaSession.isConnected() &&
 | 
			
		||||
                        kafkaSession.getConsumer() != null &&
 | 
			
		||||
                        kafkaSession.getConsumer().getConsumer() != null &&
 | 
			
		||||
                        !kafkaSession.getConsumer().getConsumer().isStopped()) {
 | 
			
		||||
                    toRemove.add(kafkaSession.getEdge().getId());
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            for (EdgeId edgeId : toRemove) {
 | 
			
		||||
                log.info("[{}] Destroying session for edge because edge is not connected", edgeId);
 | 
			
		||||
                EdgeGrpcSession removed = sessions.remove(edgeId);
 | 
			
		||||
                if (removed instanceof KafkaEdgeGrpcSession kafkaSession) {
 | 
			
		||||
                    kafkaSession.destroy();
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.warn("Failed to cleanup kafka sessions", e);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -42,7 +42,6 @@ import org.thingsboard.server.common.data.limit.LimitedApi;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.rule.trigger.EdgeCommunicationFailureTrigger;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
import org.thingsboard.server.common.data.page.SortOrder;
 | 
			
		||||
import org.thingsboard.server.common.data.page.TimePageLink;
 | 
			
		||||
import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg;
 | 
			
		||||
@ -292,11 +291,11 @@ public abstract class EdgeGrpcSession implements Closeable {
 | 
			
		||||
 | 
			
		||||
    protected void processEdgeEvents(EdgeEventFetcher fetcher, PageLink pageLink, SettableFuture<Pair<Long, Long>> result) {
 | 
			
		||||
        try {
 | 
			
		||||
            log.trace("[{}] Start processing edge events, fetcher = {}, pageLink = {}", sessionId, fetcher.getClass().getSimpleName(), pageLink);
 | 
			
		||||
            log.trace("[{}] Start processing edge events, fetcher = {}, pageLink = {}", edge.getId(), fetcher.getClass().getSimpleName(), pageLink);
 | 
			
		||||
            processHighPriorityEvents();
 | 
			
		||||
            PageData<EdgeEvent> pageData = fetcher.fetchEdgeEvents(edge.getTenantId(), edge, pageLink);
 | 
			
		||||
            if (isConnected() && !pageData.getData().isEmpty()) {
 | 
			
		||||
                log.trace("[{}][{}][{}] event(s) are going to be processed.", tenantId, sessionId, pageData.getData().size());
 | 
			
		||||
                log.trace("[{}][{}][{}] event(s) are going to be processed.", tenantId, edge.getId(), pageData.getData().size());
 | 
			
		||||
                List<DownlinkMsg> downlinkMsgsPack = convertToDownlinkMsgsPack(pageData.getData());
 | 
			
		||||
                Futures.addCallback(sendDownlinkMsgsPack(downlinkMsgsPack), new FutureCallback<>() {
 | 
			
		||||
                    @Override
 | 
			
		||||
@ -323,16 +322,16 @@ public abstract class EdgeGrpcSession implements Closeable {
 | 
			
		||||
 | 
			
		||||
                    @Override
 | 
			
		||||
                    public void onFailure(Throwable t) {
 | 
			
		||||
                        log.error("[{}] Failed to send downlink msgs pack", sessionId, t);
 | 
			
		||||
                        log.error("[{}] Failed to send downlink msgs pack", edge.getId(), t);
 | 
			
		||||
                        result.setException(t);
 | 
			
		||||
                    }
 | 
			
		||||
                }, ctx.getGrpcCallbackExecutorService());
 | 
			
		||||
            } else {
 | 
			
		||||
                log.trace("[{}] no event(s) found. Stop processing edge events, fetcher = {}, pageLink = {}", sessionId, fetcher.getClass().getSimpleName(), pageLink);
 | 
			
		||||
                log.trace("[{}] no event(s) found. Stop processing edge events, fetcher = {}, pageLink = {}", edge.getId(), fetcher.getClass().getSimpleName(), pageLink);
 | 
			
		||||
                result.set(null);
 | 
			
		||||
            }
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.error("[{}] Failed to fetch edge events", sessionId, e);
 | 
			
		||||
            log.error("[{}] Failed to fetch edge events", edge.getId(), e);
 | 
			
		||||
            result.setException(e);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@ -459,9 +458,9 @@ public abstract class EdgeGrpcSession implements Closeable {
 | 
			
		||||
                            ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId)
 | 
			
		||||
                                    .edgeId(edge.getId()).customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg).error(error).build());
 | 
			
		||||
                        }
 | 
			
		||||
                        log.warn("[{}][{}] {}, attempt: {}", tenantId, sessionId, failureMsg, attempt);
 | 
			
		||||
                        log.warn("[{}][{}] {}, attempt: {}", tenantId, edge.getId(), failureMsg, attempt);
 | 
			
		||||
                    }
 | 
			
		||||
                    log.trace("[{}][{}][{}] downlink msg(s) are going to be send.", tenantId, sessionId, copy.size());
 | 
			
		||||
                    log.trace("[{}][{}][{}] downlink msg(s) are going to be send.", tenantId, edge.getId(), copy.size());
 | 
			
		||||
                    for (DownlinkMsg downlinkMsg : copy) {
 | 
			
		||||
                        if (clientMaxInboundMessageSize != 0 && downlinkMsg.getSerializedSize() > clientMaxInboundMessageSize) {
 | 
			
		||||
                            String error = String.format("Client max inbound message size %s is exceeded. Please increase value of CLOUD_RPC_MAX_INBOUND_MESSAGE_SIZE " +
 | 
			
		||||
@ -483,7 +482,7 @@ public abstract class EdgeGrpcSession implements Closeable {
 | 
			
		||||
                    } else {
 | 
			
		||||
                        String failureMsg = String.format("Failed to deliver messages: %s", copy);
 | 
			
		||||
                        log.warn("[{}][{}] Failed to deliver the batch after {} attempts. Next messages are going to be discarded {}",
 | 
			
		||||
                                tenantId, sessionId, MAX_DOWNLINK_ATTEMPTS, copy);
 | 
			
		||||
                                tenantId, edge.getId(), MAX_DOWNLINK_ATTEMPTS, copy);
 | 
			
		||||
                        ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId).edgeId(edge.getId())
 | 
			
		||||
                                .customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg)
 | 
			
		||||
                                .error("Failed to deliver messages after " + MAX_DOWNLINK_ATTEMPTS + " attempts").build());
 | 
			
		||||
@ -493,7 +492,7 @@ public abstract class EdgeGrpcSession implements Closeable {
 | 
			
		||||
                    stopCurrentSendDownlinkMsgsTask(false);
 | 
			
		||||
                }
 | 
			
		||||
            } catch (Exception e) {
 | 
			
		||||
                log.warn("[{}][{}] Failed to send downlink msgs. Error msg {}", tenantId, sessionId, e.getMessage(), e);
 | 
			
		||||
                log.warn("[{}][{}] Failed to send downlink msgs. Error msg {}", tenantId, edge.getId(), e.getMessage(), e);
 | 
			
		||||
                stopCurrentSendDownlinkMsgsTask(true);
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
@ -540,7 +539,7 @@ public abstract class EdgeGrpcSession implements Closeable {
 | 
			
		||||
                stopCurrentSendDownlinkMsgsTask(false);
 | 
			
		||||
            }
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.error("[{}][{}] Can't process downlink response message [{}]", tenantId, sessionId, msg, e);
 | 
			
		||||
            log.error("[{}][{}] Can't process downlink response message [{}]", tenantId, edge.getId(), msg, e);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -555,12 +554,12 @@ public abstract class EdgeGrpcSession implements Closeable {
 | 
			
		||||
                while ((event = highPriorityQueue.poll()) != null) {
 | 
			
		||||
                    highPriorityEvents.add(event);
 | 
			
		||||
                }
 | 
			
		||||
                log.trace("[{}][{}] Sending high priority events {}", tenantId, sessionId, highPriorityEvents.size());
 | 
			
		||||
                log.trace("[{}][{}] Sending high priority events {}", tenantId, edge.getId(), highPriorityEvents.size());
 | 
			
		||||
                List<DownlinkMsg> downlinkMsgsPack = convertToDownlinkMsgsPack(highPriorityEvents);
 | 
			
		||||
                sendDownlinkMsgsPack(downlinkMsgsPack).get();
 | 
			
		||||
            }
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.error("[{}] Failed to process high priority events", sessionId, e);
 | 
			
		||||
            log.error("[{}] Failed to process high priority events", edge.getId(), e);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -577,7 +576,7 @@ public abstract class EdgeGrpcSession implements Closeable {
 | 
			
		||||
                    Integer.toUnsignedLong(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount()),
 | 
			
		||||
                    ctx.getEdgeEventService());
 | 
			
		||||
            log.trace("[{}][{}] starting processing edge events, previousStartTs = {}, previousStartSeqId = {}",
 | 
			
		||||
                    tenantId, sessionId, previousStartTs, previousStartSeqId);
 | 
			
		||||
                    tenantId, edge.getId(), previousStartTs, previousStartSeqId);
 | 
			
		||||
            Futures.addCallback(startProcessingEdgeEvents(fetcher), new FutureCallback<>() {
 | 
			
		||||
                @Override
 | 
			
		||||
                public void onSuccess(@Nullable Pair<Long, Long> newStartTsAndSeqId) {
 | 
			
		||||
@ -586,7 +585,7 @@ public abstract class EdgeGrpcSession implements Closeable {
 | 
			
		||||
                        Futures.addCallback(updateFuture, new FutureCallback<>() {
 | 
			
		||||
                            @Override
 | 
			
		||||
                            public void onSuccess(@Nullable List<Long> list) {
 | 
			
		||||
                                log.debug("[{}][{}] queue offset was updated [{}]", tenantId, sessionId, newStartTsAndSeqId);
 | 
			
		||||
                                log.debug("[{}][{}] queue offset was updated [{}]", tenantId, edge.getId(), newStartTsAndSeqId);
 | 
			
		||||
                                boolean newEventsAvailable;
 | 
			
		||||
                                if (fetcher.isSeqIdNewCycleStarted()) {
 | 
			
		||||
                                    newEventsAvailable = isNewEdgeEventsAvailable();
 | 
			
		||||
@ -601,28 +600,28 @@ public abstract class EdgeGrpcSession implements Closeable {
 | 
			
		||||
 | 
			
		||||
                            @Override
 | 
			
		||||
                            public void onFailure(Throwable t) {
 | 
			
		||||
                                log.error("[{}][{}] Failed to update queue offset [{}]", tenantId, sessionId, newStartTsAndSeqId, t);
 | 
			
		||||
                                log.error("[{}][{}] Failed to update queue offset [{}]", tenantId, edge.getId(), newStartTsAndSeqId, t);
 | 
			
		||||
                                result.setException(t);
 | 
			
		||||
                            }
 | 
			
		||||
                        }, ctx.getGrpcCallbackExecutorService());
 | 
			
		||||
                    } else {
 | 
			
		||||
                        log.trace("[{}][{}] newStartTsAndSeqId is null. Skipping iteration without db update", tenantId, sessionId);
 | 
			
		||||
                        log.trace("[{}][{}] newStartTsAndSeqId is null. Skipping iteration without db update", tenantId, edge.getId());
 | 
			
		||||
                        result.set(Boolean.FALSE);
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                @Override
 | 
			
		||||
                public void onFailure(Throwable t) {
 | 
			
		||||
                    log.error("[{}][{}] Failed to process events", tenantId, sessionId, t);
 | 
			
		||||
                    log.error("[{}][{}] Failed to process events", tenantId, edge.getId(), t);
 | 
			
		||||
                    result.setException(t);
 | 
			
		||||
                }
 | 
			
		||||
            }, ctx.getGrpcCallbackExecutorService());
 | 
			
		||||
        } else {
 | 
			
		||||
            if (isSyncInProgress()) {
 | 
			
		||||
                log.trace("[{}][{}] edge sync is not completed yet. Skipping iteration", tenantId, sessionId);
 | 
			
		||||
                log.trace("[{}][{}] edge sync is not completed yet. Skipping iteration", tenantId, edge.getId());
 | 
			
		||||
                result.set(Boolean.TRUE);
 | 
			
		||||
            } else {
 | 
			
		||||
                log.trace("[{}][{}] edge is not connected. Skipping iteration", tenantId, sessionId);
 | 
			
		||||
                log.trace("[{}][{}] edge is not connected. Skipping iteration", tenantId, edge.getId());
 | 
			
		||||
                result.set(null);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
@ -632,7 +631,7 @@ public abstract class EdgeGrpcSession implements Closeable {
 | 
			
		||||
    protected List<DownlinkMsg> convertToDownlinkMsgsPack(List<EdgeEvent> edgeEvents) {
 | 
			
		||||
        List<DownlinkMsg> result = new ArrayList<>();
 | 
			
		||||
        for (EdgeEvent edgeEvent : edgeEvents) {
 | 
			
		||||
            log.trace("[{}][{}] converting edge event to downlink msg [{}]", tenantId, sessionId, edgeEvent);
 | 
			
		||||
            log.trace("[{}][{}] converting edge event to downlink msg [{}]", tenantId, edge.getId(), edgeEvent);
 | 
			
		||||
            DownlinkMsg downlinkMsg = null;
 | 
			
		||||
            try {
 | 
			
		||||
                switch (edgeEvent.getAction()) {
 | 
			
		||||
@ -641,17 +640,17 @@ public abstract class EdgeGrpcSession implements Closeable {
 | 
			
		||||
                         ASSIGNED_TO_CUSTOMER, UNASSIGNED_FROM_CUSTOMER, ADDED_COMMENT, UPDATED_COMMENT, DELETED_COMMENT -> {
 | 
			
		||||
                        downlinkMsg = convertEntityEventToDownlink(edgeEvent);
 | 
			
		||||
                        if (downlinkMsg != null && downlinkMsg.getWidgetTypeUpdateMsgCount() > 0) {
 | 
			
		||||
                            log.trace("[{}][{}] widgetTypeUpdateMsg message processed, downlinkMsgId = {}", tenantId, sessionId, downlinkMsg.getDownlinkMsgId());
 | 
			
		||||
                            log.trace("[{}][{}] widgetTypeUpdateMsg message processed, downlinkMsgId = {}", tenantId, edge.getId(), downlinkMsg.getDownlinkMsgId());
 | 
			
		||||
                        } else {
 | 
			
		||||
                            log.trace("[{}][{}] entity message processed [{}]", tenantId, sessionId, downlinkMsg);
 | 
			
		||||
                            log.trace("[{}][{}] entity message processed [{}]", tenantId, edge.getId(), downlinkMsg);
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                    case ATTRIBUTES_UPDATED, POST_ATTRIBUTES, ATTRIBUTES_DELETED, TIMESERIES_UPDATED ->
 | 
			
		||||
                            downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edge, edgeEvent);
 | 
			
		||||
                    default -> log.warn("[{}][{}] Unsupported action type [{}]", tenantId, sessionId, edgeEvent.getAction());
 | 
			
		||||
                    default -> log.warn("[{}][{}] Unsupported action type [{}]", tenantId, edge.getId(), edgeEvent.getAction());
 | 
			
		||||
                }
 | 
			
		||||
            } catch (Exception e) {
 | 
			
		||||
                log.trace("[{}][{}] Exception during converting edge event to downlink msg", tenantId, sessionId, e);
 | 
			
		||||
                log.trace("[{}][{}] Exception during converting edge event to downlink msg", tenantId, edge.getId(), e);
 | 
			
		||||
            }
 | 
			
		||||
            if (downlinkMsg != null) {
 | 
			
		||||
                result.add(downlinkMsg);
 | 
			
		||||
@ -757,19 +756,19 @@ public abstract class EdgeGrpcSession implements Closeable {
 | 
			
		||||
    private void sendDownlinkMsg(ResponseMsg responseMsg) {
 | 
			
		||||
        if (isConnected()) {
 | 
			
		||||
            String responseMsgStr = StringUtils.truncate(responseMsg.toString(), 10000);
 | 
			
		||||
            log.trace("[{}][{}] Sending downlink msg [{}]", tenantId, sessionId, responseMsgStr);
 | 
			
		||||
            log.trace("[{}][{}] Sending downlink msg [{}]", tenantId, edge.getId(), responseMsgStr);
 | 
			
		||||
            downlinkMsgLock.lock();
 | 
			
		||||
            String downlinkMsgStr = responseMsg.hasDownlinkMsg() ? String.valueOf(responseMsg.getDownlinkMsg().getDownlinkMsgId()) : responseMsgStr;
 | 
			
		||||
            try {
 | 
			
		||||
                outputStream.onNext(responseMsg);
 | 
			
		||||
            } catch (Exception e) {
 | 
			
		||||
                log.trace("[{}][{}] Failed to send downlink message [{}]", tenantId, sessionId, downlinkMsgStr, e);
 | 
			
		||||
                log.trace("[{}][{}] Failed to send downlink message [{}]", tenantId, edge.getId(), downlinkMsgStr, e);
 | 
			
		||||
                connected = false;
 | 
			
		||||
                sessionCloseListener.accept(edge, sessionId);
 | 
			
		||||
            } finally {
 | 
			
		||||
                downlinkMsgLock.unlock();
 | 
			
		||||
            }
 | 
			
		||||
            log.trace("[{}][{}] downlink msg successfully sent [{}]", tenantId, sessionId, downlinkMsgStr);
 | 
			
		||||
            log.trace("[{}][{}] downlink msg successfully sent [{}]", tenantId, edge.getId(), downlinkMsgStr);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -909,8 +908,8 @@ public abstract class EdgeGrpcSession implements Closeable {
 | 
			
		||||
            }
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            String failureMsg = String.format("Can't process uplink msg [%s] from edge", uplinkMsg);
 | 
			
		||||
            log.trace("[{}][{}] Can't process uplink msg [{}]", edge.getTenantId(), sessionId, uplinkMsg, e);
 | 
			
		||||
            ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(edge.getTenantId()).edgeId(edge.getId())
 | 
			
		||||
            log.trace("[{}][{}] Can't process uplink msg [{}]", tenantId, edge.getId(), uplinkMsg, e);
 | 
			
		||||
            ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId).edgeId(edge.getId())
 | 
			
		||||
                    .customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg).error(e.getMessage()).build());
 | 
			
		||||
            return Futures.immediateFailedFuture(e);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@ -18,6 +18,7 @@ package org.thingsboard.server.service.edge.rpc;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import io.grpc.stub.StreamObserver;
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
 | 
			
		||||
import org.thingsboard.server.common.data.edge.Edge;
 | 
			
		||||
@ -56,6 +57,7 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession {
 | 
			
		||||
 | 
			
		||||
    private volatile boolean isHighPriorityProcessing;
 | 
			
		||||
 | 
			
		||||
    @Getter
 | 
			
		||||
    private QueueConsumerManager<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumer;
 | 
			
		||||
 | 
			
		||||
    private ExecutorService consumerExecutor;
 | 
			
		||||
@ -72,31 +74,28 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void processMsgs(List<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumer) {
 | 
			
		||||
        log.trace("[{}][{}] starting processing edge events", tenantId, sessionId);
 | 
			
		||||
        if (isConnected() && !isSyncInProgress() && !isHighPriorityProcessing) {
 | 
			
		||||
            List<EdgeEvent> edgeEvents = new ArrayList<>();
 | 
			
		||||
            for (TbProtoQueueMsg<ToEdgeEventNotificationMsg> msg : msgs) {
 | 
			
		||||
                EdgeEvent edgeEvent = ProtoUtils.fromProto(msg.getValue().getEdgeEventMsg());
 | 
			
		||||
                edgeEvents.add(edgeEvent);
 | 
			
		||||
        log.trace("[{}][{}] starting processing edge events", tenantId, edge.getId());
 | 
			
		||||
        if (!isConnected() || isSyncInProgress() || isHighPriorityProcessing) {
 | 
			
		||||
            log.debug("[{}][{}] edge not connected, edge sync is not completed or high priority processing in progress, " +
 | 
			
		||||
                            "connected = {}, sync in progress = {}, high priority in progress = {}. Skipping iteration",
 | 
			
		||||
                    tenantId, edge.getId(), isConnected(), isSyncInProgress(), isHighPriorityProcessing);
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
        List<EdgeEvent> edgeEvents = new ArrayList<>();
 | 
			
		||||
        for (TbProtoQueueMsg<ToEdgeEventNotificationMsg> msg : msgs) {
 | 
			
		||||
            EdgeEvent edgeEvent = ProtoUtils.fromProto(msg.getValue().getEdgeEventMsg());
 | 
			
		||||
            edgeEvents.add(edgeEvent);
 | 
			
		||||
        }
 | 
			
		||||
        List<DownlinkMsg> downlinkMsgsPack = convertToDownlinkMsgsPack(edgeEvents);
 | 
			
		||||
        try {
 | 
			
		||||
            boolean isInterrupted = sendDownlinkMsgsPack(downlinkMsgsPack).get();
 | 
			
		||||
            if (isInterrupted) {
 | 
			
		||||
                log.debug("[{}][{}] Send downlink messages task was interrupted", tenantId, edge.getId());
 | 
			
		||||
            } else {
 | 
			
		||||
                consumer.commit();
 | 
			
		||||
            }
 | 
			
		||||
            List<DownlinkMsg> downlinkMsgsPack = convertToDownlinkMsgsPack(edgeEvents);
 | 
			
		||||
            try {
 | 
			
		||||
                boolean isInterrupted = sendDownlinkMsgsPack(downlinkMsgsPack).get();
 | 
			
		||||
                if (isInterrupted) {
 | 
			
		||||
                    log.debug("[{}][{}][{}] Send downlink messages task was interrupted", tenantId, edge.getId(), sessionId);
 | 
			
		||||
                } else {
 | 
			
		||||
                    consumer.commit();
 | 
			
		||||
                }
 | 
			
		||||
            } catch (Exception e) {
 | 
			
		||||
                log.error("[{}] Failed to process all downlink messages", sessionId, e);
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            try {
 | 
			
		||||
                Thread.sleep(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval());
 | 
			
		||||
            } catch (InterruptedException interruptedException) {
 | 
			
		||||
                log.trace("Failed to wait until the server has capacity to handle new requests", interruptedException);
 | 
			
		||||
            }
 | 
			
		||||
            log.trace("[{}][{}] edge is not connected or sync is not completed. Skipping iteration", tenantId, sessionId);
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.error("[{}][{}] Failed to process downlink messages", tenantId, edge.getId(), e);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -107,18 +106,23 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Boolean> processEdgeEvents() {
 | 
			
		||||
        if (consumer == null) {
 | 
			
		||||
            this.consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edge-event-consumer"));
 | 
			
		||||
            this.consumer = QueueConsumerManager.<TbProtoQueueMsg<ToEdgeEventNotificationMsg>>builder()
 | 
			
		||||
                    .name("TB Edge events")
 | 
			
		||||
                    .msgPackProcessor(this::processMsgs)
 | 
			
		||||
                    .pollInterval(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval())
 | 
			
		||||
                    .consumerCreator(() -> tbCoreQueueFactory.createEdgeEventMsgConsumer(tenantId, edge.getId()))
 | 
			
		||||
                    .consumerExecutor(consumerExecutor)
 | 
			
		||||
                    .threadPrefix("edge-events")
 | 
			
		||||
                    .build();
 | 
			
		||||
            consumer.subscribe();
 | 
			
		||||
            consumer.launch();
 | 
			
		||||
        if (consumer == null || (consumer.getConsumer() != null && consumer.getConsumer().isStopped())) {
 | 
			
		||||
            try {
 | 
			
		||||
                this.consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edge-event-consumer"));
 | 
			
		||||
                this.consumer = QueueConsumerManager.<TbProtoQueueMsg<ToEdgeEventNotificationMsg>>builder()
 | 
			
		||||
                        .name("TB Edge events [" + edge.getId() + "]")
 | 
			
		||||
                        .msgPackProcessor(this::processMsgs)
 | 
			
		||||
                        .pollInterval(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval())
 | 
			
		||||
                        .consumerCreator(() -> tbCoreQueueFactory.createEdgeEventMsgConsumer(tenantId, edge.getId()))
 | 
			
		||||
                        .consumerExecutor(consumerExecutor)
 | 
			
		||||
                        .threadPrefix("edge-events-" + edge.getId())
 | 
			
		||||
                        .build();
 | 
			
		||||
                consumer.subscribe();
 | 
			
		||||
                consumer.launch();
 | 
			
		||||
            } catch (Exception e) {
 | 
			
		||||
                destroy();
 | 
			
		||||
                log.warn("[{}][{}] Failed to start edge event consumer", sessionId, edge.getId(), e);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        return Futures.immediateFuture(Boolean.FALSE);
 | 
			
		||||
    }
 | 
			
		||||
@ -132,8 +136,18 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void destroy() {
 | 
			
		||||
        consumer.stop();
 | 
			
		||||
        consumerExecutor.shutdown();
 | 
			
		||||
        try {
 | 
			
		||||
            if (consumer != null) {
 | 
			
		||||
                consumer.stop();
 | 
			
		||||
            }
 | 
			
		||||
        } finally {
 | 
			
		||||
            consumer = null;
 | 
			
		||||
        }
 | 
			
		||||
        try {
 | 
			
		||||
            if (consumerExecutor != null) {
 | 
			
		||||
                consumerExecutor.shutdown();
 | 
			
		||||
            }
 | 
			
		||||
        } catch (Exception ignored) {}
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user