Fixed case when seqId started new cycle. Improved edge trace/debug logging (#12342)

* Fixed case when seqId started new cycle. Improved edge trace/debug logging

* Edge downlink logging improved

* Edge downlink logging improved #2
This commit is contained in:
Volodymyr Babak 2024-12-30 10:04:41 +02:00 committed by GitHub
parent 15d22e9e6b
commit cf9204416b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 79 additions and 62 deletions

View File

@ -349,7 +349,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
private void startSyncProcess(TenantId tenantId, EdgeId edgeId, UUID requestId, String requestServiceId) {
EdgeGrpcSession session = sessions.get(edgeId);
if (session != null) {
if (!session.isSyncCompleted()) {
if (session.isSyncInProgress()) {
clusterService.pushEdgeSyncResponseToCore(new FromEdgeSyncResponse(requestId, tenantId, edgeId, false, "Sync process is active at the moment"), requestServiceId);
} else {
boolean success = false;
@ -368,7 +368,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
UUID requestId = request.getId();
EdgeGrpcSession session = sessions.get(request.getEdgeId());
if (session != null && !session.isSyncCompleted()) {
if (session != null && session.isSyncInProgress()) {
responseConsumer.accept(new FromEdgeSyncResponse(requestId, request.getTenantId(), request.getEdgeId(), false, "Sync process is active at the moment"));
} else {
log.trace("[{}][{}] Processing sync edge request [{}], serviceId [{}]", request.getTenantId(), request.getId(), request.getEdgeId(), request.getServiceId());
@ -411,7 +411,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
newEventLock.lock();
try {
if (Boolean.TRUE.equals(sessionNewEvents.get(edgeId))) {
log.trace("[{}][{}] Set session new events flag to false", tenantId, edgeId.getId());
log.trace("[{}][{}] set session new events flag to false", tenantId, edgeId.getId());
sessionNewEvents.put(edgeId, false);
session.processHighPriorityEvents();
processEdgeEventMigrationIfNeeded(session, edgeId);
@ -420,6 +420,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
@Override
public void onSuccess(Boolean newEventsAdded) {
if (Boolean.TRUE.equals(newEventsAdded)) {
log.trace("[{}][{}] new events added. set session new events flag to true", tenantId, edgeId.getId());
sessionNewEvents.put(edgeId, true);
}
scheduleEdgeEventsCheck(session);

View File

@ -28,6 +28,7 @@ import org.springframework.data.util.Pair;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.id.EdgeId;
@ -130,13 +131,12 @@ public abstract class EdgeGrpcSession implements Closeable {
private Long previousStartTs;
private Long newStartSeqId;
private Long previousStartSeqId;
private Long seqIdEnd;
private StreamObserver<RequestMsg> inputStream;
private StreamObserver<ResponseMsg> outputStream;
private volatile boolean connected;
private volatile boolean syncCompleted;
private volatile boolean syncInProgress;
private EdgeVersion edgeVersion;
private int maxInboundMessageSize;
@ -191,7 +191,7 @@ public abstract class EdgeGrpcSession implements Closeable {
}
startSyncProcess(fullSync);
} else {
syncCompleted = true;
syncInProgress = false;
}
}
if (requestMsg.getMsgType().equals(RequestMsgType.UPLINK_RPC_MESSAGE)) {
@ -246,10 +246,14 @@ public abstract class EdgeGrpcSession implements Closeable {
}
public void startSyncProcess(boolean fullSync) {
log.info("[{}][{}][{}] Staring edge sync process", tenantId, edge.getId(), sessionId);
syncCompleted = false;
interruptGeneralProcessingOnSync();
doSync(new EdgeSyncCursor(ctx, edge, fullSync));
if (!syncInProgress) {
log.info("[{}][{}][{}] Staring edge sync process", tenantId, edge.getId(), sessionId);
syncInProgress = true;
interruptGeneralProcessingOnSync();
doSync(new EdgeSyncCursor(ctx, edge, fullSync));
} else {
log.info("[{}][{}][{}] Sync is already started, skipping starting it now", tenantId, edge.getId(), sessionId);
}
}
private void doSync(EdgeSyncCursor cursor) {
@ -292,6 +296,7 @@ public abstract class EdgeGrpcSession implements Closeable {
protected void processEdgeEvents(EdgeEventFetcher fetcher, PageLink pageLink, SettableFuture<Pair<Long, Long>> result) {
try {
log.trace("[{}] Start processing edge events, fetcher = {}, pageLink = {}", sessionId, fetcher.getClass().getSimpleName(), pageLink);
processHighPriorityEvents();
PageData<EdgeEvent> pageData = fetcher.fetchEdgeEvents(edge.getTenantId(), edge, pageLink);
if (isConnected() && !pageData.getData().isEmpty()) {
@ -327,7 +332,7 @@ public abstract class EdgeGrpcSession implements Closeable {
}
}, ctx.getGrpcCallbackExecutorService());
} else {
log.trace("[{}] no event(s) found. Stop processing edge events", sessionId);
log.trace("[{}] no event(s) found. Stop processing edge events, fetcher = {}, pageLink = {}", sessionId, fetcher.getClass().getSimpleName(), pageLink);
result.set(null);
}
} catch (Exception e) {
@ -406,6 +411,8 @@ public abstract class EdgeGrpcSession implements Closeable {
|| sessionState.getScheduledSendDownlinkTask() != null && !sessionState.getScheduledSendDownlinkTask().isCancelled()) {
log.debug("[{}][{}][{}] Previous send downlink future was not properly completed, stopping it now!", tenantId, edge.getId(), sessionId);
stopCurrentSendDownlinkMsgsTask(true);
} else {
log.trace("[{}][{}][{}] Previous send downlink future is not active", tenantId, edge.getId(), sessionId);
}
}
@ -519,12 +526,12 @@ public abstract class EdgeGrpcSession implements Closeable {
try {
if (msg.getSuccess()) {
sessionState.getPendingMsgsMap().remove(msg.getDownlinkMsgId());
log.debug("[{}][{}] Msg has been processed successfully! Msg Id: [{}], Msg: {}", tenantId, edge.getRoutingKey(), msg.getDownlinkMsgId(), msg);
log.debug("[{}][{}][{}] Msg has been processed successfully! Msg Id: [{}], Msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg);
} else {
log.error("[{}][{}] Msg processing failed! Msg Id: [{}], Error msg: {}", tenantId, edge.getRoutingKey(), msg.getDownlinkMsgId(), msg.getErrorMsg());
log.error("[{}][{}][{}] Msg processing failed! Msg Id: [{}], Error msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg.getErrorMsg());
}
if (sessionState.getPendingMsgsMap().isEmpty()) {
log.debug("[{}][{}] Pending msgs map is empty. Stopping current iteration", tenantId, edge.getRoutingKey());
log.debug("[{}][{}][{}] Pending msgs map is empty. Stopping current iteration", tenantId, edge.getId(), sessionId);
stopCurrentSendDownlinkMsgsTask(false);
}
} catch (Exception e) {
@ -534,7 +541,7 @@ public abstract class EdgeGrpcSession implements Closeable {
public void processHighPriorityEvents() {
try {
if (isConnected() && isSyncCompleted()) {
if (isConnected() && !isSyncInProgress()) {
if (highPriorityQueue.isEmpty()) {
return;
}
@ -543,6 +550,7 @@ public abstract class EdgeGrpcSession implements Closeable {
while ((event = highPriorityQueue.poll()) != null) {
highPriorityEvents.add(event);
}
log.trace("[{}][{}] Sending high priority events {}", tenantId, sessionId, highPriorityEvents.size());
List<DownlinkMsg> downlinkMsgsPack = convertToDownlinkMsgsPack(highPriorityEvents);
sendDownlinkMsgsPack(downlinkMsgsPack).get();
}
@ -553,18 +561,18 @@ public abstract class EdgeGrpcSession implements Closeable {
public ListenableFuture<Boolean> processEdgeEvents() throws Exception {
SettableFuture<Boolean> result = SettableFuture.create();
log.trace("[{}][{}] starting processing edge events", tenantId, sessionId);
if (isConnected() && isSyncCompleted()) {
if (isConnected() && !isSyncInProgress()) {
Pair<Long, Long> startTsAndSeqId = getQueueStartTsAndSeqId().get();
previousStartTs = startTsAndSeqId.getFirst();
previousStartSeqId = startTsAndSeqId.getSecond();
GeneralEdgeEventFetcher fetcher = new GeneralEdgeEventFetcher(
previousStartTs,
previousStartSeqId,
seqIdEnd,
false,
Integer.toUnsignedLong(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount()),
ctx.getEdgeEventService());
log.trace("[{}][{}] starting processing edge events, previousStartTs = {}, previousStartSeqId = {}",
tenantId, sessionId, previousStartTs, previousStartSeqId);
Futures.addCallback(startProcessingEdgeEvents(fetcher), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Pair<Long, Long> newStartTsAndSeqId) {
@ -574,18 +582,16 @@ public abstract class EdgeGrpcSession implements Closeable {
@Override
public void onSuccess(@Nullable List<Long> list) {
log.debug("[{}][{}] queue offset was updated [{}]", tenantId, sessionId, newStartTsAndSeqId);
boolean newEventsAvailable;
if (fetcher.isSeqIdNewCycleStarted()) {
seqIdEnd = fetcher.getSeqIdEnd();
boolean newEventsAvailable = isNewEdgeEventsAvailable();
result.set(newEventsAvailable);
newEventsAvailable = isNewEdgeEventsAvailable();
} else {
seqIdEnd = null;
boolean newEventsAvailable = isSeqIdStartedNewCycle();
newEventsAvailable = isSeqIdStartedNewCycle();
if (!newEventsAvailable) {
newEventsAvailable = isNewEdgeEventsAvailable();
}
result.set(newEventsAvailable);
}
result.set(newEventsAvailable);
}
@Override
@ -607,7 +613,7 @@ public abstract class EdgeGrpcSession implements Closeable {
}
}, ctx.getGrpcCallbackExecutorService());
} else {
if (!isSyncCompleted()) {
if (isSyncInProgress()) {
log.trace("[{}][{}] edge sync is not completed yet. Skipping iteration", tenantId, sessionId);
result.set(Boolean.TRUE);
} else {
@ -672,9 +678,12 @@ public abstract class EdgeGrpcSession implements Closeable {
private boolean isSeqIdStartedNewCycle() {
try {
log.trace("[{}][{}][{}] Checking if seq id started new cycle", tenantId, edge.getId(), sessionId);
TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), 0, null, null, newStartTs, System.currentTimeMillis());
PageData<EdgeEvent> edgeEvents = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), 0L, previousStartSeqId == 0 ? null : previousStartSeqId - 1, pageLink);
return !edgeEvents.getData().isEmpty();
boolean result = !edgeEvents.getData().isEmpty();
log.trace("[{}][{}][{}] Result of check if seq id started new cycle, result = {}", tenantId, edge.getId(), sessionId, result);
return result;
} catch (Exception e) {
log.error("[{}][{}][{}] Failed to execute isSeqIdStartedNewCycle", tenantId, edge.getId(), sessionId, e);
}
@ -683,9 +692,12 @@ public abstract class EdgeGrpcSession implements Closeable {
private boolean isNewEdgeEventsAvailable() {
try {
log.trace("[{}][{}][{}] Checking if new edge events available", tenantId, edge.getId(), sessionId);
TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), 0, null, null, newStartTs, System.currentTimeMillis());
PageData<EdgeEvent> edgeEvents = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), newStartSeqId, null, pageLink);
return !edgeEvents.getData().isEmpty() || !highPriorityQueue.isEmpty();
boolean result = !edgeEvents.getData().isEmpty() || !highPriorityQueue.isEmpty();
log.trace("[{}][{}][{}] Result of check if new edge events available, result = {}", tenantId, edge.getId(), sessionId, result);
return result;
} catch (Exception e) {
log.error("[{}][{}][{}] Failed to execute isNewEdgeEventsAvailable", tenantId, edge.getId(), sessionId, e);
}
@ -724,7 +736,7 @@ public abstract class EdgeGrpcSession implements Closeable {
}
private void markSyncCompletedSendEdgeEventUpdate() {
syncCompleted = true;
syncInProgress = false;
ctx.getClusterService().onEdgeEventUpdate(new EdgeEventUpdateMsg(edge.getTenantId(), edge.getId()));
}
@ -737,29 +749,27 @@ public abstract class EdgeGrpcSession implements Closeable {
}
}
private void sendDownlinkMsg(ResponseMsg downlinkMsg) {
if (downlinkMsg.getDownlinkMsg().getWidgetTypeUpdateMsgCount() > 0) {
log.trace("[{}][{}] Sending downlink widgetTypeUpdateMsg, downlinkMsgId = {}", tenantId, sessionId, downlinkMsg.getDownlinkMsg().getDownlinkMsgId());
} else {
log.trace("[{}][{}] Sending downlink msg [{}]", tenantId, sessionId, downlinkMsg);
}
private void sendDownlinkMsg(ResponseMsg responseMsg) {
if (isConnected()) {
String responseMsgStr = StringUtils.truncate(responseMsg.toString(), 10000);
log.trace("[{}][{}] Sending downlink msg [{}]", tenantId, sessionId, responseMsgStr);
downlinkMsgLock.lock();
String downlinkMsgStr = responseMsg.hasDownlinkMsg() ? String.valueOf(responseMsg.getDownlinkMsg().getDownlinkMsgId()) : responseMsgStr;
try {
outputStream.onNext(downlinkMsg);
outputStream.onNext(responseMsg);
} catch (Exception e) {
log.error("[{}][{}] Failed to send downlink message [{}]", tenantId, sessionId, downlinkMsg, e);
log.error("[{}][{}] Failed to send downlink message [{}]", tenantId, sessionId, downlinkMsgStr, e);
connected = false;
sessionCloseListener.accept(edge, sessionId);
} finally {
downlinkMsgLock.unlock();
}
log.trace("[{}][{}] Response msg successfully sent. downlinkMsgId = {}", tenantId, sessionId, downlinkMsg.getDownlinkMsg().getDownlinkMsgId());
log.trace("[{}][{}] downlink msg successfully sent [{}]", tenantId, sessionId, downlinkMsgStr);
}
}
protected DownlinkMsg convertEntityEventToDownlink(EdgeEvent edgeEvent) {
log.trace("[{}] Executing convertEntityEventToDownlink, edgeEvent [{}], action [{}]", edgeEvent.getTenantId(), edgeEvent, edgeEvent.getAction());
private DownlinkMsg convertEntityEventToDownlink(EdgeEvent edgeEvent) {
log.trace("[{}][{}] Executing convertEntityEventToDownlink, edgeEvent [{}], action [{}]", tenantId, sessionId, edgeEvent, edgeEvent.getAction());
return switch (edgeEvent.getType()) {
case EDGE -> ctx.getEdgeProcessor().convertEdgeEventToDownlink(edgeEvent);
case DEVICE -> ctx.getDeviceProcessor().convertDeviceEventToDownlink(edgeEvent, edgeVersion);
@ -789,7 +799,7 @@ public abstract class EdgeGrpcSession implements Closeable {
case OAUTH2_CLIENT -> ctx.getOAuth2EdgeProcessor().convertOAuth2ClientEventToDownlink(edgeEvent, edgeVersion);
case DOMAIN -> ctx.getOAuth2EdgeProcessor().convertOAuth2DomainEventToDownlink(edgeEvent, edgeVersion);
default -> {
log.warn("[{}] Unsupported edge event type [{}]", edgeEvent.getTenantId(), edgeEvent);
log.warn("[{}][{}] Unsupported edge event type [{}]", tenantId, sessionId, edgeEvent);
yield null;
}
};
@ -923,8 +933,8 @@ public abstract class EdgeGrpcSession implements Closeable {
}
} catch (Exception e) {
String failureMsg = String.format("Can't process uplink msg [%s] from edge", uplinkMsg);
log.error("[{}][{}] Can't process uplink msg [{}]", edge.getTenantId(), sessionId, uplinkMsg, e);
ctx.getNotificationRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(edge.getTenantId()).edgeId(edge.getId())
log.error("[{}][{}] Can't process uplink msg [{}]", tenantId, sessionId, uplinkMsg, e);
ctx.getNotificationRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId).edgeId(edge.getId())
.customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg).error(e.getMessage()).build());
return Futures.immediateFailedFuture(e);
}

View File

@ -73,7 +73,7 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession {
private void processMsgs(List<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumer) {
log.trace("[{}][{}] starting processing edge events", tenantId, sessionId);
if (isConnected() && isSyncCompleted() && !isHighPriorityProcessing) {
if (isConnected() && !isSyncInProgress() && !isHighPriorityProcessing) {
List<EdgeEvent> edgeEvents = new ArrayList<>();
for (TbProtoQueueMsg<ToEdgeEventNotificationMsg> msg : msgs) {
EdgeEvent edgeEvent = ProtoUtils.fromProto(msg.getValue().getEdgeEventMsg());

View File

@ -33,8 +33,6 @@ public class GeneralEdgeEventFetcher implements EdgeEventFetcher {
private final Long queueStartTs;
private Long seqIdStart;
@Getter
private Long seqIdEnd;
@Getter
private boolean seqIdNewCycleStarted;
private Long maxReadRecordsCount;
private final EdgeEventService edgeEventService;
@ -53,10 +51,11 @@ public class GeneralEdgeEventFetcher implements EdgeEventFetcher {
@Override
public PageData<EdgeEvent> fetchEdgeEvents(TenantId tenantId, Edge edge, PageLink pageLink) {
try {
PageData<EdgeEvent> edgeEvents = edgeEventService.findEdgeEvents(tenantId, edge.getId(), seqIdStart, seqIdEnd, (TimePageLink) pageLink);
log.trace("[{}] Finding general edge events [{}], seqIdStart = {}, pageLink = {}",
tenantId, edge.getId(), seqIdStart, pageLink);
PageData<EdgeEvent> edgeEvents = edgeEventService.findEdgeEvents(tenantId, edge.getId(), seqIdStart, null, (TimePageLink) pageLink);
if (edgeEvents.getData().isEmpty()) {
this.seqIdEnd = Math.max(this.maxReadRecordsCount, seqIdStart - this.maxReadRecordsCount);
edgeEvents = edgeEventService.findEdgeEvents(tenantId, edge.getId(), 0L, seqIdEnd, (TimePageLink) pageLink);
edgeEvents = edgeEventService.findEdgeEvents(tenantId, edge.getId(), 0L, Math.max(this.maxReadRecordsCount, seqIdStart - this.maxReadRecordsCount), (TimePageLink) pageLink);
if (edgeEvents.getData().stream().anyMatch(ee -> ee.getSeqId() < seqIdStart)) {
log.info("[{}] seqId column of edge_event table started new cycle [{}]", tenantId, edge.getId());
this.seqIdNewCycleStarted = true;

View File

@ -235,6 +235,9 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
if (relationsList != null && !relationsList.isEmpty()) {
List<ListenableFuture<Void>> futures = new ArrayList<>();
for (List<EntityRelation> entityRelations : relationsList) {
if (entityRelations.isEmpty()) {
continue;
}
log.trace("[{}][{}][{}][{}] relation(s) are going to be pushed to edge.", tenantId, edge.getId(), entityId, entityRelations.size());
for (EntityRelation relation : entityRelations) {
try {
@ -255,19 +258,23 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
}
}
}
Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<Void> voids) {
futureToSet.set(null);
}
if (futures.isEmpty()) {
futureToSet.set(null);
} else {
Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<Void> voids) {
futureToSet.set(null);
}
@Override
public void onFailure(Throwable throwable) {
String errMsg = String.format("[%s][%s] Exception during saving edge events [%s]!", tenantId, edge.getId(), relationRequestMsg);
log.error(errMsg, throwable);
futureToSet.setException(new RuntimeException(errMsg, throwable));
}
}, dbCallbackExecutorService);
@Override
public void onFailure(Throwable throwable) {
String errMsg = String.format("[%s][%s] Exception during saving edge events [%s]!", tenantId, edge.getId(), relationRequestMsg);
log.error(errMsg, throwable);
futureToSet.setException(new RuntimeException(errMsg, throwable));
}
}, dbCallbackExecutorService);
}
} else {
futureToSet.set(null);
}

View File

@ -134,7 +134,7 @@ public class JpaBaseEdgeEventDao extends JpaPartitionedAbstractDao<EdgeEventEnti
@Override
public ListenableFuture<Void> saveAsync(EdgeEvent edgeEvent) {
log.debug("Save edge event [{}] ", edgeEvent);
log.debug("Saving EdgeEvent [{}] ", edgeEvent);
if (edgeEvent.getId() == null) {
UUID timeBased = Uuids.timeBased();
edgeEvent.setId(new EdgeEventId(timeBased));
@ -156,7 +156,7 @@ public class JpaBaseEdgeEventDao extends JpaPartitionedAbstractDao<EdgeEventEnti
}
private ListenableFuture<Void> save(EdgeEventEntity entity) {
log.debug("Save edge event [{}] ", entity);
log.debug("Saving EdgeEventEntity [{}] ", entity);
if (entity.getTenantId() == null) {
log.trace("Save system edge event with predefined id {}", systemTenantId);
entity.setTenantId(systemTenantId);