Merge remote-tracking branch 'origin/develop/3.5.2' into develop/3.6
This commit is contained in:
commit
d17c7dbc28
@ -53,6 +53,69 @@ $$;
|
|||||||
|
|
||||||
-- NOTIFICATION CONFIGS VERSION CONTROL END
|
-- NOTIFICATION CONFIGS VERSION CONTROL END
|
||||||
|
|
||||||
|
-- EDGE EVENTS MIGRATION START
|
||||||
|
DO
|
||||||
|
$$
|
||||||
|
DECLARE table_partition RECORD;
|
||||||
|
BEGIN
|
||||||
|
-- in case of running the upgrade script a second time:
|
||||||
|
IF NOT (SELECT exists(SELECT FROM pg_tables WHERE tablename = 'old_edge_event')) THEN
|
||||||
|
ALTER TABLE edge_event RENAME TO old_edge_event;
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_old_edge_event_created_time_tmp ON old_edge_event(created_time);
|
||||||
|
ALTER INDEX IF EXISTS idx_edge_event_tenant_id_and_created_time RENAME TO idx_old_edge_event_tenant_id_and_created_time;
|
||||||
|
|
||||||
|
FOR table_partition IN SELECT tablename AS name, split_part(tablename, '_', 3) AS partition_ts
|
||||||
|
FROM pg_tables WHERE tablename LIKE 'edge_event_%'
|
||||||
|
LOOP
|
||||||
|
EXECUTE format('ALTER TABLE %s RENAME TO old_edge_event_%s', table_partition.name, table_partition.partition_ts);
|
||||||
|
END LOOP;
|
||||||
|
ELSE
|
||||||
|
RAISE NOTICE 'Table old_edge_event already exists, leaving as is';
|
||||||
|
END IF;
|
||||||
|
END;
|
||||||
|
$$;
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS edge_event (
|
||||||
|
seq_id INT GENERATED ALWAYS AS IDENTITY,
|
||||||
|
id uuid NOT NULL,
|
||||||
|
created_time bigint NOT NULL,
|
||||||
|
edge_id uuid,
|
||||||
|
edge_event_type varchar(255),
|
||||||
|
edge_event_uid varchar(255),
|
||||||
|
entity_id uuid,
|
||||||
|
edge_event_action varchar(255),
|
||||||
|
body varchar(10000000),
|
||||||
|
tenant_id uuid,
|
||||||
|
ts bigint NOT NULL
|
||||||
|
) PARTITION BY RANGE (created_time);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_edge_event_tenant_id_and_created_time ON edge_event(tenant_id, created_time DESC);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_edge_event_id ON edge_event(id);
|
||||||
|
ALTER TABLE IF EXISTS edge_event ALTER COLUMN seq_id SET CYCLE;
|
||||||
|
|
||||||
|
CREATE OR REPLACE PROCEDURE migrate_edge_event(IN start_time_ms BIGINT, IN end_time_ms BIGINT, IN partition_size_ms BIGINT)
|
||||||
|
LANGUAGE plpgsql AS
|
||||||
|
$$
|
||||||
|
DECLARE
|
||||||
|
p RECORD;
|
||||||
|
partition_end_ts BIGINT;
|
||||||
|
BEGIN
|
||||||
|
FOR p IN SELECT DISTINCT (created_time - created_time % partition_size_ms) AS partition_ts FROM old_edge_event
|
||||||
|
WHERE created_time >= start_time_ms AND created_time < end_time_ms
|
||||||
|
LOOP
|
||||||
|
partition_end_ts = p.partition_ts + partition_size_ms;
|
||||||
|
RAISE NOTICE '[edge_event] Partition to create : [%-%]', p.partition_ts, partition_end_ts;
|
||||||
|
EXECUTE format('CREATE TABLE IF NOT EXISTS edge_event_%s PARTITION OF edge_event ' ||
|
||||||
|
'FOR VALUES FROM ( %s ) TO ( %s )', p.partition_ts, p.partition_ts, partition_end_ts);
|
||||||
|
END LOOP;
|
||||||
|
|
||||||
|
INSERT INTO edge_event (id, created_time, edge_id, edge_event_type, edge_event_uid, entity_id, edge_event_action, body, tenant_id, ts)
|
||||||
|
SELECT id, created_time, edge_id, edge_event_type, edge_event_uid, entity_id, edge_event_action, body, tenant_id, ts
|
||||||
|
FROM old_edge_event
|
||||||
|
WHERE created_time >= start_time_ms AND created_time < end_time_ms;
|
||||||
|
END;
|
||||||
|
$$;
|
||||||
|
-- EDGE EVENTS MIGRATION END
|
||||||
|
|
||||||
ALTER TABLE resource
|
ALTER TABLE resource
|
||||||
ADD COLUMN IF NOT EXISTS etag varchar;
|
ADD COLUMN IF NOT EXISTS etag varchar;
|
||||||
|
|
||||||
|
|||||||
@ -73,10 +73,14 @@ public class AppActor extends ContextAwareActor {
|
|||||||
@Override
|
@Override
|
||||||
protected boolean doProcess(TbActorMsg msg) {
|
protected boolean doProcess(TbActorMsg msg) {
|
||||||
if (!ruleChainsInitialized) {
|
if (!ruleChainsInitialized) {
|
||||||
initTenantActors();
|
if (MsgType.APP_INIT_MSG.equals(msg.getMsgType())) {
|
||||||
ruleChainsInitialized = true;
|
initTenantActors();
|
||||||
if (msg.getMsgType() != MsgType.APP_INIT_MSG && msg.getMsgType() != MsgType.PARTITION_CHANGE_MSG) {
|
ruleChainsInitialized = true;
|
||||||
log.warn("Rule Chains initialized by unexpected message: {}", msg);
|
} else {
|
||||||
|
if (!msg.getMsgType().isIgnoreOnStart()) {
|
||||||
|
log.warn("Attempt to initialize Rule Chains by unexpected message: {}", msg);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
switch (msg.getMsgType()) {
|
switch (msg.getMsgType()) {
|
||||||
|
|||||||
@ -85,6 +85,6 @@ public class EdgeEventController extends BaseController {
|
|||||||
EdgeId edgeId = new EdgeId(toUUID(strEdgeId));
|
EdgeId edgeId = new EdgeId(toUUID(strEdgeId));
|
||||||
checkEdgeId(edgeId, Operation.READ);
|
checkEdgeId(edgeId, Operation.READ);
|
||||||
TimePageLink pageLink = createTimePageLink(pageSize, page, textSearch, sortProperty, sortOrder, startTime, endTime);
|
TimePageLink pageLink = createTimePageLink(pageSize, page, textSearch, sortProperty, sortOrder, startTime, endTime);
|
||||||
return checkNotNull(edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink, false));
|
return checkNotNull(edgeEventService.findEdgeEvents(tenantId, edgeId, 0L, null, pageLink));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -341,7 +341,10 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
|
|||||||
sessionNewEvents.put(edgeId, false);
|
sessionNewEvents.put(edgeId, false);
|
||||||
Futures.addCallback(session.processEdgeEvents(), new FutureCallback<>() {
|
Futures.addCallback(session.processEdgeEvents(), new FutureCallback<>() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(Void result) {
|
public void onSuccess(Boolean newEventsAdded) {
|
||||||
|
if (Boolean.TRUE.equals(newEventsAdded)) {
|
||||||
|
sessionNewEvents.put(edgeId, true);
|
||||||
|
}
|
||||||
scheduleEdgeEventsCheck(session);
|
scheduleEdgeEventsCheck(session);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -24,6 +24,7 @@ import io.grpc.stub.StreamObserver;
|
|||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||||
|
import org.springframework.data.util.Pair;
|
||||||
import org.thingsboard.server.common.data.DataConstants;
|
import org.thingsboard.server.common.data.DataConstants;
|
||||||
import org.thingsboard.server.common.data.EdgeUtils;
|
import org.thingsboard.server.common.data.EdgeUtils;
|
||||||
import org.thingsboard.server.common.data.edge.Edge;
|
import org.thingsboard.server.common.data.edge.Edge;
|
||||||
@ -35,6 +36,8 @@ import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
|
|||||||
import org.thingsboard.server.common.data.kv.LongDataEntry;
|
import org.thingsboard.server.common.data.kv.LongDataEntry;
|
||||||
import org.thingsboard.server.common.data.page.PageData;
|
import org.thingsboard.server.common.data.page.PageData;
|
||||||
import org.thingsboard.server.common.data.page.PageLink;
|
import org.thingsboard.server.common.data.page.PageLink;
|
||||||
|
import org.thingsboard.server.common.data.page.SortOrder;
|
||||||
|
import org.thingsboard.server.common.data.page.TimePageLink;
|
||||||
import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg;
|
import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg;
|
||||||
import org.thingsboard.server.gen.edge.v1.AttributesRequestMsg;
|
import org.thingsboard.server.gen.edge.v1.AttributesRequestMsg;
|
||||||
import org.thingsboard.server.gen.edge.v1.ConnectRequestMsg;
|
import org.thingsboard.server.gen.edge.v1.ConnectRequestMsg;
|
||||||
@ -68,17 +71,15 @@ import org.thingsboard.server.service.edge.rpc.fetch.GeneralEdgeEventFetcher;
|
|||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
import java.util.function.BiConsumer;
|
import java.util.function.BiConsumer;
|
||||||
import java.util.function.Consumer;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Data
|
@Data
|
||||||
@ -89,6 +90,7 @@ public final class EdgeGrpcSession implements Closeable {
|
|||||||
private static final int MAX_DOWNLINK_ATTEMPTS = 10; // max number of attemps to send downlink message if edge connected
|
private static final int MAX_DOWNLINK_ATTEMPTS = 10; // max number of attemps to send downlink message if edge connected
|
||||||
|
|
||||||
private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs";
|
private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs";
|
||||||
|
private static final String QUEUE_START_SEQ_ID_ATTR_KEY = "queueStartSeqId";
|
||||||
|
|
||||||
private final UUID sessionId;
|
private final UUID sessionId;
|
||||||
private final BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener;
|
private final BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener;
|
||||||
@ -103,6 +105,12 @@ public final class EdgeGrpcSession implements Closeable {
|
|||||||
private boolean connected;
|
private boolean connected;
|
||||||
private boolean syncCompleted;
|
private boolean syncCompleted;
|
||||||
|
|
||||||
|
private Long newStartTs;
|
||||||
|
private Long previousStartTs;
|
||||||
|
private Long newStartSeqId;
|
||||||
|
private Long previousStartSeqId;
|
||||||
|
private Long seqIdEnd;
|
||||||
|
|
||||||
private EdgeVersion edgeVersion;
|
private EdgeVersion edgeVersion;
|
||||||
|
|
||||||
private int maxInboundMessageSize;
|
private int maxInboundMessageSize;
|
||||||
@ -204,10 +212,10 @@ public final class EdgeGrpcSession implements Closeable {
|
|||||||
EdgeEventFetcher next = cursor.getNext();
|
EdgeEventFetcher next = cursor.getNext();
|
||||||
log.info("[{}][{}] starting sync process, cursor current idx = {}, class = {}",
|
log.info("[{}][{}] starting sync process, cursor current idx = {}, class = {}",
|
||||||
edge.getTenantId(), edge.getId(), cursor.getCurrentIdx(), next.getClass().getSimpleName());
|
edge.getTenantId(), edge.getId(), cursor.getCurrentIdx(), next.getClass().getSimpleName());
|
||||||
ListenableFuture<UUID> uuidListenableFuture = startProcessingEdgeEvents(next);
|
ListenableFuture<Pair<Long, Long>> future = startProcessingEdgeEvents(next);
|
||||||
Futures.addCallback(uuidListenableFuture, new FutureCallback<>() {
|
Futures.addCallback(future, new FutureCallback<>() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(@Nullable UUID result) {
|
public void onSuccess(@Nullable Pair<Long, Long> result) {
|
||||||
doSync(cursor);
|
doSync(cursor);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -307,36 +315,51 @@ public final class EdgeGrpcSession implements Closeable {
|
|||||||
sendDownlinkMsg(edgeConfigMsg);
|
sendDownlinkMsg(edgeConfigMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
ListenableFuture<Void> processEdgeEvents() throws Exception {
|
ListenableFuture<Boolean> processEdgeEvents() throws Exception {
|
||||||
SettableFuture<Void> result = SettableFuture.create();
|
SettableFuture<Boolean> result = SettableFuture.create();
|
||||||
log.trace("[{}] starting processing edge events", this.sessionId);
|
log.trace("[{}] starting processing edge events", this.sessionId);
|
||||||
if (isConnected() && isSyncCompleted()) {
|
if (isConnected() && isSyncCompleted()) {
|
||||||
Long queueStartTs = getQueueStartTs().get();
|
Pair<Long, Long> startTsAndSeqId = getQueueStartTsAndSeqId().get();
|
||||||
|
this.previousStartTs = startTsAndSeqId.getFirst();
|
||||||
|
this.previousStartSeqId = startTsAndSeqId.getSecond();
|
||||||
GeneralEdgeEventFetcher fetcher = new GeneralEdgeEventFetcher(
|
GeneralEdgeEventFetcher fetcher = new GeneralEdgeEventFetcher(
|
||||||
queueStartTs,
|
this.previousStartTs,
|
||||||
|
this.previousStartSeqId,
|
||||||
|
this.seqIdEnd,
|
||||||
|
false,
|
||||||
|
Integer.toUnsignedLong(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount()),
|
||||||
ctx.getEdgeEventService());
|
ctx.getEdgeEventService());
|
||||||
ListenableFuture<UUID> ifOffsetFuture = startProcessingEdgeEvents(fetcher);
|
Futures.addCallback(startProcessingEdgeEvents(fetcher), new FutureCallback<>() {
|
||||||
Futures.addCallback(ifOffsetFuture, new FutureCallback<>() {
|
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(@Nullable UUID ifOffset) {
|
public void onSuccess(@Nullable Pair<Long, Long> newStartTsAndSeqId) {
|
||||||
if (ifOffset != null) {
|
if (newStartTsAndSeqId != null) {
|
||||||
Long newStartTs = Uuids.unixTimestamp(ifOffset);
|
ListenableFuture<List<String>> updateFuture = updateQueueStartTsAndSeqId(newStartTsAndSeqId);
|
||||||
ListenableFuture<List<String>> updateFuture = updateQueueStartTs(newStartTs);
|
|
||||||
Futures.addCallback(updateFuture, new FutureCallback<>() {
|
Futures.addCallback(updateFuture, new FutureCallback<>() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(@Nullable List<String> list) {
|
public void onSuccess(@Nullable List<String> list) {
|
||||||
log.debug("[{}] queue offset was updated [{}][{}]", sessionId, ifOffset, newStartTs);
|
log.debug("[{}] queue offset was updated [{}]", sessionId, newStartTsAndSeqId);
|
||||||
result.set(null);
|
if (fetcher.isSeqIdNewCycleStarted()) {
|
||||||
|
seqIdEnd = fetcher.getSeqIdEnd();
|
||||||
|
boolean newEventsAvailable = isNewEdgeEventsAvailable();
|
||||||
|
result.set(newEventsAvailable);
|
||||||
|
} else {
|
||||||
|
seqIdEnd = null;
|
||||||
|
boolean newEventsAvailable = isSeqIdStartedNewCycle();
|
||||||
|
if (!newEventsAvailable) {
|
||||||
|
newEventsAvailable = isNewEdgeEventsAvailable();
|
||||||
|
}
|
||||||
|
result.set(newEventsAvailable);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable t) {
|
public void onFailure(Throwable t) {
|
||||||
log.error("[{}] Failed to update queue offset [{}]", sessionId, ifOffset, t);
|
log.error("[{}] Failed to update queue offset [{}]", sessionId, newStartTsAndSeqId, t);
|
||||||
result.setException(t);
|
result.setException(t);
|
||||||
}
|
}
|
||||||
}, ctx.getGrpcCallbackExecutorService());
|
}, ctx.getGrpcCallbackExecutorService());
|
||||||
} else {
|
} else {
|
||||||
log.trace("[{}] ifOffset is null. Skipping iteration without db update", sessionId);
|
log.trace("[{}] newStartTsAndSeqId is null. Skipping iteration without db update", sessionId);
|
||||||
result.set(null);
|
result.set(null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -354,14 +377,14 @@ public final class EdgeGrpcSession implements Closeable {
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
private ListenableFuture<UUID> startProcessingEdgeEvents(EdgeEventFetcher fetcher) {
|
private ListenableFuture<Pair<Long, Long>> startProcessingEdgeEvents(EdgeEventFetcher fetcher) {
|
||||||
SettableFuture<UUID> result = SettableFuture.create();
|
SettableFuture<Pair<Long, Long>> result = SettableFuture.create();
|
||||||
PageLink pageLink = fetcher.getPageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount());
|
PageLink pageLink = fetcher.getPageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount());
|
||||||
processEdgeEvents(fetcher, pageLink, result);
|
processEdgeEvents(fetcher, pageLink, result);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processEdgeEvents(EdgeEventFetcher fetcher, PageLink pageLink, SettableFuture<UUID> result) {
|
private void processEdgeEvents(EdgeEventFetcher fetcher, PageLink pageLink, SettableFuture<Pair<Long, Long>> result) {
|
||||||
try {
|
try {
|
||||||
PageData<EdgeEvent> pageData = fetcher.fetchEdgeEvents(edge.getTenantId(), edge, pageLink);
|
PageData<EdgeEvent> pageData = fetcher.fetchEdgeEvents(edge.getTenantId(), edge, pageLink);
|
||||||
if (isConnected() && !pageData.getData().isEmpty()) {
|
if (isConnected() && !pageData.getData().isEmpty()) {
|
||||||
@ -377,8 +400,15 @@ public final class EdgeGrpcSession implements Closeable {
|
|||||||
if (isConnected() && pageData.hasNext()) {
|
if (isConnected() && pageData.hasNext()) {
|
||||||
processEdgeEvents(fetcher, pageLink.nextPageLink(), result);
|
processEdgeEvents(fetcher, pageLink.nextPageLink(), result);
|
||||||
} else {
|
} else {
|
||||||
UUID ifOffset = pageData.getData().get(pageData.getData().size() - 1).getUuidId();
|
EdgeEvent latestEdgeEvent = pageData.getData().get(pageData.getData().size() - 1);
|
||||||
result.set(ifOffset);
|
UUID idOffset = latestEdgeEvent.getUuidId();
|
||||||
|
if (idOffset != null) {
|
||||||
|
Long newStartTs = Uuids.unixTimestamp(idOffset);
|
||||||
|
long newStartSeqId = latestEdgeEvent.getSeqId();
|
||||||
|
result.set(Pair.of(newStartTs, newStartSeqId));
|
||||||
|
} else {
|
||||||
|
result.set(null);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -461,69 +491,113 @@ public final class EdgeGrpcSession implements Closeable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private DownlinkMsg convertToDownlinkMsg(EdgeEvent edgeEvent) {
|
|
||||||
log.trace("[{}][{}] converting edge event to downlink msg [{}]", edge.getTenantId(), this.sessionId, edgeEvent);
|
|
||||||
DownlinkMsg downlinkMsg = null;
|
|
||||||
try {
|
|
||||||
switch (edgeEvent.getAction()) {
|
|
||||||
case UPDATED:
|
|
||||||
case ADDED:
|
|
||||||
case DELETED:
|
|
||||||
case ASSIGNED_TO_EDGE:
|
|
||||||
case UNASSIGNED_FROM_EDGE:
|
|
||||||
case ALARM_ACK:
|
|
||||||
case ALARM_CLEAR:
|
|
||||||
case CREDENTIALS_UPDATED:
|
|
||||||
case RELATION_ADD_OR_UPDATE:
|
|
||||||
case RELATION_DELETED:
|
|
||||||
case ASSIGNED_TO_CUSTOMER:
|
|
||||||
case UNASSIGNED_FROM_CUSTOMER:
|
|
||||||
case CREDENTIALS_REQUEST:
|
|
||||||
case RPC_CALL:
|
|
||||||
downlinkMsg = convertEntityEventToDownlink(edgeEvent);
|
|
||||||
log.trace("[{}][{}] entity message processed [{}]", edgeEvent.getTenantId(), this.sessionId, downlinkMsg);
|
|
||||||
break;
|
|
||||||
case ATTRIBUTES_UPDATED:
|
|
||||||
case POST_ATTRIBUTES:
|
|
||||||
case ATTRIBUTES_DELETED:
|
|
||||||
case TIMESERIES_UPDATED:
|
|
||||||
downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edgeEvent);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
log.warn("[{}][{}] Unsupported action type [{}]", edge.getTenantId(), this.sessionId, edgeEvent.getAction());
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("[{}][{}] Exception during converting edge event to downlink msg", edge.getTenantId(), this.sessionId, e);
|
|
||||||
}
|
|
||||||
return downlinkMsg;
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<DownlinkMsg> convertToDownlinkMsgsPack(List<EdgeEvent> edgeEvents) {
|
private List<DownlinkMsg> convertToDownlinkMsgsPack(List<EdgeEvent> edgeEvents) {
|
||||||
return edgeEvents
|
List<DownlinkMsg> result = new ArrayList<>();
|
||||||
.stream()
|
for (EdgeEvent edgeEvent : edgeEvents) {
|
||||||
.map(this::convertToDownlinkMsg)
|
log.trace("[{}][{}] converting edge event to downlink msg [{}]", edge.getTenantId(), this.sessionId, edgeEvent);
|
||||||
.filter(Objects::nonNull)
|
DownlinkMsg downlinkMsg = null;
|
||||||
.collect(Collectors.toList());
|
try {
|
||||||
|
switch (edgeEvent.getAction()) {
|
||||||
|
case UPDATED:
|
||||||
|
case ADDED:
|
||||||
|
case DELETED:
|
||||||
|
case ASSIGNED_TO_EDGE:
|
||||||
|
case UNASSIGNED_FROM_EDGE:
|
||||||
|
case ALARM_ACK:
|
||||||
|
case ALARM_CLEAR:
|
||||||
|
case CREDENTIALS_UPDATED:
|
||||||
|
case RELATION_ADD_OR_UPDATE:
|
||||||
|
case RELATION_DELETED:
|
||||||
|
case CREDENTIALS_REQUEST:
|
||||||
|
case RPC_CALL:
|
||||||
|
case ASSIGNED_TO_CUSTOMER:
|
||||||
|
case UNASSIGNED_FROM_CUSTOMER:
|
||||||
|
downlinkMsg = convertEntityEventToDownlink(edgeEvent);
|
||||||
|
log.trace("[{}][{}] entity message processed [{}]", edgeEvent.getTenantId(), this.sessionId, downlinkMsg);
|
||||||
|
break;
|
||||||
|
case ATTRIBUTES_UPDATED:
|
||||||
|
case POST_ATTRIBUTES:
|
||||||
|
case ATTRIBUTES_DELETED:
|
||||||
|
case TIMESERIES_UPDATED:
|
||||||
|
downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edgeEvent);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
log.warn("[{}][{}] Unsupported action type [{}]", edge.getTenantId(), this.sessionId, edgeEvent.getAction());
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[{}][{}] Exception during converting edge event to downlink msg", edge.getTenantId(), this.sessionId, e);
|
||||||
|
}
|
||||||
|
if (downlinkMsg != null) {
|
||||||
|
result.add(downlinkMsg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
private ListenableFuture<Long> getQueueStartTs() {
|
private ListenableFuture<Pair<Long, Long>> getQueueStartTsAndSeqId() {
|
||||||
ListenableFuture<Optional<AttributeKvEntry>> future =
|
ListenableFuture<List<AttributeKvEntry>> future =
|
||||||
ctx.getAttributesService().find(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, QUEUE_START_TS_ATTR_KEY);
|
ctx.getAttributesService().find(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, Arrays.asList(QUEUE_START_TS_ATTR_KEY, QUEUE_START_SEQ_ID_ATTR_KEY));
|
||||||
return Futures.transform(future, attributeKvEntryOpt -> {
|
return Futures.transform(future, attributeKvEntries -> {
|
||||||
if (attributeKvEntryOpt != null && attributeKvEntryOpt.isPresent()) {
|
long startTs = 0L;
|
||||||
AttributeKvEntry attributeKvEntry = attributeKvEntryOpt.get();
|
long startSeqId = 0L;
|
||||||
return attributeKvEntry.getLongValue().isPresent() ? attributeKvEntry.getLongValue().get() : 0L;
|
for (AttributeKvEntry attributeKvEntry : attributeKvEntries) {
|
||||||
} else {
|
if (QUEUE_START_TS_ATTR_KEY.equals(attributeKvEntry.getKey())) {
|
||||||
return 0L;
|
startTs = attributeKvEntry.getLongValue().isPresent() ? attributeKvEntry.getLongValue().get() : 0L;
|
||||||
|
}
|
||||||
|
if (QUEUE_START_SEQ_ID_ATTR_KEY.equals(attributeKvEntry.getKey())) {
|
||||||
|
startSeqId = attributeKvEntry.getLongValue().isPresent() ? attributeKvEntry.getLongValue().get() : 0L;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
if (startSeqId == 0L) {
|
||||||
|
startSeqId = findStartSeqIdFromOldestEventIfAny();
|
||||||
|
}
|
||||||
|
return Pair.of(startTs, startSeqId);
|
||||||
}, ctx.getGrpcCallbackExecutorService());
|
}, ctx.getGrpcCallbackExecutorService());
|
||||||
}
|
}
|
||||||
|
|
||||||
private ListenableFuture<List<String>> updateQueueStartTs(Long newStartTs) {
|
private boolean isSeqIdStartedNewCycle() {
|
||||||
log.trace("[{}] updating QueueStartTs [{}][{}]", this.sessionId, edge.getId(), newStartTs);
|
try {
|
||||||
List<AttributeKvEntry> attributes = Collections.singletonList(
|
TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), 0, null, null, this.newStartTs, System.currentTimeMillis());
|
||||||
new BaseAttributeKvEntry(
|
PageData<EdgeEvent> edgeEvents = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), 0L, this.previousStartSeqId == 0 ? null : this.previousStartSeqId - 1, pageLink);
|
||||||
new LongDataEntry(QUEUE_START_TS_ATTR_KEY, newStartTs), System.currentTimeMillis()));
|
return !edgeEvents.getData().isEmpty();
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[{}][{}][{}] Failed to execute isSeqIdStartedNewCycle", edge.getTenantId(), edge.getId(), sessionId, e);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isNewEdgeEventsAvailable() {
|
||||||
|
try {
|
||||||
|
TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), 0, null, null, this.newStartTs, System.currentTimeMillis());
|
||||||
|
PageData<EdgeEvent> edgeEvents = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), this.newStartSeqId, null, pageLink);
|
||||||
|
return !edgeEvents.getData().isEmpty();
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[{}][{}][{}] Failed to execute isNewEdgeEventsAvailable", edge.getTenantId(), edge.getId(), sessionId, e);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private long findStartSeqIdFromOldestEventIfAny() {
|
||||||
|
long startSeqId = 0L;
|
||||||
|
try {
|
||||||
|
TimePageLink pageLink = new TimePageLink(1, 0, null, new SortOrder("createdTime"), null, null);
|
||||||
|
PageData<EdgeEvent> edgeEvents = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), null, null, pageLink);
|
||||||
|
if (!edgeEvents.getData().isEmpty()) {
|
||||||
|
startSeqId = edgeEvents.getData().get(0).getSeqId() - 1;
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[{}][{}][{}] Failed to execute findStartSeqIdFromOldestEventIfAny", edge.getTenantId(), edge.getId(), sessionId, e);
|
||||||
|
}
|
||||||
|
return startSeqId;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ListenableFuture<List<String>> updateQueueStartTsAndSeqId(Pair<Long, Long> pair) {
|
||||||
|
this.newStartTs = pair.getFirst();
|
||||||
|
this.newStartSeqId = pair.getSecond();
|
||||||
|
log.trace("[{}] updateQueueStartTsAndSeqId [{}][{}][{}]", this.sessionId, edge.getId(), this.newStartTs, this.newStartSeqId);
|
||||||
|
List<AttributeKvEntry> attributes = Arrays.asList(
|
||||||
|
new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_TS_ATTR_KEY, this.newStartTs), System.currentTimeMillis()),
|
||||||
|
new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_SEQ_ID_ATTR_KEY, this.newStartSeqId), System.currentTimeMillis()));
|
||||||
return ctx.getAttributesService().save(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, attributes);
|
return ctx.getAttributesService().save(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, attributes);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -693,8 +767,11 @@ public final class EdgeGrpcSession implements Closeable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void interruptPreviousSendDownlinkMsgsTask() {
|
private void interruptPreviousSendDownlinkMsgsTask() {
|
||||||
log.debug("[{}][{}][{}] Previous send downlink future was not properly completed, stopping it now!", edge.getTenantId(), edge.getId(), this.sessionId);
|
if (sessionState.getSendDownlinkMsgsFuture() != null && !sessionState.getSendDownlinkMsgsFuture().isDone()
|
||||||
stopCurrentSendDownlinkMsgsTask(true);
|
|| sessionState.getScheduledSendDownlinkTask() != null && !sessionState.getScheduledSendDownlinkTask().isCancelled()) {
|
||||||
|
log.debug("[{}][{}][{}] Previous send downlink future was not properly completed, stopping it now!", edge.getTenantId(), edge.getId(), this.sessionId);
|
||||||
|
stopCurrentSendDownlinkMsgsTask(true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void interruptGeneralProcessingOnSync() {
|
private void interruptGeneralProcessingOnSync() {
|
||||||
|
|||||||
@ -18,7 +18,10 @@ package org.thingsboard.server.service.edge.rpc.constructor;
|
|||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.thingsboard.common.util.JacksonUtil;
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
|
import org.thingsboard.server.common.data.Device;
|
||||||
|
import org.thingsboard.server.common.data.EntityView;
|
||||||
import org.thingsboard.server.common.data.alarm.Alarm;
|
import org.thingsboard.server.common.data.alarm.Alarm;
|
||||||
|
import org.thingsboard.server.common.data.asset.Asset;
|
||||||
import org.thingsboard.server.common.data.id.AssetId;
|
import org.thingsboard.server.common.data.id.AssetId;
|
||||||
import org.thingsboard.server.common.data.id.DeviceId;
|
import org.thingsboard.server.common.data.id.DeviceId;
|
||||||
import org.thingsboard.server.common.data.id.EntityViewId;
|
import org.thingsboard.server.common.data.id.EntityViewId;
|
||||||
@ -47,13 +50,22 @@ public class AlarmMsgConstructor {
|
|||||||
String entityName = null;
|
String entityName = null;
|
||||||
switch (alarm.getOriginator().getEntityType()) {
|
switch (alarm.getOriginator().getEntityType()) {
|
||||||
case DEVICE:
|
case DEVICE:
|
||||||
entityName = deviceService.findDeviceById(tenantId, new DeviceId(alarm.getOriginator().getId())).getName();
|
Device deviceById = deviceService.findDeviceById(tenantId, new DeviceId(alarm.getOriginator().getId()));
|
||||||
|
if (deviceById != null) {
|
||||||
|
entityName = deviceById.getName();
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case ASSET:
|
case ASSET:
|
||||||
entityName = assetService.findAssetById(tenantId, new AssetId(alarm.getOriginator().getId())).getName();
|
Asset assetById = assetService.findAssetById(tenantId, new AssetId(alarm.getOriginator().getId()));
|
||||||
|
if (assetById != null) {
|
||||||
|
entityName = assetById.getName();
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case ENTITY_VIEW:
|
case ENTITY_VIEW:
|
||||||
entityName = entityViewService.findEntityViewById(tenantId, new EntityViewId(alarm.getOriginator().getId())).getName();
|
EntityView entityViewById = entityViewService.findEntityViewById(tenantId, new EntityViewId(alarm.getOriginator().getId()));
|
||||||
|
if (entityViewById != null) {
|
||||||
|
entityName = entityViewById.getName();
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
AlarmUpdateMsg.Builder builder = AlarmUpdateMsg.newBuilder()
|
AlarmUpdateMsg.Builder builder = AlarmUpdateMsg.newBuilder()
|
||||||
|
|||||||
@ -16,19 +16,27 @@
|
|||||||
package org.thingsboard.server.service.edge.rpc.fetch;
|
package org.thingsboard.server.service.edge.rpc.fetch;
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.thingsboard.server.common.data.edge.Edge;
|
import org.thingsboard.server.common.data.edge.Edge;
|
||||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.page.PageData;
|
import org.thingsboard.server.common.data.page.PageData;
|
||||||
import org.thingsboard.server.common.data.page.PageLink;
|
import org.thingsboard.server.common.data.page.PageLink;
|
||||||
import org.thingsboard.server.common.data.page.SortOrder;
|
|
||||||
import org.thingsboard.server.common.data.page.TimePageLink;
|
import org.thingsboard.server.common.data.page.TimePageLink;
|
||||||
import org.thingsboard.server.dao.edge.EdgeEventService;
|
import org.thingsboard.server.dao.edge.EdgeEventService;
|
||||||
|
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
|
@Slf4j
|
||||||
public class GeneralEdgeEventFetcher implements EdgeEventFetcher {
|
public class GeneralEdgeEventFetcher implements EdgeEventFetcher {
|
||||||
|
|
||||||
private final Long queueStartTs;
|
private final Long queueStartTs;
|
||||||
|
private Long seqIdStart;
|
||||||
|
@Getter
|
||||||
|
private Long seqIdEnd;
|
||||||
|
@Getter
|
||||||
|
private boolean seqIdNewCycleStarted;
|
||||||
|
private Long maxReadRecordsCount;
|
||||||
private final EdgeEventService edgeEventService;
|
private final EdgeEventService edgeEventService;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -37,13 +45,32 @@ public class GeneralEdgeEventFetcher implements EdgeEventFetcher {
|
|||||||
pageSize,
|
pageSize,
|
||||||
0,
|
0,
|
||||||
null,
|
null,
|
||||||
new SortOrder("createdTime", SortOrder.Direction.ASC),
|
null,
|
||||||
queueStartTs,
|
queueStartTs,
|
||||||
null);
|
System.currentTimeMillis());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PageData<EdgeEvent> fetchEdgeEvents(TenantId tenantId, Edge edge, PageLink pageLink) {
|
public PageData<EdgeEvent> fetchEdgeEvents(TenantId tenantId, Edge edge, PageLink pageLink) {
|
||||||
return edgeEventService.findEdgeEvents(tenantId, edge.getId(), (TimePageLink) pageLink, true);
|
try {
|
||||||
|
PageData<EdgeEvent> edgeEvents = edgeEventService.findEdgeEvents(tenantId, edge.getId(), seqIdStart, seqIdEnd, (TimePageLink) pageLink);
|
||||||
|
if (edgeEvents.getData().isEmpty()) {
|
||||||
|
this.seqIdEnd = Math.max(this.maxReadRecordsCount, seqIdStart - this.maxReadRecordsCount);
|
||||||
|
edgeEvents = edgeEventService.findEdgeEvents(tenantId, edge.getId(), 0L, seqIdEnd, (TimePageLink) pageLink);
|
||||||
|
if (edgeEvents.getData().stream().anyMatch(ee -> ee.getSeqId() < seqIdStart)) {
|
||||||
|
log.info("[{}] seqId column of edge_event table started new cycle [{}]", tenantId, edge.getId());
|
||||||
|
this.seqIdNewCycleStarted = true;
|
||||||
|
this.seqIdStart = 0L;
|
||||||
|
} else {
|
||||||
|
edgeEvents = new PageData<>();
|
||||||
|
log.warn("[{}] unexpected edge notification message received. " +
|
||||||
|
"no new events found and seqId column of edge_event table doesn't started new cycle [{}]", tenantId, edge.getId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return edgeEvents;
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[{}] failed to find edge events [{}]", tenantId, edge.getId());
|
||||||
|
}
|
||||||
|
return new PageData<>();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -202,22 +202,27 @@ public class DefaultDataUpdateService implements DataUpdateService {
|
|||||||
} else {
|
} else {
|
||||||
log.info("Skipping audit logs migration");
|
log.info("Skipping audit logs migration");
|
||||||
}
|
}
|
||||||
boolean skipEdgeEventsMigration = getEnv("TB_SKIP_EDGE_EVENTS_MIGRATION", false);
|
migrateEdgeEvents("Starting edge events migration. ");
|
||||||
if (!skipEdgeEventsMigration) {
|
|
||||||
log.info("Starting edge events migration. Can be skipped with TB_SKIP_EDGE_EVENTS_MIGRATION env variable set to true");
|
|
||||||
edgeEventDao.migrateEdgeEvents();
|
|
||||||
} else {
|
|
||||||
log.info("Skipping edge events migration");
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
case "3.5.1":
|
case "3.5.1":
|
||||||
log.info("Updating data from version 3.5.1 to 3.5.2 ...");
|
log.info("Updating data from version 3.5.1 to 3.5.2 ...");
|
||||||
|
migrateEdgeEvents("Starting edge events migration - adding seq_id column. ");
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion);
|
throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void migrateEdgeEvents(String logPrefix) {
|
||||||
|
boolean skipEdgeEventsMigration = getEnv("TB_SKIP_EDGE_EVENTS_MIGRATION", false);
|
||||||
|
if (!skipEdgeEventsMigration) {
|
||||||
|
log.info(logPrefix + "Can be skipped with TB_SKIP_EDGE_EVENTS_MIGRATION env variable set to true");
|
||||||
|
edgeEventDao.migrateEdgeEvents();
|
||||||
|
} else {
|
||||||
|
log.info("Skipping edge events migration");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void upgradeRuleNodes() {
|
public void upgradeRuleNodes() {
|
||||||
try {
|
try {
|
||||||
|
|||||||
@ -259,7 +259,21 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
|||||||
}
|
}
|
||||||
|
|
||||||
void launchConsumer(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, Queue configuration, TbRuleEngineConsumerStats stats, String threadSuffix) {
|
void launchConsumer(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, Queue configuration, TbRuleEngineConsumerStats stats, String threadSuffix) {
|
||||||
consumersExecutor.execute(() -> consumerLoop(consumer, configuration, stats, threadSuffix));
|
if (isReady) {
|
||||||
|
consumersExecutor.execute(() -> consumerLoop(consumer, configuration, stats, threadSuffix));
|
||||||
|
} else {
|
||||||
|
scheduleLaunchConsumer(consumer, configuration, stats, threadSuffix);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void scheduleLaunchConsumer(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, Queue configuration, TbRuleEngineConsumerStats stats, String threadSuffix) {
|
||||||
|
repartitionExecutor.schedule(() -> {
|
||||||
|
if (isReady) {
|
||||||
|
consumersExecutor.execute(() -> consumerLoop(consumer, configuration, stats, threadSuffix));
|
||||||
|
} else {
|
||||||
|
scheduleLaunchConsumer(consumer, configuration, stats, threadSuffix);
|
||||||
|
}
|
||||||
|
}, 10, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
void consumerLoop(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, org.thingsboard.server.common.data.queue.Queue configuration, TbRuleEngineConsumerStats stats, String threadSuffix) {
|
void consumerLoop(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, org.thingsboard.server.common.data.queue.Queue configuration, TbRuleEngineConsumerStats stats, String threadSuffix) {
|
||||||
|
|||||||
@ -68,7 +68,7 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
|
|||||||
protected volatile ExecutorService consumersExecutor;
|
protected volatile ExecutorService consumersExecutor;
|
||||||
protected volatile ExecutorService notificationsConsumerExecutor;
|
protected volatile ExecutorService notificationsConsumerExecutor;
|
||||||
protected volatile boolean stopped = false;
|
protected volatile boolean stopped = false;
|
||||||
|
protected volatile boolean isReady = false;
|
||||||
protected final ActorSystemContext actorContext;
|
protected final ActorSystemContext actorContext;
|
||||||
protected final DataDecodingEncodingService encodingService;
|
protected final DataDecodingEncodingService encodingService;
|
||||||
protected final TbTenantProfileCache tenantProfileCache;
|
protected final TbTenantProfileCache tenantProfileCache;
|
||||||
@ -108,6 +108,7 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
|
|||||||
public void onApplicationEvent(ApplicationReadyEvent event) {
|
public void onApplicationEvent(ApplicationReadyEvent event) {
|
||||||
log.info("Subscribing to notifications: {}", nfConsumer.getTopic());
|
log.info("Subscribing to notifications: {}", nfConsumer.getTopic());
|
||||||
this.nfConsumer.subscribe();
|
this.nfConsumer.subscribe();
|
||||||
|
this.isReady = true;
|
||||||
launchNotificationsConsumer();
|
launchNotificationsConsumer();
|
||||||
launchMainConsumers();
|
launchMainConsumers();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -100,6 +100,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
|
|||||||
|
|
||||||
@TestPropertySource(properties = {
|
@TestPropertySource(properties = {
|
||||||
"edges.enabled=true",
|
"edges.enabled=true",
|
||||||
|
"queue.rule-engine.stats.enabled=false",
|
||||||
})
|
})
|
||||||
abstract public class AbstractEdgeTest extends AbstractControllerTest {
|
abstract public class AbstractEdgeTest extends AbstractControllerTest {
|
||||||
|
|
||||||
@ -181,14 +182,14 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
|
|||||||
|
|
||||||
@After
|
@After
|
||||||
public void afterTest() throws Exception {
|
public void afterTest() throws Exception {
|
||||||
|
try {
|
||||||
|
edgeImitator.disconnect();
|
||||||
|
} catch (Exception ignored){}
|
||||||
|
|
||||||
loginSysAdmin();
|
loginSysAdmin();
|
||||||
|
|
||||||
doDelete("/api/tenant/" + savedTenant.getUuidId())
|
doDelete("/api/tenant/" + savedTenant.getUuidId())
|
||||||
.andExpect(status().isOk());
|
.andExpect(status().isOk());
|
||||||
|
|
||||||
try {
|
|
||||||
edgeImitator.disconnect();
|
|
||||||
} catch (Exception ignored) {}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void installation() {
|
private void installation() {
|
||||||
|
|||||||
@ -94,8 +94,6 @@ public class EdgeImitator {
|
|||||||
@Getter
|
@Getter
|
||||||
private UplinkResponseMsg latestResponseMsg;
|
private UplinkResponseMsg latestResponseMsg;
|
||||||
|
|
||||||
private boolean connected = false;
|
|
||||||
|
|
||||||
public EdgeImitator(String host, int port, String routingKey, String routingSecret) throws NoSuchFieldException, IllegalAccessException {
|
public EdgeImitator(String host, int port, String routingKey, String routingSecret) throws NoSuchFieldException, IllegalAccessException {
|
||||||
edgeRpcClient = new EdgeGrpcClient();
|
edgeRpcClient = new EdgeGrpcClient();
|
||||||
messagesLatch = new CountDownLatch(0);
|
messagesLatch = new CountDownLatch(0);
|
||||||
@ -120,7 +118,6 @@ public class EdgeImitator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void connect() {
|
public void connect() {
|
||||||
connected = true;
|
|
||||||
edgeRpcClient.connect(routingKey, routingSecret,
|
edgeRpcClient.connect(routingKey, routingSecret,
|
||||||
this::onUplinkResponse,
|
this::onUplinkResponse,
|
||||||
this::onEdgeUpdate,
|
this::onEdgeUpdate,
|
||||||
@ -131,7 +128,6 @@ public class EdgeImitator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void disconnect() throws InterruptedException {
|
public void disconnect() throws InterruptedException {
|
||||||
connected = false;
|
|
||||||
edgeRpcClient.disconnect(false);
|
edgeRpcClient.disconnect(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -14,6 +14,7 @@ edges.enabled=false
|
|||||||
edges.storage.no_read_records_sleep=500
|
edges.storage.no_read_records_sleep=500
|
||||||
edges.storage.sleep_between_batches=500
|
edges.storage.sleep_between_batches=500
|
||||||
actors.rpc.sequential=true
|
actors.rpc.sequential=true
|
||||||
|
queue.rule-engine.stats.enabled=true
|
||||||
|
|
||||||
# Transports disabled to speed up the context init. Particular transport will be enabled with @TestPropertySource in respective tests
|
# Transports disabled to speed up the context init. Particular transport will be enabled with @TestPropertySource in respective tests
|
||||||
transport.http.enabled=false
|
transport.http.enabled=false
|
||||||
|
|||||||
@ -26,7 +26,7 @@ public interface EdgeEventService {
|
|||||||
|
|
||||||
ListenableFuture<Void> saveAsync(EdgeEvent edgeEvent);
|
ListenableFuture<Void> saveAsync(EdgeEvent edgeEvent);
|
||||||
|
|
||||||
PageData<EdgeEvent> findEdgeEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink, boolean withTsUpdate);
|
PageData<EdgeEvent> findEdgeEvents(TenantId tenantId, EdgeId edgeId, Long seqIdStart, Long seqIdEnd, TimePageLink pageLink);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Executes stored procedure to cleanup old edge events.
|
* Executes stored procedure to cleanup old edge events.
|
||||||
|
|||||||
@ -31,6 +31,7 @@ import java.util.UUID;
|
|||||||
@ToString(callSuper = true)
|
@ToString(callSuper = true)
|
||||||
public class EdgeEvent extends BaseData<EdgeEventId> {
|
public class EdgeEvent extends BaseData<EdgeEventId> {
|
||||||
|
|
||||||
|
private long seqId;
|
||||||
private TenantId tenantId;
|
private TenantId tenantId;
|
||||||
private EdgeId edgeId;
|
private EdgeId edgeId;
|
||||||
private EdgeEventActionType action;
|
private EdgeEventActionType action;
|
||||||
|
|||||||
@ -15,6 +15,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.common.msg;
|
package org.thingsboard.server.common.msg;
|
||||||
|
|
||||||
|
import lombok.Getter;
|
||||||
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
|
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
|
||||||
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
|
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
|
||||||
|
|
||||||
@ -28,7 +29,7 @@ public enum MsgType {
|
|||||||
*
|
*
|
||||||
* See {@link PartitionChangeMsg}
|
* See {@link PartitionChangeMsg}
|
||||||
*/
|
*/
|
||||||
PARTITION_CHANGE_MSG,
|
PARTITION_CHANGE_MSG(true),
|
||||||
|
|
||||||
APP_INIT_MSG,
|
APP_INIT_MSG,
|
||||||
|
|
||||||
@ -108,7 +109,7 @@ public enum MsgType {
|
|||||||
* Message that is sent from the Device Actor to Rule Engine. Requires acknowledgement
|
* Message that is sent from the Device Actor to Rule Engine. Requires acknowledgement
|
||||||
*/
|
*/
|
||||||
|
|
||||||
SESSION_TIMEOUT_MSG,
|
SESSION_TIMEOUT_MSG(true),
|
||||||
|
|
||||||
STATS_PERSIST_TICK_MSG,
|
STATS_PERSIST_TICK_MSG,
|
||||||
|
|
||||||
@ -130,4 +131,14 @@ public enum MsgType {
|
|||||||
EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG,
|
EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG,
|
||||||
EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG;
|
EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG;
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
private final boolean ignoreOnStart;
|
||||||
|
|
||||||
|
MsgType() {
|
||||||
|
this.ignoreOnStart = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
MsgType(boolean ignoreOnStart) {
|
||||||
|
this.ignoreOnStart = ignoreOnStart;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -42,8 +42,8 @@ public class BaseEdgeEventService implements EdgeEventService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PageData<EdgeEvent> findEdgeEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink, boolean withTsUpdate) {
|
public PageData<EdgeEvent> findEdgeEvents(TenantId tenantId, EdgeId edgeId, Long seqIdStart, Long seqIdEnd, TimePageLink pageLink) {
|
||||||
return edgeEventDao.findEdgeEvents(tenantId.getId(), edgeId, pageLink, withTsUpdate);
|
return edgeEventDao.findEdgeEvents(tenantId.getId(), edgeId, seqIdStart, seqIdEnd, pageLink);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -43,10 +43,12 @@ public interface EdgeEventDao extends Dao<EdgeEvent> {
|
|||||||
*
|
*
|
||||||
* @param tenantId the tenantId
|
* @param tenantId the tenantId
|
||||||
* @param edgeId the edgeId
|
* @param edgeId the edgeId
|
||||||
|
* @param seqIdStart the seq id start
|
||||||
|
* @param seqIdEnd the seq id end
|
||||||
* @param pageLink the pageLink
|
* @param pageLink the pageLink
|
||||||
* @return the event list
|
* @return the event list
|
||||||
*/
|
*/
|
||||||
PageData<EdgeEvent> findEdgeEvents(UUID tenantId, EdgeId edgeId, TimePageLink pageLink, boolean withTsUpdate);
|
PageData<EdgeEvent> findEdgeEvents(UUID tenantId, EdgeId edgeId, Long seqIdStart, Long seqIdEnd, TimePageLink pageLink);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Executes stored procedure to cleanup old edge events.
|
* Executes stored procedure to cleanup old edge events.
|
||||||
|
|||||||
@ -535,6 +535,7 @@ public class ModelConstants {
|
|||||||
*/
|
*/
|
||||||
public static final String EDGE_EVENT_TABLE_NAME = "edge_event";
|
public static final String EDGE_EVENT_TABLE_NAME = "edge_event";
|
||||||
public static final String EDGE_EVENT_TENANT_ID_PROPERTY = TENANT_ID_PROPERTY;
|
public static final String EDGE_EVENT_TENANT_ID_PROPERTY = TENANT_ID_PROPERTY;
|
||||||
|
public static final String EDGE_EVENT_SEQUENTIAL_ID_PROPERTY = "seq_id";
|
||||||
public static final String EDGE_EVENT_EDGE_ID_PROPERTY = "edge_id";
|
public static final String EDGE_EVENT_EDGE_ID_PROPERTY = "edge_id";
|
||||||
public static final String EDGE_EVENT_TYPE_PROPERTY = "edge_event_type";
|
public static final String EDGE_EVENT_TYPE_PROPERTY = "edge_event_type";
|
||||||
public static final String EDGE_EVENT_ACTION_PROPERTY = "edge_event_action";
|
public static final String EDGE_EVENT_ACTION_PROPERTY = "edge_event_action";
|
||||||
|
|||||||
@ -43,6 +43,7 @@ import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_BODY_PR
|
|||||||
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_TABLE_NAME;
|
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_TABLE_NAME;
|
||||||
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_EDGE_ID_PROPERTY;
|
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_EDGE_ID_PROPERTY;
|
||||||
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_ENTITY_ID_PROPERTY;
|
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_ENTITY_ID_PROPERTY;
|
||||||
|
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_SEQUENTIAL_ID_PROPERTY;
|
||||||
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_TENANT_ID_PROPERTY;
|
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_TENANT_ID_PROPERTY;
|
||||||
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_TYPE_PROPERTY;
|
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_TYPE_PROPERTY;
|
||||||
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_UID_PROPERTY;
|
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_UID_PROPERTY;
|
||||||
@ -57,6 +58,9 @@ import static org.thingsboard.server.dao.model.ModelConstants.TS_COLUMN;
|
|||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
public class EdgeEventEntity extends BaseSqlEntity<EdgeEvent> implements BaseEntity<EdgeEvent> {
|
public class EdgeEventEntity extends BaseSqlEntity<EdgeEvent> implements BaseEntity<EdgeEvent> {
|
||||||
|
|
||||||
|
@Column(name = EDGE_EVENT_SEQUENTIAL_ID_PROPERTY)
|
||||||
|
protected long seqId;
|
||||||
|
|
||||||
@Column(name = EDGE_EVENT_TENANT_ID_PROPERTY)
|
@Column(name = EDGE_EVENT_TENANT_ID_PROPERTY)
|
||||||
private UUID tenantId;
|
private UUID tenantId;
|
||||||
|
|
||||||
@ -120,6 +124,7 @@ public class EdgeEventEntity extends BaseSqlEntity<EdgeEvent> implements BaseEnt
|
|||||||
edgeEvent.setAction(edgeEventAction);
|
edgeEvent.setAction(edgeEventAction);
|
||||||
edgeEvent.setBody(entityBody);
|
edgeEvent.setBody(entityBody);
|
||||||
edgeEvent.setUid(edgeEventUid);
|
edgeEvent.setUid(edgeEventUid);
|
||||||
|
edgeEvent.setSeqId(seqId);
|
||||||
return edgeEvent;
|
return edgeEvent;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -30,8 +30,10 @@ public interface EdgeEventRepository extends JpaRepository<EdgeEventEntity, UUID
|
|||||||
@Query("SELECT e FROM EdgeEventEntity e WHERE " +
|
@Query("SELECT e FROM EdgeEventEntity e WHERE " +
|
||||||
"e.tenantId = :tenantId " +
|
"e.tenantId = :tenantId " +
|
||||||
"AND e.edgeId = :edgeId " +
|
"AND e.edgeId = :edgeId " +
|
||||||
"AND (:startTime IS NULL OR e.createdTime > :startTime) " +
|
"AND (:startTime IS NULL OR e.createdTime >= :startTime) " +
|
||||||
"AND (:endTime IS NULL OR e.createdTime <= :endTime) " +
|
"AND (:endTime IS NULL OR e.createdTime <= :endTime) " +
|
||||||
|
"AND (:seqIdStart IS NULL OR e.seqId > :seqIdStart) " +
|
||||||
|
"AND (:seqIdEnd IS NULL OR e.seqId < :seqIdEnd) " +
|
||||||
"AND LOWER(e.edgeEventType) LIKE LOWER(CONCAT('%', :textSearch, '%'))"
|
"AND LOWER(e.edgeEventType) LIKE LOWER(CONCAT('%', :textSearch, '%'))"
|
||||||
)
|
)
|
||||||
Page<EdgeEventEntity> findEdgeEventsByTenantIdAndEdgeId(@Param("tenantId") UUID tenantId,
|
Page<EdgeEventEntity> findEdgeEventsByTenantIdAndEdgeId(@Param("tenantId") UUID tenantId,
|
||||||
@ -39,20 +41,7 @@ public interface EdgeEventRepository extends JpaRepository<EdgeEventEntity, UUID
|
|||||||
@Param("textSearch") String textSearch,
|
@Param("textSearch") String textSearch,
|
||||||
@Param("startTime") Long startTime,
|
@Param("startTime") Long startTime,
|
||||||
@Param("endTime") Long endTime,
|
@Param("endTime") Long endTime,
|
||||||
|
@Param("seqIdStart") Long seqIdStart,
|
||||||
|
@Param("seqIdEnd") Long seqIdEnd,
|
||||||
Pageable pageable);
|
Pageable pageable);
|
||||||
|
|
||||||
@Query("SELECT e FROM EdgeEventEntity e WHERE " +
|
|
||||||
"e.tenantId = :tenantId " +
|
|
||||||
"AND e.edgeId = :edgeId " +
|
|
||||||
"AND (:startTime IS NULL OR e.createdTime > :startTime) " +
|
|
||||||
"AND (:endTime IS NULL OR e.createdTime <= :endTime) " +
|
|
||||||
"AND e.edgeEventAction <> 'TIMESERIES_UPDATED' " +
|
|
||||||
"AND LOWER(e.edgeEventType) LIKE LOWER(CONCAT('%', :textSearch, '%'))"
|
|
||||||
)
|
|
||||||
Page<EdgeEventEntity> findEdgeEventsByTenantIdAndEdgeIdWithoutTimeseriesUpdated(@Param("tenantId") UUID tenantId,
|
|
||||||
@Param("edgeId") UUID edgeId,
|
|
||||||
@Param("textSearch") String textSearch,
|
|
||||||
@Param("startTime") Long startTime,
|
|
||||||
@Param("endTime") Long endTime,
|
|
||||||
Pageable pageable);
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.edge.EdgeEvent;
|
|||||||
import org.thingsboard.server.common.data.id.EdgeEventId;
|
import org.thingsboard.server.common.data.id.EdgeEventId;
|
||||||
import org.thingsboard.server.common.data.id.EdgeId;
|
import org.thingsboard.server.common.data.id.EdgeId;
|
||||||
import org.thingsboard.server.common.data.page.PageData;
|
import org.thingsboard.server.common.data.page.PageData;
|
||||||
|
import org.thingsboard.server.common.data.page.SortOrder;
|
||||||
import org.thingsboard.server.common.data.page.TimePageLink;
|
import org.thingsboard.server.common.data.page.TimePageLink;
|
||||||
import org.thingsboard.server.common.stats.StatsFactory;
|
import org.thingsboard.server.common.stats.StatsFactory;
|
||||||
import org.thingsboard.server.dao.DaoUtil;
|
import org.thingsboard.server.dao.DaoUtil;
|
||||||
@ -43,7 +44,9 @@ import org.thingsboard.server.dao.util.SqlDao;
|
|||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
import javax.annotation.PreDestroy;
|
import javax.annotation.PreDestroy;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
@ -118,7 +121,7 @@ public class JpaBaseEdgeEventDao extends JpaAbstractDao<EdgeEventEntity, EdgeEve
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
queue = new TbSqlBlockingQueueWrapper<>(params, hashcodeFunction, 1, statsFactory);
|
queue = new TbSqlBlockingQueueWrapper<>(params, hashcodeFunction, 1, statsFactory);
|
||||||
queue.init(logExecutor, v -> edgeEventInsertRepository.save(v),
|
queue.init(logExecutor, edgeEventInsertRepository::save,
|
||||||
Comparator.comparing(EdgeEventEntity::getTs)
|
Comparator.comparing(EdgeEventEntity::getTs)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@ -171,29 +174,23 @@ public class JpaBaseEdgeEventDao extends JpaAbstractDao<EdgeEventEntity, EdgeEve
|
|||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PageData<EdgeEvent> findEdgeEvents(UUID tenantId, EdgeId edgeId, TimePageLink pageLink, boolean withTsUpdate) {
|
public PageData<EdgeEvent> findEdgeEvents(UUID tenantId, EdgeId edgeId, Long seqIdStart, Long seqIdEnd, TimePageLink pageLink) {
|
||||||
if (withTsUpdate) {
|
List<SortOrder> sortOrders = new ArrayList<>();
|
||||||
return DaoUtil.toPageData(
|
if (pageLink.getSortOrder() != null) {
|
||||||
edgeEventRepository
|
sortOrders.add(pageLink.getSortOrder());
|
||||||
.findEdgeEventsByTenantIdAndEdgeId(
|
|
||||||
tenantId,
|
|
||||||
edgeId.getId(),
|
|
||||||
Objects.toString(pageLink.getTextSearch(), ""),
|
|
||||||
pageLink.getStartTime(),
|
|
||||||
pageLink.getEndTime(),
|
|
||||||
DaoUtil.toPageable(pageLink)));
|
|
||||||
} else {
|
|
||||||
return DaoUtil.toPageData(
|
|
||||||
edgeEventRepository
|
|
||||||
.findEdgeEventsByTenantIdAndEdgeIdWithoutTimeseriesUpdated(
|
|
||||||
tenantId,
|
|
||||||
edgeId.getId(),
|
|
||||||
Objects.toString(pageLink.getTextSearch(), ""),
|
|
||||||
pageLink.getStartTime(),
|
|
||||||
pageLink.getEndTime(),
|
|
||||||
DaoUtil.toPageable(pageLink)));
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
sortOrders.add(new SortOrder("seqId"));
|
||||||
|
return DaoUtil.toPageData(
|
||||||
|
edgeEventRepository
|
||||||
|
.findEdgeEventsByTenantIdAndEdgeId(
|
||||||
|
tenantId,
|
||||||
|
edgeId.getId(),
|
||||||
|
Objects.toString(pageLink.getTextSearch(), ""),
|
||||||
|
pageLink.getStartTime(),
|
||||||
|
pageLink.getEndTime(),
|
||||||
|
seqIdStart,
|
||||||
|
seqIdEnd,
|
||||||
|
DaoUtil.toPageable(pageLink, sortOrders)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -720,6 +720,7 @@ CREATE TABLE IF NOT EXISTS edge (
|
|||||||
);
|
);
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS edge_event (
|
CREATE TABLE IF NOT EXISTS edge_event (
|
||||||
|
seq_id INT GENERATED ALWAYS AS IDENTITY,
|
||||||
id uuid NOT NULL,
|
id uuid NOT NULL,
|
||||||
created_time bigint NOT NULL,
|
created_time bigint NOT NULL,
|
||||||
edge_id uuid,
|
edge_id uuid,
|
||||||
@ -731,6 +732,7 @@ CREATE TABLE IF NOT EXISTS edge_event (
|
|||||||
tenant_id uuid,
|
tenant_id uuid,
|
||||||
ts bigint NOT NULL
|
ts bigint NOT NULL
|
||||||
) PARTITION BY RANGE(created_time);
|
) PARTITION BY RANGE(created_time);
|
||||||
|
ALTER TABLE IF EXISTS edge_event ALTER COLUMN seq_id SET CYCLE;
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS rpc (
|
CREATE TABLE IF NOT EXISTS rpc (
|
||||||
id uuid NOT NULL CONSTRAINT rpc_pkey PRIMARY KEY,
|
id uuid NOT NULL CONSTRAINT rpc_pkey PRIMARY KEY,
|
||||||
|
|||||||
@ -71,7 +71,7 @@ public class EdgeEventServiceTest extends AbstractServiceTest {
|
|||||||
EdgeEvent edgeEvent = generateEdgeEvent(tenantId, edgeId, deviceId, EdgeEventActionType.ADDED);
|
EdgeEvent edgeEvent = generateEdgeEvent(tenantId, edgeId, deviceId, EdgeEventActionType.ADDED);
|
||||||
edgeEventService.saveAsync(edgeEvent).get();
|
edgeEventService.saveAsync(edgeEvent).get();
|
||||||
|
|
||||||
PageData<EdgeEvent> edgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, new TimePageLink(1), false);
|
PageData<EdgeEvent> edgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, 0L, null, new TimePageLink(1));
|
||||||
Assert.assertFalse(edgeEvents.getData().isEmpty());
|
Assert.assertFalse(edgeEvents.getData().isEmpty());
|
||||||
|
|
||||||
EdgeEvent saved = edgeEvents.getData().get(0);
|
EdgeEvent saved = edgeEvents.getData().get(0);
|
||||||
@ -113,7 +113,7 @@ public class EdgeEventServiceTest extends AbstractServiceTest {
|
|||||||
Futures.allAsList(futures).get();
|
Futures.allAsList(futures).get();
|
||||||
|
|
||||||
TimePageLink pageLink = new TimePageLink(2, 0, "", new SortOrder("createdTime", SortOrder.Direction.DESC), startTime, endTime);
|
TimePageLink pageLink = new TimePageLink(2, 0, "", new SortOrder("createdTime", SortOrder.Direction.DESC), startTime, endTime);
|
||||||
PageData<EdgeEvent> edgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink, true);
|
PageData<EdgeEvent> edgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, 0L, null, pageLink);
|
||||||
|
|
||||||
Assert.assertNotNull(edgeEvents.getData());
|
Assert.assertNotNull(edgeEvents.getData());
|
||||||
Assert.assertEquals(2, edgeEvents.getData().size());
|
Assert.assertEquals(2, edgeEvents.getData().size());
|
||||||
@ -122,7 +122,7 @@ public class EdgeEventServiceTest extends AbstractServiceTest {
|
|||||||
Assert.assertTrue(edgeEvents.hasNext());
|
Assert.assertTrue(edgeEvents.hasNext());
|
||||||
Assert.assertNotNull(pageLink.nextPageLink());
|
Assert.assertNotNull(pageLink.nextPageLink());
|
||||||
|
|
||||||
edgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink.nextPageLink(), true);
|
edgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, 0L, null, pageLink.nextPageLink());
|
||||||
|
|
||||||
Assert.assertNotNull(edgeEvents.getData());
|
Assert.assertNotNull(edgeEvents.getData());
|
||||||
Assert.assertEquals(1, edgeEvents.getData().size());
|
Assert.assertEquals(1, edgeEvents.getData().size());
|
||||||
@ -132,26 +132,6 @@ public class EdgeEventServiceTest extends AbstractServiceTest {
|
|||||||
edgeEventService.cleanupEvents(1);
|
edgeEventService.cleanupEvents(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void findEdgeEventsWithTsUpdateAndWithout() throws Exception {
|
|
||||||
EdgeId edgeId = new EdgeId(Uuids.timeBased());
|
|
||||||
DeviceId deviceId = new DeviceId(Uuids.timeBased());
|
|
||||||
TenantId tenantId = TenantId.fromUUID(Uuids.timeBased());
|
|
||||||
TimePageLink pageLink = new TimePageLink(1, 0, null, new SortOrder("createdTime", SortOrder.Direction.ASC));
|
|
||||||
|
|
||||||
EdgeEvent edgeEventWithTsUpdate = generateEdgeEvent(tenantId, edgeId, deviceId, EdgeEventActionType.TIMESERIES_UPDATED);
|
|
||||||
edgeEventService.saveAsync(edgeEventWithTsUpdate).get();
|
|
||||||
|
|
||||||
PageData<EdgeEvent> allEdgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink, true);
|
|
||||||
PageData<EdgeEvent> edgeEventsWithoutTsUpdate = edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink, false);
|
|
||||||
|
|
||||||
Assert.assertNotNull(allEdgeEvents.getData());
|
|
||||||
Assert.assertNotNull(edgeEventsWithoutTsUpdate.getData());
|
|
||||||
Assert.assertEquals(1, allEdgeEvents.getData().size());
|
|
||||||
Assert.assertEquals(allEdgeEvents.getData().get(0).getUuidId(), edgeEventWithTsUpdate.getUuidId());
|
|
||||||
Assert.assertTrue(edgeEventsWithoutTsUpdate.getData().isEmpty());
|
|
||||||
}
|
|
||||||
|
|
||||||
private ListenableFuture<Void> saveEdgeEventWithProvidedTime(long time, EdgeId edgeId, EntityId entityId, TenantId tenantId) throws Exception {
|
private ListenableFuture<Void> saveEdgeEventWithProvidedTime(long time, EdgeId edgeId, EntityId entityId, TenantId tenantId) throws Exception {
|
||||||
EdgeEvent edgeEvent = generateEdgeEvent(tenantId, edgeId, entityId, EdgeEventActionType.ADDED);
|
EdgeEvent edgeEvent = generateEdgeEvent(tenantId, edgeId, entityId, EdgeEventActionType.ADDED);
|
||||||
edgeEvent.setId(new EdgeEventId(Uuids.startOf(time)));
|
edgeEvent.setId(new EdgeEventId(Uuids.startOf(time)));
|
||||||
|
|||||||
@ -1,2 +1,5 @@
|
|||||||
--PostgreSQL specific truncate to fit constraints
|
--PostgreSQL specific truncate to fit constraints
|
||||||
TRUNCATE TABLE device_credentials, device, device_profile, asset, asset_profile, ota_package, rule_node_state, rule_node, rule_chain, alarm_comment, alarm, entity_alarm;
|
TRUNCATE TABLE device_credentials, device, device_profile, asset, asset_profile, ota_package, rule_node_state, rule_node, rule_chain, alarm_comment, alarm, entity_alarm;
|
||||||
|
|
||||||
|
-- Decrease seq_id column to make sure to cover cases of new sequential cycle during the tests
|
||||||
|
ALTER SEQUENCE edge_event_seq_id_seq MAXVALUE 256;
|
||||||
|
|||||||
@ -76,6 +76,10 @@ public class TbDeviceProfileNode implements TbNode {
|
|||||||
this.ctx = ctx;
|
this.ctx = ctx;
|
||||||
scheduleAlarmHarvesting(ctx, null);
|
scheduleAlarmHarvesting(ctx, null);
|
||||||
ctx.addDeviceProfileListeners(this::onProfileUpdate, this::onDeviceUpdate);
|
ctx.addDeviceProfileListeners(this::onProfileUpdate, this::onDeviceUpdate);
|
||||||
|
initAlarmRuleState(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initAlarmRuleState(boolean printNewlyAddedDeviceStates) {
|
||||||
if (config.isFetchAlarmRulesStateOnStart()) {
|
if (config.isFetchAlarmRulesStateOnStart()) {
|
||||||
log.info("[{}] Fetching alarm rule state", ctx.getSelfId());
|
log.info("[{}] Fetching alarm rule state", ctx.getSelfId());
|
||||||
int fetchCount = 0;
|
int fetchCount = 0;
|
||||||
@ -86,7 +90,7 @@ public class TbDeviceProfileNode implements TbNode {
|
|||||||
for (RuleNodeState rns : states.getData()) {
|
for (RuleNodeState rns : states.getData()) {
|
||||||
fetchCount++;
|
fetchCount++;
|
||||||
if (rns.getEntityId().getEntityType().equals(EntityType.DEVICE) && ctx.isLocalEntity(rns.getEntityId())) {
|
if (rns.getEntityId().getEntityType().equals(EntityType.DEVICE) && ctx.isLocalEntity(rns.getEntityId())) {
|
||||||
getOrCreateDeviceState(ctx, new DeviceId(rns.getEntityId().getId()), rns);
|
getOrCreateDeviceState(ctx, new DeviceId(rns.getEntityId().getId()), rns, printNewlyAddedDeviceStates);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -130,7 +134,7 @@ public class TbDeviceProfileNode implements TbNode {
|
|||||||
removeDeviceState(deviceId);
|
removeDeviceState(deviceId);
|
||||||
ctx.tellSuccess(msg);
|
ctx.tellSuccess(msg);
|
||||||
} else {
|
} else {
|
||||||
DeviceState deviceState = getOrCreateDeviceState(ctx, deviceId, null);
|
DeviceState deviceState = getOrCreateDeviceState(ctx, deviceId, null, false);
|
||||||
if (deviceState != null) {
|
if (deviceState != null) {
|
||||||
deviceState.process(ctx, msg);
|
deviceState.process(ctx, msg);
|
||||||
} else {
|
} else {
|
||||||
@ -148,6 +152,7 @@ public class TbDeviceProfileNode implements TbNode {
|
|||||||
public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) {
|
public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) {
|
||||||
// Cleanup the cache for all entities that are no longer assigned to current server partitions
|
// Cleanup the cache for all entities that are no longer assigned to current server partitions
|
||||||
deviceStates.entrySet().removeIf(entry -> !ctx.isLocalEntity(entry.getKey()));
|
deviceStates.entrySet().removeIf(entry -> !ctx.isLocalEntity(entry.getKey()));
|
||||||
|
initAlarmRuleState(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -156,13 +161,16 @@ public class TbDeviceProfileNode implements TbNode {
|
|||||||
deviceStates.clear();
|
deviceStates.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected DeviceState getOrCreateDeviceState(TbContext ctx, DeviceId deviceId, RuleNodeState rns) {
|
protected DeviceState getOrCreateDeviceState(TbContext ctx, DeviceId deviceId, RuleNodeState rns, boolean printNewlyAddedDeviceStates) {
|
||||||
DeviceState deviceState = deviceStates.get(deviceId);
|
DeviceState deviceState = deviceStates.get(deviceId);
|
||||||
if (deviceState == null) {
|
if (deviceState == null) {
|
||||||
DeviceProfile deviceProfile = cache.get(ctx.getTenantId(), deviceId);
|
DeviceProfile deviceProfile = cache.get(ctx.getTenantId(), deviceId);
|
||||||
if (deviceProfile != null) {
|
if (deviceProfile != null) {
|
||||||
deviceState = new DeviceState(ctx, config, deviceId, new ProfileState(deviceProfile), rns);
|
deviceState = new DeviceState(ctx, config, deviceId, new ProfileState(deviceProfile), rns);
|
||||||
deviceStates.put(deviceId, deviceState);
|
deviceStates.put(deviceId, deviceState);
|
||||||
|
if (printNewlyAddedDeviceStates) {
|
||||||
|
log.info("[{}][{}] Device [{}] was added during PartitionChangeMsg", ctx.getTenantId(), ctx.getSelfId(), deviceId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return deviceState;
|
return deviceState;
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user