Add logic to process postgres edge event fetcher on start

This commit is contained in:
Andrii Landiak 2024-11-01 17:53:11 +02:00
parent b981281205
commit e97668b01c
9 changed files with 262 additions and 237 deletions

View File

@ -34,11 +34,14 @@ import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.limit.LimitedApi; import org.thingsboard.server.common.data.limit.LimitedApi;
import org.thingsboard.server.common.data.notification.rule.trigger.EdgeCommunicationFailureTrigger; import org.thingsboard.server.common.data.notification.rule.trigger.EdgeCommunicationFailureTrigger;
import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.SortOrder;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg; import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg;
@ -89,6 +92,7 @@ import org.thingsboard.server.service.edge.rpc.processor.resource.ResourceProces
import java.io.Closeable; import java.io.Closeable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
@ -103,27 +107,36 @@ import java.util.function.BiConsumer;
@Data @Data
public abstract class AbstractEdgeGrpcSession<T extends AbstractEdgeGrpcSession<T>> implements EdgeGrpcSession, Closeable { public abstract class AbstractEdgeGrpcSession<T extends AbstractEdgeGrpcSession<T>> implements EdgeGrpcSession, Closeable {
private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs";
private static final String QUEUE_START_SEQ_ID_ATTR_KEY = "queueStartSeqId";
private static final int MAX_DOWNLINK_ATTEMPTS = 10; private static final int MAX_DOWNLINK_ATTEMPTS = 10;
private static final String RATE_LIMIT_REACHED = "Rate limit reached"; private static final String RATE_LIMIT_REACHED = "Rate limit reached";
protected static final ConcurrentLinkedQueue<EdgeEvent> highPriorityQueue = new ConcurrentLinkedQueue<>(); protected static final ConcurrentLinkedQueue<EdgeEvent> highPriorityQueue = new ConcurrentLinkedQueue<>();
protected UUID sessionId; private UUID sessionId;
protected BiConsumer<EdgeId, T> sessionOpenListener; private BiConsumer<EdgeId, T> sessionOpenListener;
protected BiConsumer<Edge, UUID> sessionCloseListener; private BiConsumer<Edge, UUID> sessionCloseListener;
private final EdgeSessionState sessionState = new EdgeSessionState(); private final EdgeSessionState sessionState = new EdgeSessionState();
private final ReentrantLock downlinkMsgLock = new ReentrantLock(); private final ReentrantLock downlinkMsgLock = new ReentrantLock();
protected EdgeContextComponent ctx; private EdgeContextComponent ctx;
protected Edge edge; private Edge edge;
protected TenantId tenantId; private TenantId tenantId;
protected StreamObserver<RequestMsg> inputStream; private Long newStartTs;
protected StreamObserver<ResponseMsg> outputStream; private Long previousStartTs;
private Long newStartSeqId;
private Long previousStartSeqId;
private Long seqIdEnd;
protected boolean connected; private StreamObserver<RequestMsg> inputStream;
protected volatile boolean syncCompleted; private StreamObserver<ResponseMsg> outputStream;
private boolean connected;
private volatile boolean syncCompleted;
private EdgeVersion edgeVersion; private EdgeVersion edgeVersion;
private int maxInboundMessageSize; private int maxInboundMessageSize;
@ -231,6 +244,52 @@ public abstract class AbstractEdgeGrpcSession<T extends AbstractEdgeGrpcSession<
sendDownlinkMsg(edgeConfigMsg); sendDownlinkMsg(edgeConfigMsg);
} }
@Override
public void startSyncProcess(boolean fullSync) {
log.info("[{}][{}][{}] Staring edge sync process", this.tenantId, edge.getId(), this.sessionId);
syncCompleted = false;
interruptGeneralProcessingOnSync();
doSync(new EdgeSyncCursor(ctx, edge, fullSync));
}
private void doSync(EdgeSyncCursor cursor) {
if (cursor.hasNext()) {
EdgeEventFetcher next = cursor.getNext();
log.info("[{}][{}] starting sync process, cursor current idx = {}, class = {}",
this.tenantId, edge.getId(), cursor.getCurrentIdx(), next.getClass().getSimpleName());
ListenableFuture<Pair<Long, Long>> future = startProcessingEdgeEvents(next);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Pair<Long, Long> result) {
doSync(cursor);
}
@Override
public void onFailure(Throwable t) {
log.error("[{}][{}] Exception during sync process", tenantId, edge.getId(), t);
}
}, ctx.getGrpcCallbackExecutorService());
} else {
log.info("[{}][{}] sync process completed", this.tenantId, edge.getId());
DownlinkMsg syncCompleteDownlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.setSyncCompletedMsg(SyncCompletedMsg.newBuilder().build())
.build();
Futures.addCallback(sendDownlinkMsgsPack(Collections.singletonList(syncCompleteDownlinkMsg)), new FutureCallback<>() {
@Override
public void onSuccess(Boolean isInterrupted) {
markSyncCompletedSendEdgeEventUpdate();
}
@Override
public void onFailure(Throwable t) {
log.error("[{}][{}] Exception during sending sync complete", tenantId, edge.getId(), t);
markSyncCompletedSendEdgeEventUpdate();
}
}, ctx.getGrpcCallbackExecutorService());
}
}
protected void processEdgeEvents(EdgeEventFetcher fetcher, PageLink pageLink, SettableFuture<Pair<Long, Long>> result) { protected void processEdgeEvents(EdgeEventFetcher fetcher, PageLink pageLink, SettableFuture<Pair<Long, Long>> result) {
try { try {
if (!highPriorityQueue.isEmpty()) { if (!highPriorityQueue.isEmpty()) {
@ -327,57 +386,11 @@ public abstract class AbstractEdgeGrpcSession<T extends AbstractEdgeGrpcSession<
ctx.getAttributesService().save(this.tenantId, this.edge.getId(), AttributeScope.SERVER_SCOPE, attributeKvEntry); ctx.getAttributesService().save(this.tenantId, this.edge.getId(), AttributeScope.SERVER_SCOPE, attributeKvEntry);
} }
@Override
public void startSyncProcess(boolean fullSync) {
log.info("[{}][{}][{}] Staring edge sync process", this.tenantId, edge.getId(), this.sessionId);
syncCompleted = false;
interruptGeneralProcessingOnSync();
doSync(new EdgeSyncCursor(ctx, edge, fullSync));
}
private void interruptGeneralProcessingOnSync() { private void interruptGeneralProcessingOnSync() {
log.debug("[{}][{}][{}] Sync process started. General processing interrupted!", this.tenantId, edge.getId(), this.sessionId); log.debug("[{}][{}][{}] Sync process started. General processing interrupted!", this.tenantId, edge.getId(), this.sessionId);
stopCurrentSendDownlinkMsgsTask(true); stopCurrentSendDownlinkMsgsTask(true);
} }
private void doSync(EdgeSyncCursor cursor) {
if (cursor.hasNext()) {
EdgeEventFetcher next = cursor.getNext();
log.info("[{}][{}] starting sync process, cursor current idx = {}, class = {}",
this.tenantId, edge.getId(), cursor.getCurrentIdx(), next.getClass().getSimpleName());
ListenableFuture<Pair<Long, Long>> future = startProcessingEdgeEvents(next);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Pair<Long, Long> result) {
doSync(cursor);
}
@Override
public void onFailure(Throwable t) {
log.error("[{}][{}] Exception during sync process", tenantId, edge.getId(), t);
}
}, ctx.getGrpcCallbackExecutorService());
} else {
log.info("[{}][{}] sync process completed", this.tenantId, edge.getId());
DownlinkMsg syncCompleteDownlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.setSyncCompletedMsg(SyncCompletedMsg.newBuilder().build())
.build();
Futures.addCallback(sendDownlinkMsgsPack(Collections.singletonList(syncCompleteDownlinkMsg)), new FutureCallback<>() {
@Override
public void onSuccess(Boolean isInterrupted) {
markSyncCompletedSendEdgeEventUpdate();
}
@Override
public void onFailure(Throwable t) {
log.error("[{}][{}] Exception during sending sync complete", tenantId, edge.getId(), t);
markSyncCompletedSendEdgeEventUpdate();
}
}, ctx.getGrpcCallbackExecutorService());
}
}
protected ListenableFuture<Boolean> sendDownlinkMsgsPack(List<DownlinkMsg> downlinkMsgsPack) { protected ListenableFuture<Boolean> sendDownlinkMsgsPack(List<DownlinkMsg> downlinkMsgsPack) {
interruptPreviousSendDownlinkMsgsTask(); interruptPreviousSendDownlinkMsgsTask();
@ -535,6 +548,68 @@ public abstract class AbstractEdgeGrpcSession<T extends AbstractEdgeGrpcSession<
} }
} }
protected ListenableFuture<Boolean> processEdgeEvents() throws Exception {
SettableFuture<Boolean> result = SettableFuture.create();
log.trace("[{}][{}] starting processing edge events", this.tenantId, this.sessionId);
if (isConnected() && isSyncCompleted()) {
Pair<Long, Long> startTsAndSeqId = getQueueStartTsAndSeqId().get();
this.previousStartTs = startTsAndSeqId.getFirst();
this.previousStartSeqId = startTsAndSeqId.getSecond();
GeneralEdgeEventFetcher fetcher = new GeneralEdgeEventFetcher(
this.previousStartTs,
this.previousStartSeqId,
this.seqIdEnd,
false,
Integer.toUnsignedLong(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount()),
ctx.getEdgeEventService());
Futures.addCallback(startProcessingEdgeEvents(fetcher), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Pair<Long, Long> newStartTsAndSeqId) {
if (newStartTsAndSeqId != null) {
ListenableFuture<List<Long>> updateFuture = updateQueueStartTsAndSeqId(newStartTsAndSeqId);
Futures.addCallback(updateFuture, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<Long> list) {
log.debug("[{}][{}] queue offset was updated [{}]", tenantId, sessionId, newStartTsAndSeqId);
if (fetcher.isSeqIdNewCycleStarted()) {
seqIdEnd = fetcher.getSeqIdEnd();
boolean newEventsAvailable = isNewEdgeEventsAvailable();
result.set(newEventsAvailable);
} else {
seqIdEnd = null;
boolean newEventsAvailable = isSeqIdStartedNewCycle();
if (!newEventsAvailable) {
newEventsAvailable = isNewEdgeEventsAvailable();
}
result.set(newEventsAvailable);
}
}
@Override
public void onFailure(Throwable t) {
log.error("[{}][{}] Failed to update queue offset [{}]", tenantId, sessionId, newStartTsAndSeqId, t);
result.setException(t);
}
}, ctx.getGrpcCallbackExecutorService());
} else {
log.trace("[{}][{}] newStartTsAndSeqId is null. Skipping iteration without db update", tenantId, sessionId);
result.set(null);
}
}
@Override
public void onFailure(Throwable t) {
log.error("[{}][{}] Failed to process events", tenantId, sessionId, t);
result.setException(t);
}
}, ctx.getGrpcCallbackExecutorService());
} else {
log.trace("[{}][{}] edge is not connected or sync is not completed. Skipping iteration", tenantId, sessionId);
result.set(null);
}
return result;
}
protected List<DownlinkMsg> convertToDownlinkMsgsPack(List<EdgeEvent> edgeEvents) { protected List<DownlinkMsg> convertToDownlinkMsgsPack(List<EdgeEvent> edgeEvents) {
List<DownlinkMsg> result = new ArrayList<>(); List<DownlinkMsg> result = new ArrayList<>();
for (EdgeEvent edgeEvent : edgeEvents) { for (EdgeEvent edgeEvent : edgeEvents) {
@ -569,6 +644,73 @@ public abstract class AbstractEdgeGrpcSession<T extends AbstractEdgeGrpcSession<
return result; return result;
} }
private ListenableFuture<Pair<Long, Long>> getQueueStartTsAndSeqId() {
ListenableFuture<List<AttributeKvEntry>> future =
ctx.getAttributesService().find(edge.getTenantId(), edge.getId(), AttributeScope.SERVER_SCOPE, Arrays.asList(QUEUE_START_TS_ATTR_KEY, QUEUE_START_SEQ_ID_ATTR_KEY));
return Futures.transform(future, attributeKvEntries -> {
long startTs = 0L;
long startSeqId = 0L;
for (AttributeKvEntry attributeKvEntry : attributeKvEntries) {
if (QUEUE_START_TS_ATTR_KEY.equals(attributeKvEntry.getKey())) {
startTs = attributeKvEntry.getLongValue().isPresent() ? attributeKvEntry.getLongValue().get() : 0L;
}
if (QUEUE_START_SEQ_ID_ATTR_KEY.equals(attributeKvEntry.getKey())) {
startSeqId = attributeKvEntry.getLongValue().isPresent() ? attributeKvEntry.getLongValue().get() : 0L;
}
}
if (startSeqId == 0L) {
startSeqId = findStartSeqIdFromOldestEventIfAny();
}
return Pair.of(startTs, startSeqId);
}, ctx.getGrpcCallbackExecutorService());
}
private boolean isSeqIdStartedNewCycle() {
try {
TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), 0, null, null, this.newStartTs, System.currentTimeMillis());
PageData<EdgeEvent> edgeEvents = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), 0L, this.previousStartSeqId == 0 ? null : this.previousStartSeqId - 1, pageLink);
return !edgeEvents.getData().isEmpty();
} catch (Exception e) {
log.error("[{}][{}][{}] Failed to execute isSeqIdStartedNewCycle", this.tenantId, edge.getId(), sessionId, e);
}
return false;
}
private boolean isNewEdgeEventsAvailable() {
try {
TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), 0, null, null, this.newStartTs, System.currentTimeMillis());
PageData<EdgeEvent> edgeEvents = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), this.newStartSeqId, null, pageLink);
return !edgeEvents.getData().isEmpty() || !highPriorityQueue.isEmpty();
} catch (Exception e) {
log.error("[{}][{}][{}] Failed to execute isNewEdgeEventsAvailable", this.tenantId, edge.getId(), sessionId, e);
}
return false;
}
private long findStartSeqIdFromOldestEventIfAny() {
long startSeqId = 0L;
try {
TimePageLink pageLink = new TimePageLink(1, 0, null, new SortOrder("createdTime"), null, null);
PageData<EdgeEvent> edgeEvents = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), null, null, pageLink);
if (!edgeEvents.getData().isEmpty()) {
startSeqId = edgeEvents.getData().get(0).getSeqId() - 1;
}
} catch (Exception e) {
log.error("[{}][{}][{}] Failed to execute findStartSeqIdFromOldestEventIfAny", this.tenantId, edge.getId(), sessionId, e);
}
return startSeqId;
}
private ListenableFuture<List<Long>> updateQueueStartTsAndSeqId(Pair<Long, Long> pair) {
this.newStartTs = pair.getFirst();
this.newStartSeqId = pair.getSecond();
log.trace("[{}] updateQueueStartTsAndSeqId [{}][{}][{}]", this.sessionId, edge.getId(), this.newStartTs, this.newStartSeqId);
List<AttributeKvEntry> attributes = Arrays.asList(
new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_TS_ATTR_KEY, this.newStartTs), System.currentTimeMillis()),
new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_SEQ_ID_ATTR_KEY, this.newStartSeqId), System.currentTimeMillis()));
return ctx.getAttributesService().save(edge.getTenantId(), edge.getId(), AttributeScope.SERVER_SCOPE, attributes);
}
protected ListenableFuture<Pair<Long, Long>> startProcessingEdgeEvents(EdgeEventFetcher fetcher) { protected ListenableFuture<Pair<Long, Long>> startProcessingEdgeEvents(EdgeEventFetcher fetcher) {
SettableFuture<Pair<Long, Long>> result = SettableFuture.create(); SettableFuture<Pair<Long, Long>> result = SettableFuture.create();
PageLink pageLink = fetcher.getPageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount()); PageLink pageLink = fetcher.getPageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount());

