diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 2e9465fc9d..73a504245d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -349,7 +349,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private void startSyncProcess(TenantId tenantId, EdgeId edgeId, UUID requestId, String requestServiceId) { EdgeGrpcSession session = sessions.get(edgeId); if (session != null) { - if (!session.isSyncCompleted()) { + if (session.isSyncInProgress()) { clusterService.pushEdgeSyncResponseToCore(new FromEdgeSyncResponse(requestId, tenantId, edgeId, false, "Sync process is active at the moment"), requestServiceId); } else { boolean success = false; @@ -368,7 +368,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i UUID requestId = request.getId(); EdgeGrpcSession session = sessions.get(request.getEdgeId()); - if (session != null && !session.isSyncCompleted()) { + if (session != null && session.isSyncInProgress()) { responseConsumer.accept(new FromEdgeSyncResponse(requestId, request.getTenantId(), request.getEdgeId(), false, "Sync process is active at the moment")); } else { log.trace("[{}][{}] Processing sync edge request [{}], serviceId [{}]", request.getTenantId(), request.getId(), request.getEdgeId(), request.getServiceId()); @@ -411,7 +411,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i newEventLock.lock(); try { if (Boolean.TRUE.equals(sessionNewEvents.get(edgeId))) { - log.trace("[{}][{}] Set session new events flag to false", tenantId, edgeId.getId()); + log.trace("[{}][{}] set session new events flag to false", tenantId, edgeId.getId()); sessionNewEvents.put(edgeId, false); session.processHighPriorityEvents(); processEdgeEventMigrationIfNeeded(session, edgeId); @@ -420,6 +420,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i @Override public void onSuccess(Boolean newEventsAdded) { if (Boolean.TRUE.equals(newEventsAdded)) { + log.trace("[{}][{}] new events added. set session new events flag to true", tenantId, edgeId.getId()); sessionNewEvents.put(edgeId, true); } scheduleEdgeEventsCheck(session); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 64c97d735d..137c4d5bf5 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -28,6 +28,7 @@ import org.springframework.data.util.Pair; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EdgeUtils; +import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.id.EdgeId; @@ -130,13 +131,12 @@ public abstract class EdgeGrpcSession implements Closeable { private Long previousStartTs; private Long newStartSeqId; private Long previousStartSeqId; - private Long seqIdEnd; private StreamObserver inputStream; private StreamObserver outputStream; private volatile boolean connected; - private volatile boolean syncCompleted; + private volatile boolean syncInProgress; private EdgeVersion edgeVersion; private int maxInboundMessageSize; @@ -191,7 +191,7 @@ public abstract class EdgeGrpcSession implements Closeable { } startSyncProcess(fullSync); } else { - syncCompleted = true; + syncInProgress = false; } } if (requestMsg.getMsgType().equals(RequestMsgType.UPLINK_RPC_MESSAGE)) { @@ -246,10 +246,14 @@ public abstract class EdgeGrpcSession implements Closeable { } public void startSyncProcess(boolean fullSync) { - log.info("[{}][{}][{}] Staring edge sync process", tenantId, edge.getId(), sessionId); - syncCompleted = false; - interruptGeneralProcessingOnSync(); - doSync(new EdgeSyncCursor(ctx, edge, fullSync)); + if (!syncInProgress) { + log.info("[{}][{}][{}] Staring edge sync process", tenantId, edge.getId(), sessionId); + syncInProgress = true; + interruptGeneralProcessingOnSync(); + doSync(new EdgeSyncCursor(ctx, edge, fullSync)); + } else { + log.info("[{}][{}][{}] Sync is already started, skipping starting it now", tenantId, edge.getId(), sessionId); + } } private void doSync(EdgeSyncCursor cursor) { @@ -292,6 +296,7 @@ public abstract class EdgeGrpcSession implements Closeable { protected void processEdgeEvents(EdgeEventFetcher fetcher, PageLink pageLink, SettableFuture> result) { try { + log.trace("[{}] Start processing edge events, fetcher = {}, pageLink = {}", sessionId, fetcher.getClass().getSimpleName(), pageLink); processHighPriorityEvents(); PageData pageData = fetcher.fetchEdgeEvents(edge.getTenantId(), edge, pageLink); if (isConnected() && !pageData.getData().isEmpty()) { @@ -327,7 +332,7 @@ public abstract class EdgeGrpcSession implements Closeable { } }, ctx.getGrpcCallbackExecutorService()); } else { - log.trace("[{}] no event(s) found. Stop processing edge events", sessionId); + log.trace("[{}] no event(s) found. Stop processing edge events, fetcher = {}, pageLink = {}", sessionId, fetcher.getClass().getSimpleName(), pageLink); result.set(null); } } catch (Exception e) { @@ -406,6 +411,8 @@ public abstract class EdgeGrpcSession implements Closeable { || sessionState.getScheduledSendDownlinkTask() != null && !sessionState.getScheduledSendDownlinkTask().isCancelled()) { log.debug("[{}][{}][{}] Previous send downlink future was not properly completed, stopping it now!", tenantId, edge.getId(), sessionId); stopCurrentSendDownlinkMsgsTask(true); + } else { + log.trace("[{}][{}][{}] Previous send downlink future is not active", tenantId, edge.getId(), sessionId); } } @@ -519,12 +526,12 @@ public abstract class EdgeGrpcSession implements Closeable { try { if (msg.getSuccess()) { sessionState.getPendingMsgsMap().remove(msg.getDownlinkMsgId()); - log.debug("[{}][{}] Msg has been processed successfully! Msg Id: [{}], Msg: {}", tenantId, edge.getRoutingKey(), msg.getDownlinkMsgId(), msg); + log.debug("[{}][{}][{}] Msg has been processed successfully! Msg Id: [{}], Msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg); } else { - log.error("[{}][{}] Msg processing failed! Msg Id: [{}], Error msg: {}", tenantId, edge.getRoutingKey(), msg.getDownlinkMsgId(), msg.getErrorMsg()); + log.error("[{}][{}][{}] Msg processing failed! Msg Id: [{}], Error msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg.getErrorMsg()); } if (sessionState.getPendingMsgsMap().isEmpty()) { - log.debug("[{}][{}] Pending msgs map is empty. Stopping current iteration", tenantId, edge.getRoutingKey()); + log.debug("[{}][{}][{}] Pending msgs map is empty. Stopping current iteration", tenantId, edge.getId(), sessionId); stopCurrentSendDownlinkMsgsTask(false); } } catch (Exception e) { @@ -534,7 +541,7 @@ public abstract class EdgeGrpcSession implements Closeable { public void processHighPriorityEvents() { try { - if (isConnected() && isSyncCompleted()) { + if (isConnected() && !isSyncInProgress()) { if (highPriorityQueue.isEmpty()) { return; } @@ -543,6 +550,7 @@ public abstract class EdgeGrpcSession implements Closeable { while ((event = highPriorityQueue.poll()) != null) { highPriorityEvents.add(event); } + log.trace("[{}][{}] Sending high priority events {}", tenantId, sessionId, highPriorityEvents.size()); List downlinkMsgsPack = convertToDownlinkMsgsPack(highPriorityEvents); sendDownlinkMsgsPack(downlinkMsgsPack).get(); } @@ -553,18 +561,18 @@ public abstract class EdgeGrpcSession implements Closeable { public ListenableFuture processEdgeEvents() throws Exception { SettableFuture result = SettableFuture.create(); - log.trace("[{}][{}] starting processing edge events", tenantId, sessionId); - if (isConnected() && isSyncCompleted()) { + if (isConnected() && !isSyncInProgress()) { Pair startTsAndSeqId = getQueueStartTsAndSeqId().get(); previousStartTs = startTsAndSeqId.getFirst(); previousStartSeqId = startTsAndSeqId.getSecond(); GeneralEdgeEventFetcher fetcher = new GeneralEdgeEventFetcher( previousStartTs, previousStartSeqId, - seqIdEnd, false, Integer.toUnsignedLong(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount()), ctx.getEdgeEventService()); + log.trace("[{}][{}] starting processing edge events, previousStartTs = {}, previousStartSeqId = {}", + tenantId, sessionId, previousStartTs, previousStartSeqId); Futures.addCallback(startProcessingEdgeEvents(fetcher), new FutureCallback<>() { @Override public void onSuccess(@Nullable Pair newStartTsAndSeqId) { @@ -574,18 +582,16 @@ public abstract class EdgeGrpcSession implements Closeable { @Override public void onSuccess(@Nullable List list) { log.debug("[{}][{}] queue offset was updated [{}]", tenantId, sessionId, newStartTsAndSeqId); + boolean newEventsAvailable; if (fetcher.isSeqIdNewCycleStarted()) { - seqIdEnd = fetcher.getSeqIdEnd(); - boolean newEventsAvailable = isNewEdgeEventsAvailable(); - result.set(newEventsAvailable); + newEventsAvailable = isNewEdgeEventsAvailable(); } else { - seqIdEnd = null; - boolean newEventsAvailable = isSeqIdStartedNewCycle(); + newEventsAvailable = isSeqIdStartedNewCycle(); if (!newEventsAvailable) { newEventsAvailable = isNewEdgeEventsAvailable(); } - result.set(newEventsAvailable); } + result.set(newEventsAvailable); } @Override @@ -607,7 +613,7 @@ public abstract class EdgeGrpcSession implements Closeable { } }, ctx.getGrpcCallbackExecutorService()); } else { - if (!isSyncCompleted()) { + if (isSyncInProgress()) { log.trace("[{}][{}] edge sync is not completed yet. Skipping iteration", tenantId, sessionId); result.set(Boolean.TRUE); } else { @@ -672,9 +678,12 @@ public abstract class EdgeGrpcSession implements Closeable { private boolean isSeqIdStartedNewCycle() { try { + log.trace("[{}][{}][{}] Checking if seq id started new cycle", tenantId, edge.getId(), sessionId); TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), 0, null, null, newStartTs, System.currentTimeMillis()); PageData edgeEvents = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), 0L, previousStartSeqId == 0 ? null : previousStartSeqId - 1, pageLink); - return !edgeEvents.getData().isEmpty(); + boolean result = !edgeEvents.getData().isEmpty(); + log.trace("[{}][{}][{}] Result of check if seq id started new cycle, result = {}", tenantId, edge.getId(), sessionId, result); + return result; } catch (Exception e) { log.error("[{}][{}][{}] Failed to execute isSeqIdStartedNewCycle", tenantId, edge.getId(), sessionId, e); } @@ -683,9 +692,12 @@ public abstract class EdgeGrpcSession implements Closeable { private boolean isNewEdgeEventsAvailable() { try { + log.trace("[{}][{}][{}] Checking if new edge events available", tenantId, edge.getId(), sessionId); TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), 0, null, null, newStartTs, System.currentTimeMillis()); PageData edgeEvents = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), newStartSeqId, null, pageLink); - return !edgeEvents.getData().isEmpty() || !highPriorityQueue.isEmpty(); + boolean result = !edgeEvents.getData().isEmpty() || !highPriorityQueue.isEmpty(); + log.trace("[{}][{}][{}] Result of check if new edge events available, result = {}", tenantId, edge.getId(), sessionId, result); + return result; } catch (Exception e) { log.error("[{}][{}][{}] Failed to execute isNewEdgeEventsAvailable", tenantId, edge.getId(), sessionId, e); } @@ -724,7 +736,7 @@ public abstract class EdgeGrpcSession implements Closeable { } private void markSyncCompletedSendEdgeEventUpdate() { - syncCompleted = true; + syncInProgress = false; ctx.getClusterService().onEdgeEventUpdate(new EdgeEventUpdateMsg(edge.getTenantId(), edge.getId())); } @@ -737,29 +749,27 @@ public abstract class EdgeGrpcSession implements Closeable { } } - private void sendDownlinkMsg(ResponseMsg downlinkMsg) { - if (downlinkMsg.getDownlinkMsg().getWidgetTypeUpdateMsgCount() > 0) { - log.trace("[{}][{}] Sending downlink widgetTypeUpdateMsg, downlinkMsgId = {}", tenantId, sessionId, downlinkMsg.getDownlinkMsg().getDownlinkMsgId()); - } else { - log.trace("[{}][{}] Sending downlink msg [{}]", tenantId, sessionId, downlinkMsg); - } + private void sendDownlinkMsg(ResponseMsg responseMsg) { if (isConnected()) { + String responseMsgStr = StringUtils.truncate(responseMsg.toString(), 10000); + log.trace("[{}][{}] Sending downlink msg [{}]", tenantId, sessionId, responseMsgStr); downlinkMsgLock.lock(); + String downlinkMsgStr = responseMsg.hasDownlinkMsg() ? String.valueOf(responseMsg.getDownlinkMsg().getDownlinkMsgId()) : responseMsgStr; try { - outputStream.onNext(downlinkMsg); + outputStream.onNext(responseMsg); } catch (Exception e) { - log.error("[{}][{}] Failed to send downlink message [{}]", tenantId, sessionId, downlinkMsg, e); + log.error("[{}][{}] Failed to send downlink message [{}]", tenantId, sessionId, downlinkMsgStr, e); connected = false; sessionCloseListener.accept(edge, sessionId); } finally { downlinkMsgLock.unlock(); } - log.trace("[{}][{}] Response msg successfully sent. downlinkMsgId = {}", tenantId, sessionId, downlinkMsg.getDownlinkMsg().getDownlinkMsgId()); + log.trace("[{}][{}] downlink msg successfully sent [{}]", tenantId, sessionId, downlinkMsgStr); } } - protected DownlinkMsg convertEntityEventToDownlink(EdgeEvent edgeEvent) { - log.trace("[{}] Executing convertEntityEventToDownlink, edgeEvent [{}], action [{}]", edgeEvent.getTenantId(), edgeEvent, edgeEvent.getAction()); + private DownlinkMsg convertEntityEventToDownlink(EdgeEvent edgeEvent) { + log.trace("[{}][{}] Executing convertEntityEventToDownlink, edgeEvent [{}], action [{}]", tenantId, sessionId, edgeEvent, edgeEvent.getAction()); return switch (edgeEvent.getType()) { case EDGE -> ctx.getEdgeProcessor().convertEdgeEventToDownlink(edgeEvent); case DEVICE -> ctx.getDeviceProcessor().convertDeviceEventToDownlink(edgeEvent, edgeVersion); @@ -789,7 +799,7 @@ public abstract class EdgeGrpcSession implements Closeable { case OAUTH2_CLIENT -> ctx.getOAuth2EdgeProcessor().convertOAuth2ClientEventToDownlink(edgeEvent, edgeVersion); case DOMAIN -> ctx.getOAuth2EdgeProcessor().convertOAuth2DomainEventToDownlink(edgeEvent, edgeVersion); default -> { - log.warn("[{}] Unsupported edge event type [{}]", edgeEvent.getTenantId(), edgeEvent); + log.warn("[{}][{}] Unsupported edge event type [{}]", tenantId, sessionId, edgeEvent); yield null; } }; @@ -923,8 +933,8 @@ public abstract class EdgeGrpcSession implements Closeable { } } catch (Exception e) { String failureMsg = String.format("Can't process uplink msg [%s] from edge", uplinkMsg); - log.error("[{}][{}] Can't process uplink msg [{}]", edge.getTenantId(), sessionId, uplinkMsg, e); - ctx.getNotificationRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(edge.getTenantId()).edgeId(edge.getId()) + log.error("[{}][{}] Can't process uplink msg [{}]", tenantId, sessionId, uplinkMsg, e); + ctx.getNotificationRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId).edgeId(edge.getId()) .customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg).error(e.getMessage()).build()); return Futures.immediateFailedFuture(e); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java index 9cb3f2dffc..29864b4cfa 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java @@ -73,7 +73,7 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession { private void processMsgs(List> msgs, TbQueueConsumer> consumer) { log.trace("[{}][{}] starting processing edge events", tenantId, sessionId); - if (isConnected() && isSyncCompleted() && !isHighPriorityProcessing) { + if (isConnected() && !isSyncInProgress() && !isHighPriorityProcessing) { List edgeEvents = new ArrayList<>(); for (TbProtoQueueMsg msg : msgs) { EdgeEvent edgeEvent = ProtoUtils.fromProto(msg.getValue().getEdgeEventMsg()); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java index 37bf860f60..b2a1044981 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java @@ -33,8 +33,6 @@ public class GeneralEdgeEventFetcher implements EdgeEventFetcher { private final Long queueStartTs; private Long seqIdStart; @Getter - private Long seqIdEnd; - @Getter private boolean seqIdNewCycleStarted; private Long maxReadRecordsCount; private final EdgeEventService edgeEventService; @@ -53,10 +51,11 @@ public class GeneralEdgeEventFetcher implements EdgeEventFetcher { @Override public PageData fetchEdgeEvents(TenantId tenantId, Edge edge, PageLink pageLink) { try { - PageData edgeEvents = edgeEventService.findEdgeEvents(tenantId, edge.getId(), seqIdStart, seqIdEnd, (TimePageLink) pageLink); + log.trace("[{}] Finding general edge events [{}], seqIdStart = {}, pageLink = {}", + tenantId, edge.getId(), seqIdStart, pageLink); + PageData edgeEvents = edgeEventService.findEdgeEvents(tenantId, edge.getId(), seqIdStart, null, (TimePageLink) pageLink); if (edgeEvents.getData().isEmpty()) { - this.seqIdEnd = Math.max(this.maxReadRecordsCount, seqIdStart - this.maxReadRecordsCount); - edgeEvents = edgeEventService.findEdgeEvents(tenantId, edge.getId(), 0L, seqIdEnd, (TimePageLink) pageLink); + edgeEvents = edgeEventService.findEdgeEvents(tenantId, edge.getId(), 0L, Math.max(this.maxReadRecordsCount, seqIdStart - this.maxReadRecordsCount), (TimePageLink) pageLink); if (edgeEvents.getData().stream().anyMatch(ee -> ee.getSeqId() < seqIdStart)) { log.info("[{}] seqId column of edge_event table started new cycle [{}]", tenantId, edge.getId()); this.seqIdNewCycleStarted = true; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java index 46f1d7ef57..fb0a45992c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java @@ -235,6 +235,9 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { if (relationsList != null && !relationsList.isEmpty()) { List> futures = new ArrayList<>(); for (List entityRelations : relationsList) { + if (entityRelations.isEmpty()) { + continue; + } log.trace("[{}][{}][{}][{}] relation(s) are going to be pushed to edge.", tenantId, edge.getId(), entityId, entityRelations.size()); for (EntityRelation relation : entityRelations) { try { @@ -255,19 +258,23 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { } } } - Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() { - @Override - public void onSuccess(@Nullable List voids) { - futureToSet.set(null); - } + if (futures.isEmpty()) { + futureToSet.set(null); + } else { + Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() { + @Override + public void onSuccess(@Nullable List voids) { + futureToSet.set(null); + } - @Override - public void onFailure(Throwable throwable) { - String errMsg = String.format("[%s][%s] Exception during saving edge events [%s]!", tenantId, edge.getId(), relationRequestMsg); - log.error(errMsg, throwable); - futureToSet.setException(new RuntimeException(errMsg, throwable)); - } - }, dbCallbackExecutorService); + @Override + public void onFailure(Throwable throwable) { + String errMsg = String.format("[%s][%s] Exception during saving edge events [%s]!", tenantId, edge.getId(), relationRequestMsg); + log.error(errMsg, throwable); + futureToSet.setException(new RuntimeException(errMsg, throwable)); + } + }, dbCallbackExecutorService); + } } else { futureToSet.set(null); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java index 065a057160..c580661e67 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java @@ -134,7 +134,7 @@ public class JpaBaseEdgeEventDao extends JpaPartitionedAbstractDao saveAsync(EdgeEvent edgeEvent) { - log.debug("Save edge event [{}] ", edgeEvent); + log.debug("Saving EdgeEvent [{}] ", edgeEvent); if (edgeEvent.getId() == null) { UUID timeBased = Uuids.timeBased(); edgeEvent.setId(new EdgeEventId(timeBased)); @@ -156,7 +156,7 @@ public class JpaBaseEdgeEventDao extends JpaPartitionedAbstractDao save(EdgeEventEntity entity) { - log.debug("Save edge event [{}] ", entity); + log.debug("Saving EdgeEventEntity [{}] ", entity); if (entity.getTenantId() == null) { log.trace("Save system edge event with predefined id {}", systemTenantId); entity.setTenantId(systemTenantId);