From e97668b01ce457e21c7ff9d4b925ff84b8582eb5 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Fri, 1 Nov 2024 17:53:11 +0200 Subject: [PATCH] Add logic to process postgres edge event fetcher on start --- .../edge/rpc/AbstractEdgeGrpcSession.java | 254 ++++++++++++++---- .../service/edge/rpc/EdgeGrpcService.java | 28 +- .../edge/rpc/PostgresEdgeGrpcSession.java | 155 ----------- .../ttl/KafkaEdgeTopicsCleanUpService.java | 12 +- .../server/dao/edge/EdgeService.java | 3 +- .../thingsboard/server/dao/edge/EdgeDao.java | 2 + .../server/dao/edge/EdgeServiceImpl.java | 8 + .../server/dao/sql/edge/EdgeRepository.java | 28 +- .../server/dao/sql/edge/JpaEdgeDao.java | 9 + 9 files changed, 262 insertions(+), 237 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/AbstractEdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/AbstractEdgeGrpcSession.java index 475826fee7..9aa21dd265 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/AbstractEdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/AbstractEdgeGrpcSession.java @@ -34,11 +34,14 @@ import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; +import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.limit.LimitedApi; import org.thingsboard.server.common.data.notification.rule.trigger.EdgeCommunicationFailureTrigger; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.page.SortOrder; +import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; @@ -89,6 +92,7 @@ import org.thingsboard.server.service.edge.rpc.processor.resource.ResourceProces import java.io.Closeable; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -103,27 +107,36 @@ import java.util.function.BiConsumer; @Data public abstract class AbstractEdgeGrpcSession> implements EdgeGrpcSession, Closeable { + private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs"; + private static final String QUEUE_START_SEQ_ID_ATTR_KEY = "queueStartSeqId"; + private static final int MAX_DOWNLINK_ATTEMPTS = 10; private static final String RATE_LIMIT_REACHED = "Rate limit reached"; protected static final ConcurrentLinkedQueue highPriorityQueue = new ConcurrentLinkedQueue<>(); - protected UUID sessionId; - protected BiConsumer sessionOpenListener; - protected BiConsumer sessionCloseListener; + private UUID sessionId; + private BiConsumer sessionOpenListener; + private BiConsumer sessionCloseListener; private final EdgeSessionState sessionState = new EdgeSessionState(); private final ReentrantLock downlinkMsgLock = new ReentrantLock(); - protected EdgeContextComponent ctx; - protected Edge edge; - protected TenantId tenantId; + private EdgeContextComponent ctx; + private Edge edge; + private TenantId tenantId; - protected StreamObserver inputStream; - protected StreamObserver outputStream; + private Long newStartTs; + private Long previousStartTs; + private Long newStartSeqId; + private Long previousStartSeqId; + private Long seqIdEnd; - protected boolean connected; - protected volatile boolean syncCompleted; + private StreamObserver inputStream; + private StreamObserver outputStream; + + private boolean connected; + private volatile boolean syncCompleted; private EdgeVersion edgeVersion; private int maxInboundMessageSize; @@ -231,6 +244,52 @@ public abstract class AbstractEdgeGrpcSession> future = startProcessingEdgeEvents(next); + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(@Nullable Pair result) { + doSync(cursor); + } + + @Override + public void onFailure(Throwable t) { + log.error("[{}][{}] Exception during sync process", tenantId, edge.getId(), t); + } + }, ctx.getGrpcCallbackExecutorService()); + } else { + log.info("[{}][{}] sync process completed", this.tenantId, edge.getId()); + DownlinkMsg syncCompleteDownlinkMsg = DownlinkMsg.newBuilder() + .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) + .setSyncCompletedMsg(SyncCompletedMsg.newBuilder().build()) + .build(); + Futures.addCallback(sendDownlinkMsgsPack(Collections.singletonList(syncCompleteDownlinkMsg)), new FutureCallback<>() { + @Override + public void onSuccess(Boolean isInterrupted) { + markSyncCompletedSendEdgeEventUpdate(); + } + + @Override + public void onFailure(Throwable t) { + log.error("[{}][{}] Exception during sending sync complete", tenantId, edge.getId(), t); + markSyncCompletedSendEdgeEventUpdate(); + } + }, ctx.getGrpcCallbackExecutorService()); + } + } + protected void processEdgeEvents(EdgeEventFetcher fetcher, PageLink pageLink, SettableFuture> result) { try { if (!highPriorityQueue.isEmpty()) { @@ -327,57 +386,11 @@ public abstract class AbstractEdgeGrpcSession> future = startProcessingEdgeEvents(next); - Futures.addCallback(future, new FutureCallback<>() { - @Override - public void onSuccess(@Nullable Pair result) { - doSync(cursor); - } - - @Override - public void onFailure(Throwable t) { - log.error("[{}][{}] Exception during sync process", tenantId, edge.getId(), t); - } - }, ctx.getGrpcCallbackExecutorService()); - } else { - log.info("[{}][{}] sync process completed", this.tenantId, edge.getId()); - DownlinkMsg syncCompleteDownlinkMsg = DownlinkMsg.newBuilder() - .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) - .setSyncCompletedMsg(SyncCompletedMsg.newBuilder().build()) - .build(); - Futures.addCallback(sendDownlinkMsgsPack(Collections.singletonList(syncCompleteDownlinkMsg)), new FutureCallback<>() { - @Override - public void onSuccess(Boolean isInterrupted) { - markSyncCompletedSendEdgeEventUpdate(); - } - - @Override - public void onFailure(Throwable t) { - log.error("[{}][{}] Exception during sending sync complete", tenantId, edge.getId(), t); - markSyncCompletedSendEdgeEventUpdate(); - } - }, ctx.getGrpcCallbackExecutorService()); - } - } - protected ListenableFuture sendDownlinkMsgsPack(List downlinkMsgsPack) { interruptPreviousSendDownlinkMsgsTask(); @@ -535,6 +548,68 @@ public abstract class AbstractEdgeGrpcSession processEdgeEvents() throws Exception { + SettableFuture result = SettableFuture.create(); + log.trace("[{}][{}] starting processing edge events", this.tenantId, this.sessionId); + if (isConnected() && isSyncCompleted()) { + Pair startTsAndSeqId = getQueueStartTsAndSeqId().get(); + this.previousStartTs = startTsAndSeqId.getFirst(); + this.previousStartSeqId = startTsAndSeqId.getSecond(); + GeneralEdgeEventFetcher fetcher = new GeneralEdgeEventFetcher( + this.previousStartTs, + this.previousStartSeqId, + this.seqIdEnd, + false, + Integer.toUnsignedLong(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount()), + ctx.getEdgeEventService()); + Futures.addCallback(startProcessingEdgeEvents(fetcher), new FutureCallback<>() { + @Override + public void onSuccess(@Nullable Pair newStartTsAndSeqId) { + if (newStartTsAndSeqId != null) { + ListenableFuture> updateFuture = updateQueueStartTsAndSeqId(newStartTsAndSeqId); + Futures.addCallback(updateFuture, new FutureCallback<>() { + @Override + public void onSuccess(@Nullable List list) { + log.debug("[{}][{}] queue offset was updated [{}]", tenantId, sessionId, newStartTsAndSeqId); + if (fetcher.isSeqIdNewCycleStarted()) { + seqIdEnd = fetcher.getSeqIdEnd(); + boolean newEventsAvailable = isNewEdgeEventsAvailable(); + result.set(newEventsAvailable); + } else { + seqIdEnd = null; + boolean newEventsAvailable = isSeqIdStartedNewCycle(); + if (!newEventsAvailable) { + newEventsAvailable = isNewEdgeEventsAvailable(); + } + result.set(newEventsAvailable); + } + } + + @Override + public void onFailure(Throwable t) { + log.error("[{}][{}] Failed to update queue offset [{}]", tenantId, sessionId, newStartTsAndSeqId, t); + result.setException(t); + } + }, ctx.getGrpcCallbackExecutorService()); + } else { + log.trace("[{}][{}] newStartTsAndSeqId is null. Skipping iteration without db update", tenantId, sessionId); + result.set(null); + } + } + + @Override + public void onFailure(Throwable t) { + log.error("[{}][{}] Failed to process events", tenantId, sessionId, t); + result.setException(t); + } + }, ctx.getGrpcCallbackExecutorService()); + } else { + log.trace("[{}][{}] edge is not connected or sync is not completed. Skipping iteration", tenantId, sessionId); + result.set(null); + } + return result; + } + protected List convertToDownlinkMsgsPack(List edgeEvents) { List result = new ArrayList<>(); for (EdgeEvent edgeEvent : edgeEvents) { @@ -569,6 +644,73 @@ public abstract class AbstractEdgeGrpcSession> getQueueStartTsAndSeqId() { + ListenableFuture> future = + ctx.getAttributesService().find(edge.getTenantId(), edge.getId(), AttributeScope.SERVER_SCOPE, Arrays.asList(QUEUE_START_TS_ATTR_KEY, QUEUE_START_SEQ_ID_ATTR_KEY)); + return Futures.transform(future, attributeKvEntries -> { + long startTs = 0L; + long startSeqId = 0L; + for (AttributeKvEntry attributeKvEntry : attributeKvEntries) { + if (QUEUE_START_TS_ATTR_KEY.equals(attributeKvEntry.getKey())) { + startTs = attributeKvEntry.getLongValue().isPresent() ? attributeKvEntry.getLongValue().get() : 0L; + } + if (QUEUE_START_SEQ_ID_ATTR_KEY.equals(attributeKvEntry.getKey())) { + startSeqId = attributeKvEntry.getLongValue().isPresent() ? attributeKvEntry.getLongValue().get() : 0L; + } + } + if (startSeqId == 0L) { + startSeqId = findStartSeqIdFromOldestEventIfAny(); + } + return Pair.of(startTs, startSeqId); + }, ctx.getGrpcCallbackExecutorService()); + } + + private boolean isSeqIdStartedNewCycle() { + try { + TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), 0, null, null, this.newStartTs, System.currentTimeMillis()); + PageData edgeEvents = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), 0L, this.previousStartSeqId == 0 ? null : this.previousStartSeqId - 1, pageLink); + return !edgeEvents.getData().isEmpty(); + } catch (Exception e) { + log.error("[{}][{}][{}] Failed to execute isSeqIdStartedNewCycle", this.tenantId, edge.getId(), sessionId, e); + } + return false; + } + + private boolean isNewEdgeEventsAvailable() { + try { + TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), 0, null, null, this.newStartTs, System.currentTimeMillis()); + PageData edgeEvents = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), this.newStartSeqId, null, pageLink); + return !edgeEvents.getData().isEmpty() || !highPriorityQueue.isEmpty(); + } catch (Exception e) { + log.error("[{}][{}][{}] Failed to execute isNewEdgeEventsAvailable", this.tenantId, edge.getId(), sessionId, e); + } + return false; + } + + private long findStartSeqIdFromOldestEventIfAny() { + long startSeqId = 0L; + try { + TimePageLink pageLink = new TimePageLink(1, 0, null, new SortOrder("createdTime"), null, null); + PageData edgeEvents = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), null, null, pageLink); + if (!edgeEvents.getData().isEmpty()) { + startSeqId = edgeEvents.getData().get(0).getSeqId() - 1; + } + } catch (Exception e) { + log.error("[{}][{}][{}] Failed to execute findStartSeqIdFromOldestEventIfAny", this.tenantId, edge.getId(), sessionId, e); + } + return startSeqId; + } + + private ListenableFuture> updateQueueStartTsAndSeqId(Pair pair) { + this.newStartTs = pair.getFirst(); + this.newStartSeqId = pair.getSecond(); + log.trace("[{}] updateQueueStartTsAndSeqId [{}][{}][{}]", this.sessionId, edge.getId(), this.newStartTs, this.newStartSeqId); + List attributes = Arrays.asList( + new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_TS_ATTR_KEY, this.newStartTs), System.currentTimeMillis()), + new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_SEQ_ID_ATTR_KEY, this.newStartSeqId), System.currentTimeMillis())); + return ctx.getAttributesService().save(edge.getTenantId(), edge.getId(), AttributeScope.SERVER_SCOPE, attributes); + } + protected ListenableFuture> startProcessingEdgeEvents(EdgeEventFetcher fetcher) { SettableFuture> result = SettableFuture.create(); PageLink pageLink = fetcher.getPageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount()); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index bed4272ec1..ddc70ea3c4 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -94,6 +94,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private final Map sessionNewEvents = new HashMap<>(); private final ConcurrentMap> sessionEdgeEventChecks = new ConcurrentHashMap<>(); private final ConcurrentMap> localSyncEdgeRequests = new ConcurrentHashMap<>(); + private final ConcurrentMap edgeEventsProcessed = new ConcurrentHashMap<>(); @Value("${edges.rpc.port}") private int rpcPort; @@ -328,13 +329,18 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i long lastConnectTs = System.currentTimeMillis(); save(tenantId, edgeId, DefaultDeviceStateService.LAST_CONNECT_TIME, lastConnectTs); edgeIdServiceIdCache.put(edgeId, serviceInfoProvider.getServiceId()); - if (edgeGrpcSession instanceof KafkaEdgeGrpcSession) { - initializeKafkaConsumer((KafkaEdgeGrpcSession) edgeGrpcSession, tenantId, edgeId); - } pushRuleEngineMessage(tenantId, edge, lastConnectTs, TbMsgType.CONNECT_EVENT); cancelScheduleEdgeEventsCheck(edgeId); + edgeEventsProcessed.putIfAbsent(edgeId, Boolean.FALSE); + if (edgeGrpcSession instanceof KafkaEdgeGrpcSession session) { + Boolean isChecked = edgeEventsProcessed.get(edgeId); + if (Boolean.FALSE.equals(isChecked)) { + scheduleEdgeEventsCheck(session); + } + initializeKafkaConsumer(session, tenantId, edgeId); + } if (edgeGrpcSession instanceof PostgresEdgeGrpcSession) { - scheduleEdgeEventsCheck((PostgresEdgeGrpcSession) edgeGrpcSession); + scheduleEdgeEventsCheck(edgeGrpcSession); } } @@ -399,7 +405,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i } } - private void scheduleEdgeEventsCheck(PostgresEdgeGrpcSession session) { + private void scheduleEdgeEventsCheck(AbstractEdgeGrpcSession session) { EdgeId edgeId = session.getEdge().getId(); UUID tenantId = session.getEdge().getTenantId().getId(); if (sessions.containsKey(edgeId)) { @@ -408,7 +414,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock()); newEventLock.lock(); try { - if (Boolean.TRUE.equals(sessionNewEvents.get(edgeId))) { + if (Boolean.TRUE.equals(sessionNewEvents.get(edgeId)) && Boolean.FALSE.equals(edgeEventsProcessed.get(edgeId))) { log.trace("[{}][{}] Set session new events flag to false", tenantId, edgeId.getId()); sessionNewEvents.put(edgeId, false); Futures.addCallback(session.processEdgeEvents(), new FutureCallback<>() { @@ -417,7 +423,11 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i if (Boolean.TRUE.equals(newEventsAdded)) { sessionNewEvents.put(edgeId, true); } - scheduleEdgeEventsCheck(session); + if (isKafkaSupported) { + edgeEventsProcessed.put(edgeId, true); + } else { + scheduleEdgeEventsCheck(session); + } } @Override @@ -427,7 +437,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i } }, ctx.getGrpcCallbackExecutorService()); } else { - scheduleEdgeEventsCheck(session); + if (!isKafkaSupported) { + scheduleEdgeEventsCheck(session); + } } } finally { newEventLock.unlock(); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/PostgresEdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/PostgresEdgeGrpcSession.java index 4d0fda21f6..4ef55e5961 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/PostgresEdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/PostgresEdgeGrpcSession.java @@ -15,30 +15,13 @@ */ package org.thingsboard.server.service.edge.rpc; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; -import org.checkerframework.checker.nullness.qual.Nullable; -import org.springframework.data.util.Pair; -import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.edge.Edge; -import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.id.EdgeId; -import org.thingsboard.server.common.data.kv.AttributeKvEntry; -import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; -import org.thingsboard.server.common.data.kv.LongDataEntry; -import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.page.SortOrder; -import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.gen.edge.v1.ResponseMsg; import org.thingsboard.server.service.edge.EdgeContextComponent; -import org.thingsboard.server.service.edge.rpc.fetch.GeneralEdgeEventFetcher; -import java.util.Arrays; -import java.util.List; import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; import java.util.function.BiConsumer; @@ -46,15 +29,6 @@ import java.util.function.BiConsumer; @Slf4j public class PostgresEdgeGrpcSession extends AbstractEdgeGrpcSession { - private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs"; - private static final String QUEUE_START_SEQ_ID_ATTR_KEY = "queueStartSeqId"; - - private Long newStartTs; - private Long previousStartTs; - private Long newStartSeqId; - private Long previousStartSeqId; - private Long seqIdEnd; - PostgresEdgeGrpcSession(EdgeContextComponent ctx, StreamObserver outputStream, BiConsumer sessionOpenListener, BiConsumer sessionCloseListener, ScheduledExecutorService sendDownlinkExecutorService, @@ -63,133 +37,4 @@ public class PostgresEdgeGrpcSession extends AbstractEdgeGrpcSession processEdgeEvents() throws Exception { - SettableFuture result = SettableFuture.create(); - log.trace("[{}][{}] starting processing edge events", this.tenantId, this.sessionId); - if (isConnected() && isSyncCompleted()) { - Pair startTsAndSeqId = getQueueStartTsAndSeqId().get(); - this.previousStartTs = startTsAndSeqId.getFirst(); - this.previousStartSeqId = startTsAndSeqId.getSecond(); - GeneralEdgeEventFetcher fetcher = new GeneralEdgeEventFetcher( - this.previousStartTs, - this.previousStartSeqId, - this.seqIdEnd, - false, - Integer.toUnsignedLong(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount()), - ctx.getEdgeEventService()); - Futures.addCallback(startProcessingEdgeEvents(fetcher), new FutureCallback<>() { - @Override - public void onSuccess(@Nullable Pair newStartTsAndSeqId) { - if (newStartTsAndSeqId != null) { - ListenableFuture> updateFuture = updateQueueStartTsAndSeqId(newStartTsAndSeqId); - Futures.addCallback(updateFuture, new FutureCallback<>() { - @Override - public void onSuccess(@Nullable List list) { - log.debug("[{}][{}] queue offset was updated [{}]", tenantId, sessionId, newStartTsAndSeqId); - if (fetcher.isSeqIdNewCycleStarted()) { - seqIdEnd = fetcher.getSeqIdEnd(); - boolean newEventsAvailable = isNewEdgeEventsAvailable(); - result.set(newEventsAvailable); - } else { - seqIdEnd = null; - boolean newEventsAvailable = isSeqIdStartedNewCycle(); - if (!newEventsAvailable) { - newEventsAvailable = isNewEdgeEventsAvailable(); - } - result.set(newEventsAvailable); - } - } - - @Override - public void onFailure(Throwable t) { - log.error("[{}][{}] Failed to update queue offset [{}]", tenantId, sessionId, newStartTsAndSeqId, t); - result.setException(t); - } - }, ctx.getGrpcCallbackExecutorService()); - } else { - log.trace("[{}][{}] newStartTsAndSeqId is null. Skipping iteration without db update", tenantId, sessionId); - result.set(null); - } - } - - @Override - public void onFailure(Throwable t) { - log.error("[{}][{}] Failed to process events", tenantId, sessionId, t); - result.setException(t); - } - }, ctx.getGrpcCallbackExecutorService()); - } else { - log.trace("[{}][{}] edge is not connected or sync is not completed. Skipping iteration", tenantId, sessionId); - result.set(null); - } - return result; - } - - private ListenableFuture> getQueueStartTsAndSeqId() { - ListenableFuture> future = - ctx.getAttributesService().find(edge.getTenantId(), edge.getId(), AttributeScope.SERVER_SCOPE, Arrays.asList(QUEUE_START_TS_ATTR_KEY, QUEUE_START_SEQ_ID_ATTR_KEY)); - return Futures.transform(future, attributeKvEntries -> { - long startTs = 0L; - long startSeqId = 0L; - for (AttributeKvEntry attributeKvEntry : attributeKvEntries) { - if (QUEUE_START_TS_ATTR_KEY.equals(attributeKvEntry.getKey())) { - startTs = attributeKvEntry.getLongValue().isPresent() ? attributeKvEntry.getLongValue().get() : 0L; - } - if (QUEUE_START_SEQ_ID_ATTR_KEY.equals(attributeKvEntry.getKey())) { - startSeqId = attributeKvEntry.getLongValue().isPresent() ? attributeKvEntry.getLongValue().get() : 0L; - } - } - if (startSeqId == 0L) { - startSeqId = findStartSeqIdFromOldestEventIfAny(); - } - return Pair.of(startTs, startSeqId); - }, ctx.getGrpcCallbackExecutorService()); - } - - private boolean isSeqIdStartedNewCycle() { - try { - TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), 0, null, null, this.newStartTs, System.currentTimeMillis()); - PageData edgeEvents = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), 0L, this.previousStartSeqId == 0 ? null : this.previousStartSeqId - 1, pageLink); - return !edgeEvents.getData().isEmpty(); - } catch (Exception e) { - log.error("[{}][{}][{}] Failed to execute isSeqIdStartedNewCycle", this.tenantId, edge.getId(), sessionId, e); - } - return false; - } - - private boolean isNewEdgeEventsAvailable() { - try { - TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), 0, null, null, this.newStartTs, System.currentTimeMillis()); - PageData edgeEvents = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), this.newStartSeqId, null, pageLink); - return !edgeEvents.getData().isEmpty() || !highPriorityQueue.isEmpty(); - } catch (Exception e) { - log.error("[{}][{}][{}] Failed to execute isNewEdgeEventsAvailable", this.tenantId, edge.getId(), sessionId, e); - } - return false; - } - - private long findStartSeqIdFromOldestEventIfAny() { - long startSeqId = 0L; - try { - TimePageLink pageLink = new TimePageLink(1, 0, null, new SortOrder("createdTime"), null, null); - PageData edgeEvents = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), null, null, pageLink); - if (!edgeEvents.getData().isEmpty()) { - startSeqId = edgeEvents.getData().get(0).getSeqId() - 1; - } - } catch (Exception e) { - log.error("[{}][{}][{}] Failed to execute findStartSeqIdFromOldestEventIfAny", this.tenantId, edge.getId(), sessionId, e); - } - return startSeqId; - } - - private ListenableFuture> updateQueueStartTsAndSeqId(Pair pair) { - this.newStartTs = pair.getFirst(); - this.newStartSeqId = pair.getSecond(); - log.trace("[{}] updateQueueStartTsAndSeqId [{}][{}][{}]", this.sessionId, edge.getId(), this.newStartTs, this.newStartSeqId); - List attributes = Arrays.asList( - new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_TS_ATTR_KEY, this.newStartTs), System.currentTimeMillis()), - new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_SEQ_ID_ATTR_KEY, this.newStartSeqId), System.currentTimeMillis())); - return ctx.getAttributesService().save(edge.getTenantId(), edge.getId(), AttributeScope.SERVER_SCOPE, attributes); - } - } diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java index 20e2ab6758..eb56878d5e 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java @@ -22,7 +22,7 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.AttributeScope; -import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.page.PageDataIterable; @@ -85,19 +85,19 @@ public class KafkaEdgeTopicsCleanUpService { return; } - PageDataIterable edges = new PageDataIterable<>(link -> edgeService.findEdgesByTenantId(tenantId, link), 1024); + PageDataIterable edgeIds = new PageDataIterable<>(link -> edgeService.findEdgeIdsByTenantId(tenantId, link), 1024); long currentTimeMillis = System.currentTimeMillis(); - for (Edge edge : edges) { - Optional attributeOpt = attributesService.find(tenantId, edge.getId(), AttributeScope.SERVER_SCOPE, DefaultDeviceStateService.LAST_CONNECT_TIME).get(); + for (EdgeId edgeId : edgeIds) { + Optional attributeOpt = attributesService.find(tenantId, edgeId, AttributeScope.SERVER_SCOPE, DefaultDeviceStateService.LAST_CONNECT_TIME).get(); if (attributeOpt.isPresent()) { Optional lastConnectTimeOpt = attributeOpt.get().getLongValue(); if (lastConnectTimeOpt.isPresent() && isTopicExpired(lastConnectTimeOpt.get(), currentTimeMillis)) { - String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edge.getId()).getTopic(); + String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic(); TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs()); if (kafkaAdmin.isTopicEmpty(topic)) { kafkaAdmin.deleteTopic(topic); - log.info("Removed outdated topic for tenant {} and edge {} older than {}", tenantId, edge.getName(), Date.from(Instant.ofEpochMilli(currentTimeMillis - ONE_MONTH_MILLIS))); + log.info("Removed outdated topic for tenant {} and edge with id {} older than {}", tenantId, edgeId, Date.from(Instant.ofEpochMilli(currentTimeMillis - ONE_MONTH_MILLIS))); } } } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java index c18f42ace7..d49a16e11f 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java @@ -32,7 +32,6 @@ import org.thingsboard.server.dao.entity.EntityDaoService; import java.util.List; import java.util.Optional; -import java.util.UUID; public interface EdgeService extends EntityDaoService { @@ -56,6 +55,8 @@ public interface EdgeService extends EntityDaoService { void deleteEdge(TenantId tenantId, EdgeId edgeId); + PageData findEdgeIdsByTenantId(TenantId tenantId, PageLink pageLink); + PageData findEdgesByTenantId(TenantId tenantId, PageLink pageLink); PageData findEdgesByTenantIdAndType(TenantId tenantId, String type, PageLink pageLink); diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeDao.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeDao.java index be876cb5e0..e012766a8f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeDao.java @@ -40,6 +40,8 @@ public interface EdgeDao extends Dao { EdgeInfo findEdgeInfoById(TenantId tenantId, UUID edgeId); + PageData findEdgeIdsByTenantId(UUID tenantId, PageLink pageLink); + PageData findEdgesByTenantId(UUID tenantId, PageLink pageLink); PageData findEdgesByTenantIdAndType(UUID tenantId, String type, PageLink pageLink); diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java index 1e0202bf79..c9b465b178 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java @@ -261,6 +261,14 @@ public class EdgeServiceImpl extends AbstractCachedEntityService findEdgeIdsByTenantId(TenantId tenantId, PageLink pageLink) { + log.trace("Executing findEdgeIdsByTenantId, tenantId [{}], pageLink [{}]", tenantId, pageLink); + validateId(tenantId, id -> INCORRECT_TENANT_ID + id); + validatePageLink(pageLink); + return edgeDao.findEdgeIdsByTenantId(tenantId.getId(), pageLink); + } + @Override public PageData findEdgesByTenantId(TenantId tenantId, PageLink pageLink) { log.trace("Executing findEdgesByTenantId, tenantId [{}], pageLink [{}]", tenantId, pageLink); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/EdgeRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/EdgeRepository.java index b0c6a47783..79c3e93170 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/EdgeRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/EdgeRepository.java @@ -42,6 +42,12 @@ public interface EdgeRepository extends JpaRepository { "WHERE d.id = :edgeId") EdgeInfoEntity findEdgeInfoById(@Param("edgeId") UUID edgeId); + @Query("SELECT d.id FROM EdgeEntity d WHERE d.tenantId = :tenantId " + + "AND (:textSearch IS NULL OR ilike(d.name, CONCAT('%', :textSearch, '%')) = true)") + Page findIdsByTenantId(@Param("tenantId") UUID tenantId, + @Param("textSearch") String textSearch, + Pageable pageable); + @Query("SELECT d FROM EdgeEntity d WHERE d.tenantId = :tenantId " + "AND (:textSearch IS NULL OR ilike(d.name, CONCAT('%', :textSearch, '%')) = true)") Page findByTenantId(@Param("tenantId") UUID tenantId, @@ -93,9 +99,9 @@ public interface EdgeRepository extends JpaRepository { "AND a.customerId = :customerId " + "AND (:textSearch IS NULL OR ilike(a.name, CONCAT('%', :textSearch, '%')) = true)") Page findEdgeInfosByTenantIdAndCustomerId(@Param("tenantId") UUID tenantId, - @Param("customerId") UUID customerId, - @Param("textSearch") String textSearch, - Pageable pageable); + @Param("customerId") UUID customerId, + @Param("textSearch") String textSearch, + Pageable pageable); @Query("SELECT new org.thingsboard.server.dao.model.sql.EdgeInfoEntity(a, c.title, c.additionalInfo) " + "FROM EdgeEntity a " + @@ -105,10 +111,10 @@ public interface EdgeRepository extends JpaRepository { "AND a.type = :type " + "AND (:textSearch IS NULL OR ilike(a.name, CONCAT('%', :textSearch, '%')) = true)") Page findEdgeInfosByTenantIdAndCustomerIdAndType(@Param("tenantId") UUID tenantId, - @Param("customerId") UUID customerId, - @Param("type") String type, - @Param("textSearch") String textSearch, - Pageable pageable); + @Param("customerId") UUID customerId, + @Param("type") String type, + @Param("textSearch") String textSearch, + Pageable pageable); @Query("SELECT ee FROM EdgeEntity ee, RelationEntity re WHERE ee.tenantId = :tenantId " + "AND ee.id = re.fromId AND re.fromType = 'EDGE' AND re.relationTypeGroup = 'EDGE' " + @@ -125,10 +131,10 @@ public interface EdgeRepository extends JpaRepository { "AND re.relationType = 'Contains' AND re.toId = :entityId AND re.toType = :entityType " + "AND (:textSearch IS NULL OR ilike(ee.name, CONCAT('%', :textSearch, '%')) = true)") Page findIdsByTenantIdAndEntityId(@Param("tenantId") UUID tenantId, - @Param("entityId") UUID entityId, - @Param("entityType") String entityType, - @Param("textSearch") String textSearch, - Pageable pageable); + @Param("entityId") UUID entityId, + @Param("entityType") String entityType, + @Param("textSearch") String textSearch, + Pageable pageable); @Query("SELECT ee FROM EdgeEntity ee, TenantEntity te WHERE ee.tenantId = te.id AND te.tenantProfileId = :tenantProfileId ") Page findByTenantProfileId(@Param("tenantProfileId") UUID tenantProfileId, diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaEdgeDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaEdgeDao.java index 3c2d64825f..b02a28fee4 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaEdgeDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaEdgeDao.java @@ -64,6 +64,15 @@ public class JpaEdgeDao extends JpaAbstractDao implements Edge return DaoUtil.getData(edgeRepository.findEdgeInfoById(edgeId)); } + @Override + public PageData findEdgeIdsByTenantId(UUID tenantId, PageLink pageLink) { + return DaoUtil.pageToPageData( + edgeRepository.findIdsByTenantId( + tenantId, + pageLink.getTextSearch(), + DaoUtil.toPageable(pageLink))).mapData(EdgeId::fromUUID); + } + @Override public PageData findEdgesByTenantId(UUID tenantId, PageLink pageLink) { return DaoUtil.toPageData(