View File

@ -94,6 +94,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
private final Map<EdgeId, Boolean> sessionNewEvents = new HashMap<>(); private final Map<EdgeId, Boolean> sessionNewEvents = new HashMap<>();
private final ConcurrentMap<EdgeId, ScheduledFuture<?>> sessionEdgeEventChecks = new ConcurrentHashMap<>(); private final ConcurrentMap<EdgeId, ScheduledFuture<?>> sessionEdgeEventChecks = new ConcurrentHashMap<>();
private final ConcurrentMap<UUID, Consumer<FromEdgeSyncResponse>> localSyncEdgeRequests = new ConcurrentHashMap<>(); private final ConcurrentMap<UUID, Consumer<FromEdgeSyncResponse>> localSyncEdgeRequests = new ConcurrentHashMap<>();
private final ConcurrentMap<EdgeId, Boolean> edgeEventsProcessed = new ConcurrentHashMap<>();
@Value("${edges.rpc.port}") @Value("${edges.rpc.port}")
private int rpcPort; private int rpcPort;
@ -328,13 +329,18 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
long lastConnectTs = System.currentTimeMillis(); long lastConnectTs = System.currentTimeMillis();
save(tenantId, edgeId, DefaultDeviceStateService.LAST_CONNECT_TIME, lastConnectTs); save(tenantId, edgeId, DefaultDeviceStateService.LAST_CONNECT_TIME, lastConnectTs);
edgeIdServiceIdCache.put(edgeId, serviceInfoProvider.getServiceId()); edgeIdServiceIdCache.put(edgeId, serviceInfoProvider.getServiceId());
if (edgeGrpcSession instanceof KafkaEdgeGrpcSession) {
initializeKafkaConsumer((KafkaEdgeGrpcSession) edgeGrpcSession, tenantId, edgeId);
}
pushRuleEngineMessage(tenantId, edge, lastConnectTs, TbMsgType.CONNECT_EVENT); pushRuleEngineMessage(tenantId, edge, lastConnectTs, TbMsgType.CONNECT_EVENT);
cancelScheduleEdgeEventsCheck(edgeId); cancelScheduleEdgeEventsCheck(edgeId);
edgeEventsProcessed.putIfAbsent(edgeId, Boolean.FALSE);
if (edgeGrpcSession instanceof KafkaEdgeGrpcSession session) {
Boolean isChecked = edgeEventsProcessed.get(edgeId);
if (Boolean.FALSE.equals(isChecked)) {
scheduleEdgeEventsCheck(session);
}
initializeKafkaConsumer(session, tenantId, edgeId);
}
if (edgeGrpcSession instanceof PostgresEdgeGrpcSession) { if (edgeGrpcSession instanceof PostgresEdgeGrpcSession) {
scheduleEdgeEventsCheck((PostgresEdgeGrpcSession) edgeGrpcSession); scheduleEdgeEventsCheck(edgeGrpcSession);
} }
} }
@ -399,7 +405,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
} }
} }
private void scheduleEdgeEventsCheck(PostgresEdgeGrpcSession session) { private void scheduleEdgeEventsCheck(AbstractEdgeGrpcSession<?> session) {
EdgeId edgeId = session.getEdge().getId(); EdgeId edgeId = session.getEdge().getId();
UUID tenantId = session.getEdge().getTenantId().getId(); UUID tenantId = session.getEdge().getTenantId().getId();
if (sessions.containsKey(edgeId)) { if (sessions.containsKey(edgeId)) {
@ -408,7 +414,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock()); final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock());
newEventLock.lock(); newEventLock.lock();
try { try {
if (Boolean.TRUE.equals(sessionNewEvents.get(edgeId))) { if (Boolean.TRUE.equals(sessionNewEvents.get(edgeId)) && Boolean.FALSE.equals(edgeEventsProcessed.get(edgeId))) {
log.trace("[{}][{}] Set session new events flag to false", tenantId, edgeId.getId()); log.trace("[{}][{}] Set session new events flag to false", tenantId, edgeId.getId());
sessionNewEvents.put(edgeId, false); sessionNewEvents.put(edgeId, false);
Futures.addCallback(session.processEdgeEvents(), new FutureCallback<>() { Futures.addCallback(session.processEdgeEvents(), new FutureCallback<>() {
@ -417,7 +423,11 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
if (Boolean.TRUE.equals(newEventsAdded)) { if (Boolean.TRUE.equals(newEventsAdded)) {
sessionNewEvents.put(edgeId, true); sessionNewEvents.put(edgeId, true);
} }
scheduleEdgeEventsCheck(session); if (isKafkaSupported) {
edgeEventsProcessed.put(edgeId, true);
} else {
scheduleEdgeEventsCheck(session);
}
} }
@Override @Override
@ -427,7 +437,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
} }
}, ctx.getGrpcCallbackExecutorService()); }, ctx.getGrpcCallbackExecutorService());
} else { } else {
scheduleEdgeEventsCheck(session); if (!isKafkaSupported) {
scheduleEdgeEventsCheck(session);
}
} }
} finally { } finally {
newEventLock.unlock(); newEventLock.unlock();

View File

@ -15,30 +15,13 @@
*/ */
package org.thingsboard.server.service.edge.rpc; package org.thingsboard.server.service.edge.rpc;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.data.util.Pair;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.SortOrder;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.gen.edge.v1.ResponseMsg; import org.thingsboard.server.gen.edge.v1.ResponseMsg;
import org.thingsboard.server.service.edge.EdgeContextComponent; import org.thingsboard.server.service.edge.EdgeContextComponent;
import org.thingsboard.server.service.edge.rpc.fetch.GeneralEdgeEventFetcher;
import java.util.Arrays;
import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
@ -46,15 +29,6 @@ import java.util.function.BiConsumer;
@Slf4j @Slf4j
public class PostgresEdgeGrpcSession extends AbstractEdgeGrpcSession<PostgresEdgeGrpcSession> { public class PostgresEdgeGrpcSession extends AbstractEdgeGrpcSession<PostgresEdgeGrpcSession> {
private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs";
private static final String QUEUE_START_SEQ_ID_ATTR_KEY = "queueStartSeqId";
private Long newStartTs;
private Long previousStartTs;
private Long newStartSeqId;
private Long previousStartSeqId;
private Long seqIdEnd;
PostgresEdgeGrpcSession(EdgeContextComponent ctx, StreamObserver<ResponseMsg> outputStream, PostgresEdgeGrpcSession(EdgeContextComponent ctx, StreamObserver<ResponseMsg> outputStream,
BiConsumer<EdgeId, PostgresEdgeGrpcSession> sessionOpenListener, BiConsumer<EdgeId, PostgresEdgeGrpcSession> sessionOpenListener,
BiConsumer<Edge, UUID> sessionCloseListener, ScheduledExecutorService sendDownlinkExecutorService, BiConsumer<Edge, UUID> sessionCloseListener, ScheduledExecutorService sendDownlinkExecutorService,
@ -63,133 +37,4 @@ public class PostgresEdgeGrpcSession extends AbstractEdgeGrpcSession<PostgresEdg
initInputStream(); initInputStream();
} }
protected ListenableFuture<Boolean> processEdgeEvents() throws Exception {
SettableFuture<Boolean> result = SettableFuture.create();
log.trace("[{}][{}] starting processing edge events", this.tenantId, this.sessionId);
if (isConnected() && isSyncCompleted()) {
Pair<Long, Long> startTsAndSeqId = getQueueStartTsAndSeqId().get();
this.previousStartTs = startTsAndSeqId.getFirst();
this.previousStartSeqId = startTsAndSeqId.getSecond();
GeneralEdgeEventFetcher fetcher = new GeneralEdgeEventFetcher(
this.previousStartTs,
this.previousStartSeqId,
this.seqIdEnd,
false,
Integer.toUnsignedLong(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount()),
ctx.getEdgeEventService());
Futures.addCallback(startProcessingEdgeEvents(fetcher), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Pair<Long, Long> newStartTsAndSeqId) {
if (newStartTsAndSeqId != null) {
ListenableFuture<List<Long>> updateFuture = updateQueueStartTsAndSeqId(newStartTsAndSeqId);
Futures.addCallback(updateFuture, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<Long> list) {
log.debug("[{}][{}] queue offset was updated [{}]", tenantId, sessionId, newStartTsAndSeqId);
if (fetcher.isSeqIdNewCycleStarted()) {
seqIdEnd = fetcher.getSeqIdEnd();
boolean newEventsAvailable = isNewEdgeEventsAvailable();
result.set(newEventsAvailable);
} else {
seqIdEnd = null;
boolean newEventsAvailable = isSeqIdStartedNewCycle();
if (!newEventsAvailable) {
newEventsAvailable = isNewEdgeEventsAvailable();
}
result.set(newEventsAvailable);
}
}
@Override
public void onFailure(Throwable t) {
log.error("[{}][{}] Failed to update queue offset [{}]", tenantId, sessionId, newStartTsAndSeqId, t);
result.setException(t);
}
}, ctx.getGrpcCallbackExecutorService());
} else {
log.trace("[{}][{}] newStartTsAndSeqId is null. Skipping iteration without db update", tenantId, sessionId);
result.set(null);
}
}
@Override
public void onFailure(Throwable t) {
log.error("[{}][{}] Failed to process events", tenantId, sessionId, t);
result.setException(t);
}
}, ctx.getGrpcCallbackExecutorService());
} else {
log.trace("[{}][{}] edge is not connected or sync is not completed. Skipping iteration", tenantId, sessionId);
result.set(null);
}
return result;
}
private ListenableFuture<Pair<Long, Long>> getQueueStartTsAndSeqId() {
ListenableFuture<List<AttributeKvEntry>> future =
ctx.getAttributesService().find(edge.getTenantId(), edge.getId(), AttributeScope.SERVER_SCOPE, Arrays.asList(QUEUE_START_TS_ATTR_KEY, QUEUE_START_SEQ_ID_ATTR_KEY));
return Futures.transform(future, attributeKvEntries -> {
long startTs = 0L;
long startSeqId = 0L;
for (AttributeKvEntry attributeKvEntry : attributeKvEntries) {
if (QUEUE_START_TS_ATTR_KEY.equals(attributeKvEntry.getKey())) {
startTs = attributeKvEntry.getLongValue().isPresent() ? attributeKvEntry.getLongValue().get() : 0L;
}
if (QUEUE_START_SEQ_ID_ATTR_KEY.equals(attributeKvEntry.getKey())) {
startSeqId = attributeKvEntry.getLongValue().isPresent() ? attributeKvEntry.getLongValue().get() : 0L;
}
}
if (startSeqId == 0L) {
startSeqId = findStartSeqIdFromOldestEventIfAny();
}
return Pair.of(startTs, startSeqId);
}, ctx.getGrpcCallbackExecutorService());
}
private boolean isSeqIdStartedNewCycle() {
try {
TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), 0, null, null, this.newStartTs, System.currentTimeMillis());
PageData<EdgeEvent> edgeEvents = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), 0L, this.previousStartSeqId == 0 ? null : this.previousStartSeqId - 1, pageLink);
return !edgeEvents.getData().isEmpty();
} catch (Exception e) {
log.error("[{}][{}][{}] Failed to execute isSeqIdStartedNewCycle", this.tenantId, edge.getId(), sessionId, e);
}
return false;
}
private boolean isNewEdgeEventsAvailable() {
try {
TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), 0, null, null, this.newStartTs, System.currentTimeMillis());
PageData<EdgeEvent> edgeEvents = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), this.newStartSeqId, null, pageLink);
return !edgeEvents.getData().isEmpty() || !highPriorityQueue.isEmpty();
} catch (Exception e) {
log.error("[{}][{}][{}] Failed to execute isNewEdgeEventsAvailable", this.tenantId, edge.getId(), sessionId, e);
}
return false;
}
private long findStartSeqIdFromOldestEventIfAny() {
long startSeqId = 0L;
try {
TimePageLink pageLink = new TimePageLink(1, 0, null, new SortOrder("createdTime"), null, null);
PageData<EdgeEvent> edgeEvents = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), null, null, pageLink);
if (!edgeEvents.getData().isEmpty()) {
startSeqId = edgeEvents.getData().get(0).getSeqId() - 1;
}
} catch (Exception e) {
log.error("[{}][{}][{}] Failed to execute findStartSeqIdFromOldestEventIfAny", this.tenantId, edge.getId(), sessionId, e);
}
return startSeqId;
}
private ListenableFuture<List<Long>> updateQueueStartTsAndSeqId(Pair<Long, Long> pair) {
this.newStartTs = pair.getFirst();
this.newStartSeqId = pair.getSecond();
log.trace("[{}] updateQueueStartTsAndSeqId [{}][{}][{}]", this.sessionId, edge.getId(), this.newStartTs, this.newStartSeqId);
List<AttributeKvEntry> attributes = Arrays.asList(
new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_TS_ATTR_KEY, this.newStartTs), System.currentTimeMillis()),
new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_SEQ_ID_ATTR_KEY, this.newStartSeqId), System.currentTimeMillis()));
return ctx.getAttributesService().save(edge.getTenantId(), edge.getId(), AttributeScope.SERVER_SCOPE, attributes);
}
} }

