KafkaEdgeGrpcSession - stability improvements during rollout restart or force restart of tb-core services

This commit is contained in:
Volodymyr Babak 2025-05-30 16:36:58 +03:00
parent 712117fc26
commit 32e9efec83
3 changed files with 102 additions and 68 deletions

View File

@ -405,6 +405,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
EdgeId edgeId = session.getEdge().getId(); EdgeId edgeId = session.getEdge().getId();
TenantId tenantId = session.getEdge().getTenantId(); TenantId tenantId = session.getEdge().getTenantId();
destroyKafkaSessionIfDisconnectedAndConsumerActive(tenantId, edgeId, session);
cancelScheduleEdgeEventsCheck(edgeId); cancelScheduleEdgeEventsCheck(edgeId);
if (sessions.containsKey(edgeId)) { if (sessions.containsKey(edgeId)) {
@ -459,16 +461,35 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
private void processEdgeEventMigrationIfNeeded(EdgeGrpcSession session, EdgeId edgeId) throws Exception { private void processEdgeEventMigrationIfNeeded(EdgeGrpcSession session, EdgeId edgeId) throws Exception {
boolean isMigrationProcessed = edgeEventsMigrationProcessed.getOrDefault(edgeId, Boolean.FALSE); boolean isMigrationProcessed = edgeEventsMigrationProcessed.getOrDefault(edgeId, Boolean.FALSE);
if (!isMigrationProcessed) { if (!isMigrationProcessed) {
log.info("Starting edge event migration for edge [{}]", edgeId.getId());
Boolean eventsExist = session.migrateEdgeEvents().get(); Boolean eventsExist = session.migrateEdgeEvents().get();
if (Boolean.TRUE.equals(eventsExist)) { if (Boolean.TRUE.equals(eventsExist)) {
log.info("Migration still in progress for edge [{}]", edgeId.getId());
sessionNewEvents.put(edgeId, true); sessionNewEvents.put(edgeId, true);
scheduleEdgeEventsCheck(session); scheduleEdgeEventsCheck(session);
} else if (Boolean.FALSE.equals(eventsExist)) { } else if (Boolean.FALSE.equals(eventsExist)) {
log.info("Migration completed for edge [{}]", edgeId.getId());
edgeEventsMigrationProcessed.put(edgeId, true); edgeEventsMigrationProcessed.put(edgeId, true);
} }
} }
} }
/**
 * Removes and destroys a Kafka-backed session that is no longer connected
 * but whose queue consumer has not been stopped yet.
 *
 * <p>Sessions that are not {@link KafkaEdgeGrpcSession} instances, that are
 * still connected, or that have no running consumer are left untouched.
 * Any failure is logged and never propagated to the caller.
 *
 * @param tenantId tenant owning the edge (used for logging only)
 * @param edgeId   id of the edge whose session entry is removed
 * @param session  session to inspect
 */
private void destroyKafkaSessionIfDisconnectedAndConsumerActive(TenantId tenantId, EdgeId edgeId, EdgeGrpcSession session) {
    try {
        if (!(session instanceof KafkaEdgeGrpcSession kafkaSession)) {
            return;
        }
        if (kafkaSession.isConnected()) {
            return;
        }
        // Read each reference exactly once: the session may null its consumer
        // concurrently (e.g. in destroy()), so repeated getter calls between the
        // null checks and the isStopped() call could observe different values.
        var consumerManager = kafkaSession.getConsumer();
        if (consumerManager == null) {
            return;
        }
        var queueConsumer = consumerManager.getConsumer();
        if (queueConsumer != null && !queueConsumer.isStopped()) {
            sessions.remove(edgeId);
            kafkaSession.destroy();
        }
    } catch (Exception e) {
        log.warn("[{}] Failed to destroy kafka session for edge [{}]", tenantId, edgeId, e);
    }
}
private void cancelScheduleEdgeEventsCheck(EdgeId edgeId) { private void cancelScheduleEdgeEventsCheck(EdgeId edgeId) {
log.trace("[{}] cancelling edge event check for edge", edgeId); log.trace("[{}] cancelling edge event check for edge", edgeId);
if (sessionEdgeEventChecks.containsKey(edgeId)) { if (sessionEdgeEventChecks.containsKey(edgeId)) {

View File

@ -42,7 +42,6 @@ import org.thingsboard.server.common.data.limit.LimitedApi;
import org.thingsboard.server.common.data.notification.rule.trigger.EdgeCommunicationFailureTrigger; import org.thingsboard.server.common.data.notification.rule.trigger.EdgeCommunicationFailureTrigger;
import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.SortOrder;
import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg; import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg;
@ -292,11 +291,11 @@ public abstract class EdgeGrpcSession implements Closeable {
protected void processEdgeEvents(EdgeEventFetcher fetcher, PageLink pageLink, SettableFuture<Pair<Long, Long>> result) { protected void processEdgeEvents(EdgeEventFetcher fetcher, PageLink pageLink, SettableFuture<Pair<Long, Long>> result) {
try { try {
log.trace("[{}] Start processing edge events, fetcher = {}, pageLink = {}", sessionId, fetcher.getClass().getSimpleName(), pageLink); log.trace("[{}] Start processing edge events, fetcher = {}, pageLink = {}", edge.getId(), fetcher.getClass().getSimpleName(), pageLink);
processHighPriorityEvents(); processHighPriorityEvents();
PageData<EdgeEvent> pageData = fetcher.fetchEdgeEvents(edge.getTenantId(), edge, pageLink); PageData<EdgeEvent> pageData = fetcher.fetchEdgeEvents(edge.getTenantId(), edge, pageLink);
if (isConnected() && !pageData.getData().isEmpty()) { if (isConnected() && !pageData.getData().isEmpty()) {
log.trace("[{}][{}][{}] event(s) are going to be processed.", tenantId, sessionId, pageData.getData().size()); log.trace("[{}][{}][{}] event(s) are going to be processed.", tenantId, edge.getId(), pageData.getData().size());
List<DownlinkMsg> downlinkMsgsPack = convertToDownlinkMsgsPack(pageData.getData()); List<DownlinkMsg> downlinkMsgsPack = convertToDownlinkMsgsPack(pageData.getData());
Futures.addCallback(sendDownlinkMsgsPack(downlinkMsgsPack), new FutureCallback<>() { Futures.addCallback(sendDownlinkMsgsPack(downlinkMsgsPack), new FutureCallback<>() {
@Override @Override
@ -323,16 +322,16 @@ public abstract class EdgeGrpcSession implements Closeable {
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
log.error("[{}] Failed to send downlink msgs pack", sessionId, t); log.error("[{}] Failed to send downlink msgs pack", edge.getId(), t);
result.setException(t); result.setException(t);
} }
}, ctx.getGrpcCallbackExecutorService()); }, ctx.getGrpcCallbackExecutorService());
} else { } else {
log.trace("[{}] no event(s) found. Stop processing edge events, fetcher = {}, pageLink = {}", sessionId, fetcher.getClass().getSimpleName(), pageLink); log.trace("[{}] no event(s) found. Stop processing edge events, fetcher = {}, pageLink = {}", edge.getId(), fetcher.getClass().getSimpleName(), pageLink);
result.set(null); result.set(null);
} }
} catch (Exception e) { } catch (Exception e) {
log.error("[{}] Failed to fetch edge events", sessionId, e); log.error("[{}] Failed to fetch edge events", edge.getId(), e);
result.setException(e); result.setException(e);
} }
} }
@ -459,9 +458,9 @@ public abstract class EdgeGrpcSession implements Closeable {
ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId) ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId)
.edgeId(edge.getId()).customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg).error(error).build()); .edgeId(edge.getId()).customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg).error(error).build());
} }
log.warn("[{}][{}] {}, attempt: {}", tenantId, sessionId, failureMsg, attempt); log.warn("[{}][{}] {}, attempt: {}", tenantId, edge.getId(), failureMsg, attempt);
} }
log.trace("[{}][{}][{}] downlink msg(s) are going to be send.", tenantId, sessionId, copy.size()); log.trace("[{}][{}][{}] downlink msg(s) are going to be send.", tenantId, edge.getId(), copy.size());
for (DownlinkMsg downlinkMsg : copy) { for (DownlinkMsg downlinkMsg : copy) {
if (clientMaxInboundMessageSize != 0 && downlinkMsg.getSerializedSize() > clientMaxInboundMessageSize) { if (clientMaxInboundMessageSize != 0 && downlinkMsg.getSerializedSize() > clientMaxInboundMessageSize) {
String error = String.format("Client max inbound message size %s is exceeded. Please increase value of CLOUD_RPC_MAX_INBOUND_MESSAGE_SIZE " + String error = String.format("Client max inbound message size %s is exceeded. Please increase value of CLOUD_RPC_MAX_INBOUND_MESSAGE_SIZE " +
@ -483,7 +482,7 @@ public abstract class EdgeGrpcSession implements Closeable {
} else { } else {
String failureMsg = String.format("Failed to deliver messages: %s", copy); String failureMsg = String.format("Failed to deliver messages: %s", copy);
log.warn("[{}][{}] Failed to deliver the batch after {} attempts. Next messages are going to be discarded {}", log.warn("[{}][{}] Failed to deliver the batch after {} attempts. Next messages are going to be discarded {}",
tenantId, sessionId, MAX_DOWNLINK_ATTEMPTS, copy); tenantId, edge.getId(), MAX_DOWNLINK_ATTEMPTS, copy);
ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId).edgeId(edge.getId()) ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId).edgeId(edge.getId())
.customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg) .customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg)
.error("Failed to deliver messages after " + MAX_DOWNLINK_ATTEMPTS + " attempts").build()); .error("Failed to deliver messages after " + MAX_DOWNLINK_ATTEMPTS + " attempts").build());
@ -493,7 +492,7 @@ public abstract class EdgeGrpcSession implements Closeable {
stopCurrentSendDownlinkMsgsTask(false); stopCurrentSendDownlinkMsgsTask(false);
} }
} catch (Exception e) { } catch (Exception e) {
log.warn("[{}][{}] Failed to send downlink msgs. Error msg {}", tenantId, sessionId, e.getMessage(), e); log.warn("[{}][{}] Failed to send downlink msgs. Error msg {}", tenantId, edge.getId(), e.getMessage(), e);
stopCurrentSendDownlinkMsgsTask(true); stopCurrentSendDownlinkMsgsTask(true);
} }
}; };
@ -540,7 +539,7 @@ public abstract class EdgeGrpcSession implements Closeable {
stopCurrentSendDownlinkMsgsTask(false); stopCurrentSendDownlinkMsgsTask(false);
} }
} catch (Exception e) { } catch (Exception e) {
log.error("[{}][{}] Can't process downlink response message [{}]", tenantId, sessionId, msg, e); log.error("[{}][{}] Can't process downlink response message [{}]", tenantId, edge.getId(), msg, e);
} }
} }
@ -555,12 +554,12 @@ public abstract class EdgeGrpcSession implements Closeable {
while ((event = highPriorityQueue.poll()) != null) { while ((event = highPriorityQueue.poll()) != null) {
highPriorityEvents.add(event); highPriorityEvents.add(event);
} }
log.trace("[{}][{}] Sending high priority events {}", tenantId, sessionId, highPriorityEvents.size()); log.trace("[{}][{}] Sending high priority events {}", tenantId, edge.getId(), highPriorityEvents.size());
List<DownlinkMsg> downlinkMsgsPack = convertToDownlinkMsgsPack(highPriorityEvents); List<DownlinkMsg> downlinkMsgsPack = convertToDownlinkMsgsPack(highPriorityEvents);
sendDownlinkMsgsPack(downlinkMsgsPack).get(); sendDownlinkMsgsPack(downlinkMsgsPack).get();
} }
} catch (Exception e) { } catch (Exception e) {
log.error("[{}] Failed to process high priority events", sessionId, e); log.error("[{}] Failed to process high priority events", edge.getId(), e);
} }
} }
@ -577,7 +576,7 @@ public abstract class EdgeGrpcSession implements Closeable {
Integer.toUnsignedLong(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount()), Integer.toUnsignedLong(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount()),
ctx.getEdgeEventService()); ctx.getEdgeEventService());
log.trace("[{}][{}] starting processing edge events, previousStartTs = {}, previousStartSeqId = {}", log.trace("[{}][{}] starting processing edge events, previousStartTs = {}, previousStartSeqId = {}",
tenantId, sessionId, previousStartTs, previousStartSeqId); tenantId, edge.getId(), previousStartTs, previousStartSeqId);
Futures.addCallback(startProcessingEdgeEvents(fetcher), new FutureCallback<>() { Futures.addCallback(startProcessingEdgeEvents(fetcher), new FutureCallback<>() {
@Override @Override
public void onSuccess(@Nullable Pair<Long, Long> newStartTsAndSeqId) { public void onSuccess(@Nullable Pair<Long, Long> newStartTsAndSeqId) {
@ -586,7 +585,7 @@ public abstract class EdgeGrpcSession implements Closeable {
Futures.addCallback(updateFuture, new FutureCallback<>() { Futures.addCallback(updateFuture, new FutureCallback<>() {
@Override @Override
public void onSuccess(@Nullable List<Long> list) { public void onSuccess(@Nullable List<Long> list) {
log.debug("[{}][{}] queue offset was updated [{}]", tenantId, sessionId, newStartTsAndSeqId); log.debug("[{}][{}] queue offset was updated [{}]", tenantId, edge.getId(), newStartTsAndSeqId);
boolean newEventsAvailable; boolean newEventsAvailable;
if (fetcher.isSeqIdNewCycleStarted()) { if (fetcher.isSeqIdNewCycleStarted()) {
newEventsAvailable = isNewEdgeEventsAvailable(); newEventsAvailable = isNewEdgeEventsAvailable();
@ -601,28 +600,28 @@ public abstract class EdgeGrpcSession implements Closeable {
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
log.error("[{}][{}] Failed to update queue offset [{}]", tenantId, sessionId, newStartTsAndSeqId, t); log.error("[{}][{}] Failed to update queue offset [{}]", tenantId, edge.getId(), newStartTsAndSeqId, t);
result.setException(t); result.setException(t);
} }
}, ctx.getGrpcCallbackExecutorService()); }, ctx.getGrpcCallbackExecutorService());
} else { } else {
log.trace("[{}][{}] newStartTsAndSeqId is null. Skipping iteration without db update", tenantId, sessionId); log.trace("[{}][{}] newStartTsAndSeqId is null. Skipping iteration without db update", tenantId, edge.getId());
result.set(Boolean.FALSE); result.set(Boolean.FALSE);
} }
} }
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
log.error("[{}][{}] Failed to process events", tenantId, sessionId, t); log.error("[{}][{}] Failed to process events", tenantId, edge.getId(), t);
result.setException(t); result.setException(t);
} }
}, ctx.getGrpcCallbackExecutorService()); }, ctx.getGrpcCallbackExecutorService());
} else { } else {
if (isSyncInProgress()) { if (isSyncInProgress()) {
log.trace("[{}][{}] edge sync is not completed yet. Skipping iteration", tenantId, sessionId); log.trace("[{}][{}] edge sync is not completed yet. Skipping iteration", tenantId, edge.getId());
result.set(Boolean.TRUE); result.set(Boolean.TRUE);
} else { } else {
log.trace("[{}][{}] edge is not connected. Skipping iteration", tenantId, sessionId); log.trace("[{}][{}] edge is not connected. Skipping iteration", tenantId, edge.getId());
result.set(null); result.set(null);
} }
} }
@ -632,7 +631,7 @@ public abstract class EdgeGrpcSession implements Closeable {
protected List<DownlinkMsg> convertToDownlinkMsgsPack(List<EdgeEvent> edgeEvents) { protected List<DownlinkMsg> convertToDownlinkMsgsPack(List<EdgeEvent> edgeEvents) {
List<DownlinkMsg> result = new ArrayList<>(); List<DownlinkMsg> result = new ArrayList<>();
for (EdgeEvent edgeEvent : edgeEvents) { for (EdgeEvent edgeEvent : edgeEvents) {
log.trace("[{}][{}] converting edge event to downlink msg [{}]", tenantId, sessionId, edgeEvent); log.trace("[{}][{}] converting edge event to downlink msg [{}]", tenantId, edge.getId(), edgeEvent);
DownlinkMsg downlinkMsg = null; DownlinkMsg downlinkMsg = null;
try { try {
switch (edgeEvent.getAction()) { switch (edgeEvent.getAction()) {
@ -641,17 +640,17 @@ public abstract class EdgeGrpcSession implements Closeable {
ASSIGNED_TO_CUSTOMER, UNASSIGNED_FROM_CUSTOMER, ADDED_COMMENT, UPDATED_COMMENT, DELETED_COMMENT -> { ASSIGNED_TO_CUSTOMER, UNASSIGNED_FROM_CUSTOMER, ADDED_COMMENT, UPDATED_COMMENT, DELETED_COMMENT -> {
downlinkMsg = convertEntityEventToDownlink(edgeEvent); downlinkMsg = convertEntityEventToDownlink(edgeEvent);
if (downlinkMsg != null && downlinkMsg.getWidgetTypeUpdateMsgCount() > 0) { if (downlinkMsg != null && downlinkMsg.getWidgetTypeUpdateMsgCount() > 0) {
log.trace("[{}][{}] widgetTypeUpdateMsg message processed, downlinkMsgId = {}", tenantId, sessionId, downlinkMsg.getDownlinkMsgId()); log.trace("[{}][{}] widgetTypeUpdateMsg message processed, downlinkMsgId = {}", tenantId, edge.getId(), downlinkMsg.getDownlinkMsgId());
} else { } else {
log.trace("[{}][{}] entity message processed [{}]", tenantId, sessionId, downlinkMsg); log.trace("[{}][{}] entity message processed [{}]", tenantId, edge.getId(), downlinkMsg);
} }
} }
case ATTRIBUTES_UPDATED, POST_ATTRIBUTES, ATTRIBUTES_DELETED, TIMESERIES_UPDATED -> case ATTRIBUTES_UPDATED, POST_ATTRIBUTES, ATTRIBUTES_DELETED, TIMESERIES_UPDATED ->
downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edge, edgeEvent); downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edge, edgeEvent);
default -> log.warn("[{}][{}] Unsupported action type [{}]", tenantId, sessionId, edgeEvent.getAction()); default -> log.warn("[{}][{}] Unsupported action type [{}]", tenantId, edge.getId(), edgeEvent.getAction());
} }
} catch (Exception e) { } catch (Exception e) {
log.trace("[{}][{}] Exception during converting edge event to downlink msg", tenantId, sessionId, e); log.trace("[{}][{}] Exception during converting edge event to downlink msg", tenantId, edge.getId(), e);
} }
if (downlinkMsg != null) { if (downlinkMsg != null) {
result.add(downlinkMsg); result.add(downlinkMsg);
@ -757,19 +756,19 @@ public abstract class EdgeGrpcSession implements Closeable {
private void sendDownlinkMsg(ResponseMsg responseMsg) { private void sendDownlinkMsg(ResponseMsg responseMsg) {
if (isConnected()) { if (isConnected()) {
String responseMsgStr = StringUtils.truncate(responseMsg.toString(), 10000); String responseMsgStr = StringUtils.truncate(responseMsg.toString(), 10000);
log.trace("[{}][{}] Sending downlink msg [{}]", tenantId, sessionId, responseMsgStr); log.trace("[{}][{}] Sending downlink msg [{}]", tenantId, edge.getId(), responseMsgStr);
downlinkMsgLock.lock(); downlinkMsgLock.lock();
String downlinkMsgStr = responseMsg.hasDownlinkMsg() ? String.valueOf(responseMsg.getDownlinkMsg().getDownlinkMsgId()) : responseMsgStr; String downlinkMsgStr = responseMsg.hasDownlinkMsg() ? String.valueOf(responseMsg.getDownlinkMsg().getDownlinkMsgId()) : responseMsgStr;
try { try {
outputStream.onNext(responseMsg); outputStream.onNext(responseMsg);
} catch (Exception e) { } catch (Exception e) {
log.trace("[{}][{}] Failed to send downlink message [{}]", tenantId, sessionId, downlinkMsgStr, e); log.trace("[{}][{}] Failed to send downlink message [{}]", tenantId, edge.getId(), downlinkMsgStr, e);
connected = false; connected = false;
sessionCloseListener.accept(edge, sessionId); sessionCloseListener.accept(edge, sessionId);
} finally { } finally {
downlinkMsgLock.unlock(); downlinkMsgLock.unlock();
} }
log.trace("[{}][{}] downlink msg successfully sent [{}]", tenantId, sessionId, downlinkMsgStr); log.trace("[{}][{}] downlink msg successfully sent [{}]", tenantId, edge.getId(), downlinkMsgStr);
} }
} }
@ -909,8 +908,8 @@ public abstract class EdgeGrpcSession implements Closeable {
} }
} catch (Exception e) { } catch (Exception e) {
String failureMsg = String.format("Can't process uplink msg [%s] from edge", uplinkMsg); String failureMsg = String.format("Can't process uplink msg [%s] from edge", uplinkMsg);
log.trace("[{}][{}] Can't process uplink msg [{}]", edge.getTenantId(), sessionId, uplinkMsg, e); log.trace("[{}][{}] Can't process uplink msg [{}]", tenantId, edge.getId(), uplinkMsg, e);
ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(edge.getTenantId()).edgeId(edge.getId()) ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId).edgeId(edge.getId())
.customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg).error(e.getMessage()).build()); .customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg).error(e.getMessage()).build());
return Futures.immediateFailedFuture(e); return Futures.immediateFailedFuture(e);
} }

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.service.edge.rpc;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.Edge;
@ -56,6 +57,7 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession {
private volatile boolean isHighPriorityProcessing; private volatile boolean isHighPriorityProcessing;
@Getter
private QueueConsumerManager<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumer; private QueueConsumerManager<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumer;
private ExecutorService consumerExecutor; private ExecutorService consumerExecutor;
@ -72,8 +74,13 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession {
} }
private void processMsgs(List<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumer) { private void processMsgs(List<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumer) {
log.trace("[{}][{}] starting processing edge events", tenantId, sessionId); log.trace("[{}][{}] starting processing edge events", tenantId, edge.getId());
if (isConnected() && !isSyncInProgress() && !isHighPriorityProcessing) { if (!isConnected() || isSyncInProgress() || isHighPriorityProcessing) {
log.debug("[{}][{}] edge not connected, edge sync is not completed or high priority processing in progress, " +
"connected = {}, sync in progress = {}, high priority in progress = {}. Skipping iteration",
tenantId, edge.getId(), isConnected(), isSyncInProgress(), isHighPriorityProcessing);
return;
}
List<EdgeEvent> edgeEvents = new ArrayList<>(); List<EdgeEvent> edgeEvents = new ArrayList<>();
for (TbProtoQueueMsg<ToEdgeEventNotificationMsg> msg : msgs) { for (TbProtoQueueMsg<ToEdgeEventNotificationMsg> msg : msgs) {
EdgeEvent edgeEvent = ProtoUtils.fromProto(msg.getValue().getEdgeEventMsg()); EdgeEvent edgeEvent = ProtoUtils.fromProto(msg.getValue().getEdgeEventMsg());
@ -83,20 +90,12 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession {
try { try {
boolean isInterrupted = sendDownlinkMsgsPack(downlinkMsgsPack).get(); boolean isInterrupted = sendDownlinkMsgsPack(downlinkMsgsPack).get();
if (isInterrupted) { if (isInterrupted) {
log.debug("[{}][{}][{}] Send downlink messages task was interrupted", tenantId, edge.getId(), sessionId); log.debug("[{}][{}] Send downlink messages task was interrupted", tenantId, edge.getId());
} else { } else {
consumer.commit(); consumer.commit();
} }
} catch (Exception e) { } catch (Exception e) {
log.error("[{}] Failed to process all downlink messages", sessionId, e); log.error("[{}][{}] Failed to process downlink messages", tenantId, edge.getId(), e);
}
} else {
try {
Thread.sleep(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval());
} catch (InterruptedException interruptedException) {
log.trace("Failed to wait until the server has capacity to handle new requests", interruptedException);
}
log.trace("[{}][{}] edge is not connected or sync is not completed. Skipping iteration", tenantId, sessionId);
} }
} }
@ -107,18 +106,23 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession {
@Override @Override
public ListenableFuture<Boolean> processEdgeEvents() { public ListenableFuture<Boolean> processEdgeEvents() {
if (consumer == null) { if (consumer == null || (consumer.getConsumer() != null && consumer.getConsumer().isStopped())) {
try {
this.consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edge-event-consumer")); this.consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edge-event-consumer"));
this.consumer = QueueConsumerManager.<TbProtoQueueMsg<ToEdgeEventNotificationMsg>>builder() this.consumer = QueueConsumerManager.<TbProtoQueueMsg<ToEdgeEventNotificationMsg>>builder()
.name("TB Edge events") .name("TB Edge events [" + edge.getId() + "]")
.msgPackProcessor(this::processMsgs) .msgPackProcessor(this::processMsgs)
.pollInterval(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval()) .pollInterval(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval())
.consumerCreator(() -> tbCoreQueueFactory.createEdgeEventMsgConsumer(tenantId, edge.getId())) .consumerCreator(() -> tbCoreQueueFactory.createEdgeEventMsgConsumer(tenantId, edge.getId()))
.consumerExecutor(consumerExecutor) .consumerExecutor(consumerExecutor)
.threadPrefix("edge-events") .threadPrefix("edge-events-" + edge.getId())
.build(); .build();
consumer.subscribe(); consumer.subscribe();
consumer.launch(); consumer.launch();
} catch (Exception e) {
destroy();
log.warn("[{}][{}] Failed to start edge event consumer", sessionId, edge.getId(), e);
}
} }
return Futures.immediateFuture(Boolean.FALSE); return Futures.immediateFuture(Boolean.FALSE);
} }
@ -132,9 +136,19 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession {
@Override @Override
public void destroy() { public void destroy() {
try {
if (consumer != null) {
consumer.stop(); consumer.stop();
}
} finally {
consumer = null;
}
try {
if (consumerExecutor != null) {
consumerExecutor.shutdown(); consumerExecutor.shutdown();
} }
} catch (Exception ignored) {}
}
@Override @Override
public void cleanUp() { public void cleanUp() {