diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 73da3694f9..9b70c9bd72 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -405,6 +405,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i EdgeId edgeId = session.getEdge().getId(); TenantId tenantId = session.getEdge().getTenantId(); + destroyKafkaSessionIfDisconnectedAndConsumerActive(tenantId, edgeId, session); + cancelScheduleEdgeEventsCheck(edgeId); if (sessions.containsKey(edgeId)) { @@ -459,16 +461,35 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private void processEdgeEventMigrationIfNeeded(EdgeGrpcSession session, EdgeId edgeId) throws Exception { boolean isMigrationProcessed = edgeEventsMigrationProcessed.getOrDefault(edgeId, Boolean.FALSE); if (!isMigrationProcessed) { + log.info("Starting edge event migration for edge [{}]", edgeId.getId()); Boolean eventsExist = session.migrateEdgeEvents().get(); if (Boolean.TRUE.equals(eventsExist)) { + log.info("Migration still in progress for edge [{}]", edgeId.getId()); sessionNewEvents.put(edgeId, true); scheduleEdgeEventsCheck(session); } else if (Boolean.FALSE.equals(eventsExist)) { + log.info("Migration completed for edge [{}]", edgeId.getId()); edgeEventsMigrationProcessed.put(edgeId, true); } } } + private void destroyKafkaSessionIfDisconnectedAndConsumerActive(TenantId tenantId, EdgeId edgeId, EdgeGrpcSession session) { + try { + if (session instanceof KafkaEdgeGrpcSession kafkaSession) { + if (!kafkaSession.isConnected() + && kafkaSession.getConsumer() != null + && kafkaSession.getConsumer().getConsumer() != null + && !kafkaSession.getConsumer().getConsumer().isStopped()) { + sessions.remove(edgeId); + kafkaSession.destroy(); + } + } + } catch (Exception e) { + log.warn("[{}] Failed to destroy kafka session for edge [{}]", tenantId, edgeId, e); + } + } + private void cancelScheduleEdgeEventsCheck(EdgeId edgeId) { log.trace("[{}] cancelling edge event check for edge", edgeId); if (sessionEdgeEventChecks.containsKey(edgeId)) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 4a9b68fc6d..e4b78a5ad8 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -42,7 +42,6 @@ import org.thingsboard.server.common.data.limit.LimitedApi; import org.thingsboard.server.common.data.notification.rule.trigger.EdgeCommunicationFailureTrigger; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; -import org.thingsboard.server.common.data.page.SortOrder; import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; @@ -292,11 +291,11 @@ public abstract class EdgeGrpcSession implements Closeable { protected void processEdgeEvents(EdgeEventFetcher fetcher, PageLink pageLink, SettableFuture> result) { try { - log.trace("[{}] Start processing edge events, fetcher = {}, pageLink = {}", sessionId, fetcher.getClass().getSimpleName(), pageLink); + log.trace("[{}] Start processing edge events, fetcher = {}, pageLink = {}", edge.getId(), fetcher.getClass().getSimpleName(), pageLink); processHighPriorityEvents(); PageData pageData = fetcher.fetchEdgeEvents(edge.getTenantId(), edge, pageLink); if (isConnected() && !pageData.getData().isEmpty()) { - log.trace("[{}][{}][{}] event(s) are going to be processed.", tenantId, sessionId, pageData.getData().size()); + log.trace("[{}][{}][{}] event(s) are going to be processed.", tenantId, edge.getId(), pageData.getData().size()); List downlinkMsgsPack = convertToDownlinkMsgsPack(pageData.getData()); Futures.addCallback(sendDownlinkMsgsPack(downlinkMsgsPack), new FutureCallback<>() { @Override @@ -323,16 +322,16 @@ public abstract class EdgeGrpcSession implements Closeable { @Override public void onFailure(Throwable t) { - log.error("[{}] Failed to send downlink msgs pack", sessionId, t); + log.error("[{}] Failed to send downlink msgs pack", edge.getId(), t); result.setException(t); } }, ctx.getGrpcCallbackExecutorService()); } else { - log.trace("[{}] no event(s) found. Stop processing edge events, fetcher = {}, pageLink = {}", sessionId, fetcher.getClass().getSimpleName(), pageLink); + log.trace("[{}] no event(s) found. Stop processing edge events, fetcher = {}, pageLink = {}", edge.getId(), fetcher.getClass().getSimpleName(), pageLink); result.set(null); } } catch (Exception e) { - log.error("[{}] Failed to fetch edge events", sessionId, e); + log.error("[{}] Failed to fetch edge events", edge.getId(), e); result.setException(e); } } @@ -459,9 +458,9 @@ public abstract class EdgeGrpcSession implements Closeable { ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId) .edgeId(edge.getId()).customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg).error(error).build()); } - log.warn("[{}][{}] {}, attempt: {}", tenantId, sessionId, failureMsg, attempt); + log.warn("[{}][{}] {}, attempt: {}", tenantId, edge.getId(), failureMsg, attempt); } - log.trace("[{}][{}][{}] downlink msg(s) are going to be send.", tenantId, sessionId, copy.size()); + log.trace("[{}][{}][{}] downlink msg(s) are going to be send.", tenantId, edge.getId(), copy.size()); for (DownlinkMsg downlinkMsg : copy) { if (clientMaxInboundMessageSize != 0 && downlinkMsg.getSerializedSize() > clientMaxInboundMessageSize) { String error = String.format("Client max inbound message size %s is exceeded. Please increase value of CLOUD_RPC_MAX_INBOUND_MESSAGE_SIZE " + @@ -483,7 +482,7 @@ public abstract class EdgeGrpcSession implements Closeable { } else { String failureMsg = String.format("Failed to deliver messages: %s", copy); log.warn("[{}][{}] Failed to deliver the batch after {} attempts. Next messages are going to be discarded {}", - tenantId, sessionId, MAX_DOWNLINK_ATTEMPTS, copy); + tenantId, edge.getId(), MAX_DOWNLINK_ATTEMPTS, copy); ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId).edgeId(edge.getId()) .customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg) .error("Failed to deliver messages after " + MAX_DOWNLINK_ATTEMPTS + " attempts").build()); @@ -493,7 +492,7 @@ public abstract class EdgeGrpcSession implements Closeable { stopCurrentSendDownlinkMsgsTask(false); } } catch (Exception e) { - log.warn("[{}][{}] Failed to send downlink msgs. Error msg {}", tenantId, sessionId, e.getMessage(), e); + log.warn("[{}][{}] Failed to send downlink msgs. Error msg {}", tenantId, edge.getId(), e.getMessage(), e); stopCurrentSendDownlinkMsgsTask(true); } }; @@ -540,7 +539,7 @@ public abstract class EdgeGrpcSession implements Closeable { stopCurrentSendDownlinkMsgsTask(false); } } catch (Exception e) { - log.error("[{}][{}] Can't process downlink response message [{}]", tenantId, sessionId, msg, e); + log.error("[{}][{}] Can't process downlink response message [{}]", tenantId, edge.getId(), msg, e); } } @@ -555,12 +554,12 @@ public abstract class EdgeGrpcSession implements Closeable { while ((event = highPriorityQueue.poll()) != null) { highPriorityEvents.add(event); } - log.trace("[{}][{}] Sending high priority events {}", tenantId, sessionId, highPriorityEvents.size()); + log.trace("[{}][{}] Sending high priority events {}", tenantId, edge.getId(), highPriorityEvents.size()); List downlinkMsgsPack = convertToDownlinkMsgsPack(highPriorityEvents); sendDownlinkMsgsPack(downlinkMsgsPack).get(); } } catch (Exception e) { - log.error("[{}] Failed to process high priority events", sessionId, e); + log.error("[{}] Failed to process high priority events", edge.getId(), e); } } @@ -577,7 +576,7 @@ public abstract class EdgeGrpcSession implements Closeable { Integer.toUnsignedLong(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount()), ctx.getEdgeEventService()); log.trace("[{}][{}] starting processing edge events, previousStartTs = {}, previousStartSeqId = {}", - tenantId, sessionId, previousStartTs, previousStartSeqId); + tenantId, edge.getId(), previousStartTs, previousStartSeqId); Futures.addCallback(startProcessingEdgeEvents(fetcher), new FutureCallback<>() { @Override public void onSuccess(@Nullable Pair newStartTsAndSeqId) { @@ -586,7 +585,7 @@ public abstract class EdgeGrpcSession implements Closeable { Futures.addCallback(updateFuture, new FutureCallback<>() { @Override public void onSuccess(@Nullable List list) { - log.debug("[{}][{}] queue offset was updated [{}]", tenantId, sessionId, newStartTsAndSeqId); + log.debug("[{}][{}] queue offset was updated [{}]", tenantId, edge.getId(), newStartTsAndSeqId); boolean newEventsAvailable; if (fetcher.isSeqIdNewCycleStarted()) { newEventsAvailable = isNewEdgeEventsAvailable(); @@ -601,28 +600,28 @@ public abstract class EdgeGrpcSession implements Closeable { @Override public void onFailure(Throwable t) { - log.error("[{}][{}] Failed to update queue offset [{}]", tenantId, sessionId, newStartTsAndSeqId, t); + log.error("[{}][{}] Failed to update queue offset [{}]", tenantId, edge.getId(), newStartTsAndSeqId, t); result.setException(t); } }, ctx.getGrpcCallbackExecutorService()); } else { - log.trace("[{}][{}] newStartTsAndSeqId is null. Skipping iteration without db update", tenantId, sessionId); + log.trace("[{}][{}] newStartTsAndSeqId is null. Skipping iteration without db update", tenantId, edge.getId()); result.set(Boolean.FALSE); } } @Override public void onFailure(Throwable t) { - log.error("[{}][{}] Failed to process events", tenantId, sessionId, t); + log.error("[{}][{}] Failed to process events", tenantId, edge.getId(), t); result.setException(t); } }, ctx.getGrpcCallbackExecutorService()); } else { if (isSyncInProgress()) { - log.trace("[{}][{}] edge sync is not completed yet. Skipping iteration", tenantId, sessionId); + log.trace("[{}][{}] edge sync is not completed yet. Skipping iteration", tenantId, edge.getId()); result.set(Boolean.TRUE); } else { - log.trace("[{}][{}] edge is not connected. Skipping iteration", tenantId, sessionId); + log.trace("[{}][{}] edge is not connected. Skipping iteration", tenantId, edge.getId()); result.set(null); } } @@ -632,7 +631,7 @@ public abstract class EdgeGrpcSession implements Closeable { protected List convertToDownlinkMsgsPack(List edgeEvents) { List result = new ArrayList<>(); for (EdgeEvent edgeEvent : edgeEvents) { - log.trace("[{}][{}] converting edge event to downlink msg [{}]", tenantId, sessionId, edgeEvent); + log.trace("[{}][{}] converting edge event to downlink msg [{}]", tenantId, edge.getId(), edgeEvent); DownlinkMsg downlinkMsg = null; try { switch (edgeEvent.getAction()) { @@ -641,17 +640,17 @@ public abstract class EdgeGrpcSession implements Closeable { ASSIGNED_TO_CUSTOMER, UNASSIGNED_FROM_CUSTOMER, ADDED_COMMENT, UPDATED_COMMENT, DELETED_COMMENT -> { downlinkMsg = convertEntityEventToDownlink(edgeEvent); if (downlinkMsg != null && downlinkMsg.getWidgetTypeUpdateMsgCount() > 0) { - log.trace("[{}][{}] widgetTypeUpdateMsg message processed, downlinkMsgId = {}", tenantId, sessionId, downlinkMsg.getDownlinkMsgId()); + log.trace("[{}][{}] widgetTypeUpdateMsg message processed, downlinkMsgId = {}", tenantId, edge.getId(), downlinkMsg.getDownlinkMsgId()); } else { - log.trace("[{}][{}] entity message processed [{}]", tenantId, sessionId, downlinkMsg); + log.trace("[{}][{}] entity message processed [{}]", tenantId, edge.getId(), downlinkMsg); } } case ATTRIBUTES_UPDATED, POST_ATTRIBUTES, ATTRIBUTES_DELETED, TIMESERIES_UPDATED -> downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edge, edgeEvent); - default -> log.warn("[{}][{}] Unsupported action type [{}]", tenantId, sessionId, edgeEvent.getAction()); + default -> log.warn("[{}][{}] Unsupported action type [{}]", tenantId, edge.getId(), edgeEvent.getAction()); } } catch (Exception e) { - log.trace("[{}][{}] Exception during converting edge event to downlink msg", tenantId, sessionId, e); + log.trace("[{}][{}] Exception during converting edge event to downlink msg", tenantId, edge.getId(), e); } if (downlinkMsg != null) { result.add(downlinkMsg); @@ -757,19 +756,19 @@ public abstract class EdgeGrpcSession implements Closeable { private void sendDownlinkMsg(ResponseMsg responseMsg) { if (isConnected()) { String responseMsgStr = StringUtils.truncate(responseMsg.toString(), 10000); - log.trace("[{}][{}] Sending downlink msg [{}]", tenantId, sessionId, responseMsgStr); + log.trace("[{}][{}] Sending downlink msg [{}]", tenantId, edge.getId(), responseMsgStr); downlinkMsgLock.lock(); String downlinkMsgStr = responseMsg.hasDownlinkMsg() ? String.valueOf(responseMsg.getDownlinkMsg().getDownlinkMsgId()) : responseMsgStr; try { outputStream.onNext(responseMsg); } catch (Exception e) { - log.trace("[{}][{}] Failed to send downlink message [{}]", tenantId, sessionId, downlinkMsgStr, e); + log.trace("[{}][{}] Failed to send downlink message [{}]", tenantId, edge.getId(), downlinkMsgStr, e); connected = false; sessionCloseListener.accept(edge, sessionId); } finally { downlinkMsgLock.unlock(); } - log.trace("[{}][{}] downlink msg successfully sent [{}]", tenantId, sessionId, downlinkMsgStr); + log.trace("[{}][{}] downlink msg successfully sent [{}]", tenantId, edge.getId(), downlinkMsgStr); } } @@ -909,8 +908,8 @@ public abstract class EdgeGrpcSession implements Closeable { } } catch (Exception e) { String failureMsg = String.format("Can't process uplink msg [%s] from edge", uplinkMsg); - log.trace("[{}][{}] Can't process uplink msg [{}]", edge.getTenantId(), sessionId, uplinkMsg, e); - ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(edge.getTenantId()).edgeId(edge.getId()) + log.trace("[{}][{}] Can't process uplink msg [{}]", tenantId, edge.getId(), uplinkMsg, e); + ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId).edgeId(edge.getId()) .customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg).error(e.getMessage()).build()); return Futures.immediateFailedFuture(e); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java index 37687f14a9..daffe9db11 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java @@ -18,6 +18,7 @@ package org.thingsboard.server.service.edge.rpc; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import io.grpc.stub.StreamObserver; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.edge.Edge; @@ -56,6 +57,7 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession { private volatile boolean isHighPriorityProcessing; + @Getter private QueueConsumerManager> consumer; private ExecutorService consumerExecutor; @@ -72,31 +74,28 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession { } private void processMsgs(List> msgs, TbQueueConsumer> consumer) { - log.trace("[{}][{}] starting processing edge events", tenantId, sessionId); - if (isConnected() && !isSyncInProgress() && !isHighPriorityProcessing) { - List edgeEvents = new ArrayList<>(); - for (TbProtoQueueMsg msg : msgs) { - EdgeEvent edgeEvent = ProtoUtils.fromProto(msg.getValue().getEdgeEventMsg()); - edgeEvents.add(edgeEvent); + log.trace("[{}][{}] starting processing edge events", tenantId, edge.getId()); + if (!isConnected() || isSyncInProgress() || isHighPriorityProcessing) { + log.debug("[{}][{}] edge not connected, edge sync is not completed or high priority processing in progress, " + + "connected = {}, sync in progress = {}, high priority in progress = {}. Skipping iteration", + tenantId, edge.getId(), isConnected(), isSyncInProgress(), isHighPriorityProcessing); + return; + } + List edgeEvents = new ArrayList<>(); + for (TbProtoQueueMsg msg : msgs) { + EdgeEvent edgeEvent = ProtoUtils.fromProto(msg.getValue().getEdgeEventMsg()); + edgeEvents.add(edgeEvent); + } + List downlinkMsgsPack = convertToDownlinkMsgsPack(edgeEvents); + try { + boolean isInterrupted = sendDownlinkMsgsPack(downlinkMsgsPack).get(); + if (isInterrupted) { + log.debug("[{}][{}] Send downlink messages task was interrupted", tenantId, edge.getId()); + } else { + consumer.commit(); } - List downlinkMsgsPack = convertToDownlinkMsgsPack(edgeEvents); - try { - boolean isInterrupted = sendDownlinkMsgsPack(downlinkMsgsPack).get(); - if (isInterrupted) { - log.debug("[{}][{}][{}] Send downlink messages task was interrupted", tenantId, edge.getId(), sessionId); - } else { - consumer.commit(); - } - } catch (Exception e) { - log.error("[{}] Failed to process all downlink messages", sessionId, e); - } - } else { - try { - Thread.sleep(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval()); - } catch (InterruptedException interruptedException) { - log.trace("Failed to wait until the server has capacity to handle new requests", interruptedException); - } - log.trace("[{}][{}] edge is not connected or sync is not completed. Skipping iteration", tenantId, sessionId); + } catch (Exception e) { + log.error("[{}][{}] Failed to process downlink messages", tenantId, edge.getId(), e); } } @@ -107,18 +106,23 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession { @Override public ListenableFuture processEdgeEvents() { - if (consumer == null) { - this.consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edge-event-consumer")); - this.consumer = QueueConsumerManager.>builder() - .name("TB Edge events") - .msgPackProcessor(this::processMsgs) - .pollInterval(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval()) - .consumerCreator(() -> tbCoreQueueFactory.createEdgeEventMsgConsumer(tenantId, edge.getId())) - .consumerExecutor(consumerExecutor) - .threadPrefix("edge-events") - .build(); - consumer.subscribe(); - consumer.launch(); + if (consumer == null || (consumer.getConsumer() != null && consumer.getConsumer().isStopped())) { + try { + this.consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edge-event-consumer")); + this.consumer = QueueConsumerManager.>builder() + .name("TB Edge events [" + edge.getId() + "]") + .msgPackProcessor(this::processMsgs) + .pollInterval(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval()) + .consumerCreator(() -> tbCoreQueueFactory.createEdgeEventMsgConsumer(tenantId, edge.getId())) + .consumerExecutor(consumerExecutor) + .threadPrefix("edge-events-" + edge.getId()) + .build(); + consumer.subscribe(); + consumer.launch(); + } catch (Exception e) { + destroy(); + log.warn("[{}][{}] Failed to start edge event consumer", sessionId, edge.getId(), e); + } } return Futures.immediateFuture(Boolean.FALSE); } @@ -132,8 +136,18 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession { @Override public void destroy() { - consumer.stop(); - consumerExecutor.shutdown(); + try { + if (consumer != null) { + consumer.stop(); + } + } finally { + consumer = null; + } + try { + if (consumerExecutor != null) { + consumerExecutor.shutdown(); + } + } catch (Exception ignored) {} } @Override