View File

@ -22,7 +22,7 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.common.data.page.PageDataIterable;
@ -85,19 +85,19 @@ public class KafkaEdgeTopicsCleanUpService {
return; return;
} }
PageDataIterable<Edge> edges = new PageDataIterable<>(link -> edgeService.findEdgesByTenantId(tenantId, link), 1024); PageDataIterable<EdgeId> edgeIds = new PageDataIterable<>(link -> edgeService.findEdgeIdsByTenantId(tenantId, link), 1024);
long currentTimeMillis = System.currentTimeMillis(); long currentTimeMillis = System.currentTimeMillis();
for (Edge edge : edges) { for (EdgeId edgeId : edgeIds) {
Optional<AttributeKvEntry> attributeOpt = attributesService.find(tenantId, edge.getId(), AttributeScope.SERVER_SCOPE, DefaultDeviceStateService.LAST_CONNECT_TIME).get(); Optional<AttributeKvEntry> attributeOpt = attributesService.find(tenantId, edgeId, AttributeScope.SERVER_SCOPE, DefaultDeviceStateService.LAST_CONNECT_TIME).get();
if (attributeOpt.isPresent()) { if (attributeOpt.isPresent()) {
Optional<Long> lastConnectTimeOpt = attributeOpt.get().getLongValue(); Optional<Long> lastConnectTimeOpt = attributeOpt.get().getLongValue();
if (lastConnectTimeOpt.isPresent() && isTopicExpired(lastConnectTimeOpt.get(), currentTimeMillis)) { if (lastConnectTimeOpt.isPresent() && isTopicExpired(lastConnectTimeOpt.get(), currentTimeMillis)) {
String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edge.getId()).getTopic(); String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic();
TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs()); TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs());
if (kafkaAdmin.isTopicEmpty(topic)) { if (kafkaAdmin.isTopicEmpty(topic)) {
kafkaAdmin.deleteTopic(topic); kafkaAdmin.deleteTopic(topic);
log.info("Removed outdated topic for tenant {} and edge {} older than {}", tenantId, edge.getName(), Date.from(Instant.ofEpochMilli(currentTimeMillis - ONE_MONTH_MILLIS))); log.info("Removed outdated topic for tenant {} and edge with id {} older than {}", tenantId, edgeId, Date.from(Instant.ofEpochMilli(currentTimeMillis - ONE_MONTH_MILLIS)));
} }
} }
} }

View File

@ -32,7 +32,6 @@ import org.thingsboard.server.dao.entity.EntityDaoService;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.UUID;
public interface EdgeService extends EntityDaoService { public interface EdgeService extends EntityDaoService {
@ -56,6 +55,8 @@ public interface EdgeService extends EntityDaoService {
void deleteEdge(TenantId tenantId, EdgeId edgeId); void deleteEdge(TenantId tenantId, EdgeId edgeId);
PageData<EdgeId> findEdgeIdsByTenantId(TenantId tenantId, PageLink pageLink);
PageData<Edge> findEdgesByTenantId(TenantId tenantId, PageLink pageLink); PageData<Edge> findEdgesByTenantId(TenantId tenantId, PageLink pageLink);
PageData<Edge> findEdgesByTenantIdAndType(TenantId tenantId, String type, PageLink pageLink); PageData<Edge> findEdgesByTenantIdAndType(TenantId tenantId, String type, PageLink pageLink);

View File

@ -40,6 +40,8 @@ public interface EdgeDao extends Dao<Edge> {
EdgeInfo findEdgeInfoById(TenantId tenantId, UUID edgeId); EdgeInfo findEdgeInfoById(TenantId tenantId, UUID edgeId);
PageData<EdgeId> findEdgeIdsByTenantId(UUID tenantId, PageLink pageLink);
PageData<Edge> findEdgesByTenantId(UUID tenantId, PageLink pageLink); PageData<Edge> findEdgesByTenantId(UUID tenantId, PageLink pageLink);
PageData<Edge> findEdgesByTenantIdAndType(UUID tenantId, String type, PageLink pageLink); PageData<Edge> findEdgesByTenantIdAndType(UUID tenantId, String type, PageLink pageLink);

View File

@ -261,6 +261,14 @@ public class EdgeServiceImpl extends AbstractCachedEntityService<EdgeCacheKey, E
deleteEdge(tenantId, (EdgeId) id); deleteEdge(tenantId, (EdgeId) id);
} }
@Override
public PageData<EdgeId> findEdgeIdsByTenantId(TenantId tenantId, PageLink pageLink) {
log.trace("Executing findEdgeIdsByTenantId, tenantId [{}], pageLink [{}]", tenantId, pageLink);
validateId(tenantId, id -> INCORRECT_TENANT_ID + id);
validatePageLink(pageLink);
return edgeDao.findEdgeIdsByTenantId(tenantId.getId(), pageLink);
}
@Override @Override
public PageData<Edge> findEdgesByTenantId(TenantId tenantId, PageLink pageLink) { public PageData<Edge> findEdgesByTenantId(TenantId tenantId, PageLink pageLink) {
log.trace("Executing findEdgesByTenantId, tenantId [{}], pageLink [{}]", tenantId, pageLink); log.trace("Executing findEdgesByTenantId, tenantId [{}], pageLink [{}]", tenantId, pageLink);

View File

@ -42,6 +42,12 @@ public interface EdgeRepository extends JpaRepository<EdgeEntity, UUID> {
"WHERE d.id = :edgeId") "WHERE d.id = :edgeId")
EdgeInfoEntity findEdgeInfoById(@Param("edgeId") UUID edgeId); EdgeInfoEntity findEdgeInfoById(@Param("edgeId") UUID edgeId);
@Query("SELECT d.id FROM EdgeEntity d WHERE d.tenantId = :tenantId " +
"AND (:textSearch IS NULL OR ilike(d.name, CONCAT('%', :textSearch, '%')) = true)")
Page<UUID> findIdsByTenantId(@Param("tenantId") UUID tenantId,
@Param("textSearch") String textSearch,
Pageable pageable);
@Query("SELECT d FROM EdgeEntity d WHERE d.tenantId = :tenantId " + @Query("SELECT d FROM EdgeEntity d WHERE d.tenantId = :tenantId " +
"AND (:textSearch IS NULL OR ilike(d.name, CONCAT('%', :textSearch, '%')) = true)") "AND (:textSearch IS NULL OR ilike(d.name, CONCAT('%', :textSearch, '%')) = true)")
Page<EdgeEntity> findByTenantId(@Param("tenantId") UUID tenantId, Page<EdgeEntity> findByTenantId(@Param("tenantId") UUID tenantId,
@ -93,9 +99,9 @@ public interface EdgeRepository extends JpaRepository<EdgeEntity, UUID> {
"AND a.customerId = :customerId " + "AND a.customerId = :customerId " +
"AND (:textSearch IS NULL OR ilike(a.name, CONCAT('%', :textSearch, '%')) = true)") "AND (:textSearch IS NULL OR ilike(a.name, CONCAT('%', :textSearch, '%')) = true)")
Page<EdgeInfoEntity> findEdgeInfosByTenantIdAndCustomerId(@Param("tenantId") UUID tenantId, Page<EdgeInfoEntity> findEdgeInfosByTenantIdAndCustomerId(@Param("tenantId") UUID tenantId,
@Param("customerId") UUID customerId, @Param("customerId") UUID customerId,
@Param("textSearch") String textSearch, @Param("textSearch") String textSearch,
Pageable pageable); Pageable pageable);
@Query("SELECT new org.thingsboard.server.dao.model.sql.EdgeInfoEntity(a, c.title, c.additionalInfo) " + @Query("SELECT new org.thingsboard.server.dao.model.sql.EdgeInfoEntity(a, c.title, c.additionalInfo) " +
"FROM EdgeEntity a " + "FROM EdgeEntity a " +
@ -105,10 +111,10 @@ public interface EdgeRepository extends JpaRepository<EdgeEntity, UUID> {
"AND a.type = :type " + "AND a.type = :type " +
"AND (:textSearch IS NULL OR ilike(a.name, CONCAT('%', :textSearch, '%')) = true)") "AND (:textSearch IS NULL OR ilike(a.name, CONCAT('%', :textSearch, '%')) = true)")
Page<EdgeInfoEntity> findEdgeInfosByTenantIdAndCustomerIdAndType(@Param("tenantId") UUID tenantId, Page<EdgeInfoEntity> findEdgeInfosByTenantIdAndCustomerIdAndType(@Param("tenantId") UUID tenantId,
@Param("customerId") UUID customerId, @Param("customerId") UUID customerId,
@Param("type") String type, @Param("type") String type,
@Param("textSearch") String textSearch, @Param("textSearch") String textSearch,
Pageable pageable); Pageable pageable);
@Query("SELECT ee FROM EdgeEntity ee, RelationEntity re WHERE ee.tenantId = :tenantId " + @Query("SELECT ee FROM EdgeEntity ee, RelationEntity re WHERE ee.tenantId = :tenantId " +
"AND ee.id = re.fromId AND re.fromType = 'EDGE' AND re.relationTypeGroup = 'EDGE' " + "AND ee.id = re.fromId AND re.fromType = 'EDGE' AND re.relationTypeGroup = 'EDGE' " +
@ -125,10 +131,10 @@ public interface EdgeRepository extends JpaRepository<EdgeEntity, UUID> {
"AND re.relationType = 'Contains' AND re.toId = :entityId AND re.toType = :entityType " + "AND re.relationType = 'Contains' AND re.toId = :entityId AND re.toType = :entityType " +
"AND (:textSearch IS NULL OR ilike(ee.name, CONCAT('%', :textSearch, '%')) = true)") "AND (:textSearch IS NULL OR ilike(ee.name, CONCAT('%', :textSearch, '%')) = true)")
Page<UUID> findIdsByTenantIdAndEntityId(@Param("tenantId") UUID tenantId, Page<UUID> findIdsByTenantIdAndEntityId(@Param("tenantId") UUID tenantId,
@Param("entityId") UUID entityId, @Param("entityId") UUID entityId,
@Param("entityType") String entityType, @Param("entityType") String entityType,
@Param("textSearch") String textSearch, @Param("textSearch") String textSearch,
Pageable pageable); Pageable pageable);
@Query("SELECT ee FROM EdgeEntity ee, TenantEntity te WHERE ee.tenantId = te.id AND te.tenantProfileId = :tenantProfileId ") @Query("SELECT ee FROM EdgeEntity ee, TenantEntity te WHERE ee.tenantId = te.id AND te.tenantProfileId = :tenantProfileId ")
Page<EdgeEntity> findByTenantProfileId(@Param("tenantProfileId") UUID tenantProfileId, Page<EdgeEntity> findByTenantProfileId(@Param("tenantProfileId") UUID tenantProfileId,

View File

@ -64,6 +64,15 @@ public class JpaEdgeDao extends JpaAbstractDao<EdgeEntity, Edge> implements Edge
return DaoUtil.getData(edgeRepository.findEdgeInfoById(edgeId)); return DaoUtil.getData(edgeRepository.findEdgeInfoById(edgeId));
} }
@Override
public PageData<EdgeId> findEdgeIdsByTenantId(UUID tenantId, PageLink pageLink) {
return DaoUtil.pageToPageData(
edgeRepository.findIdsByTenantId(
tenantId,
pageLink.getTextSearch(),
DaoUtil.toPageable(pageLink))).mapData(EdgeId::fromUUID);
}
@Override @Override
public PageData<Edge> findEdgesByTenantId(UUID tenantId, PageLink pageLink) { public PageData<Edge> findEdgesByTenantId(UUID tenantId, PageLink pageLink) {
return DaoUtil.toPageData( return DaoUtil.toPageData(