Edge queue configuration moved to config file

This commit is contained in:
Volodymyr Babak 2020-03-31 09:58:07 +03:00
parent bb95ce99f8
commit c3aee9c91c
6 changed files with 42 additions and 13 deletions

View File

@ -6,9 +6,7 @@
"firstRuleNodeId": null,
"root": true,
"debugMode": false,
"configuration": null,
"assignedEdges": [],
"assignedEdgesIds": []
"configuration": null
},
"metadata": {
"firstNodeIndex": 2,

View File

@ -29,6 +29,7 @@ import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.edge.EdgeService;
import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.service.edge.rpc.EdgeEventStorageSettings;
import org.thingsboard.server.service.edge.rpc.constructor.AlarmUpdateMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.AssetUpdateMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.DashboardUpdateMsgConstructor;
@ -108,4 +109,8 @@ public class EdgeContextComponent {
@Lazy
@Autowired
private DashboardUpdateMsgConstructor dashboardUpdateMsgConstructor;
@Lazy
@Autowired
private EdgeEventStorageSettings edgeEventStorageSettings;
}

View File

@ -0,0 +1,17 @@
package org.thingsboard.server.service.edge.rpc;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
@Data
public class EdgeEventStorageSettings {
@Value("${edges.rpc.storage.max_read_records_count}")
private int maxReadRecordsCount;
@Value("${edges.rpc.storage.no_read_records_sleep}")
private long noRecordsSleepInterval;
@Value("${edges.rpc.storage.sleep_between_batches}")
private long sleepIntervalBetweenBatches;
}

View File

@ -56,7 +56,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase {
private boolean sslEnabled;
@Value("${edges.rpc.ssl.cert}")
private String certFileResource;
@Value("${edges.rpc.ssl.privateKey}")
@Value("${edges.rpc.ssl.private_key}")
private String privateKeyResource;
@Autowired

View File

@ -101,6 +101,8 @@ public final class EdgeGrpcSession implements Cloneable {
private static final ReentrantLock entityCreationLock = new ReentrantLock();
private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs";
private final UUID sessionId;
private final BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener;
private final Consumer<EdgeId> sessionCloseListener;
@ -163,8 +165,7 @@ public final class EdgeGrpcSession implements Cloneable {
void processHandleMessages() throws ExecutionException, InterruptedException {
Long queueStartTs = getQueueStartTs().get();
// TODO: this 100 value must be changed properly
TimePageLink pageLink = new TimePageLink(30, queueStartTs + 1000, null, true);
TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), queueStartTs, null, true);
TimePageData<Event> pageData;
UUID ifOffset = null;
do {
@ -173,10 +174,8 @@ public final class EdgeGrpcSession implements Cloneable {
log.trace("[{}] [{}] event(s) are going to be processed.", this.sessionId, pageData.getData().size());
for (Event event : pageData.getData()) {
log.trace("[{}] Processing event [{}]", this.sessionId, event);
EdgeQueueEntry entry;
try {
entry = objectMapper.treeToValue(event.getBody(), EdgeQueueEntry.class);
EdgeQueueEntry entry = objectMapper.treeToValue(event.getBody(), EdgeQueueEntry.class);
UpdateMsgType msgType = getResponseMsgType(entry.getType());
switch (msgType) {
case ENTITY_DELETED_RPC_MESSAGE:
@ -201,6 +200,11 @@ public final class EdgeGrpcSession implements Cloneable {
}
if (pageData.hasNext()) {
pageLink = pageData.getNextPageLink();
try {
Thread.sleep(ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches());
} catch (InterruptedException e) {
log.error("Error during sleep between batches", e);
}
}
} while (pageData.hasNext());
@ -209,7 +213,7 @@ public final class EdgeGrpcSession implements Cloneable {
updateQueueStartTs(newStartTs);
}
try {
Thread.sleep(1000);
Thread.sleep(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval());
} catch (InterruptedException e) {
log.error("Error during sleep", e);
}
@ -339,13 +343,14 @@ public final class EdgeGrpcSession implements Cloneable {
}
private void updateQueueStartTs(Long newStartTs) {
List<AttributeKvEntry> attributes = Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry("queueStartTs", newStartTs), System.currentTimeMillis()));
newStartTs = ++newStartTs; // increments ts by 1 - next edge event search starts from current offset + 1
List<AttributeKvEntry> attributes = Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_TS_ATTR_KEY, newStartTs), System.currentTimeMillis()));
ctx.getAttributesService().save(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, attributes);
}
private ListenableFuture<Long> getQueueStartTs() {
ListenableFuture<Optional<AttributeKvEntry>> future =
ctx.getAttributesService().find(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, "queueStartTs");
ctx.getAttributesService().find(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, QUEUE_START_TS_ATTR_KEY);
return Futures.transform(future, attributeKvEntryOpt -> {
if (attributeKvEntryOpt != null && attributeKvEntryOpt.isPresent()) {
AttributeKvEntry attributeKvEntry = attributeKvEntryOpt.get();

View File

@ -571,7 +571,11 @@ edges:
# Enable/disable SSL support
enabled: "${EDGES_RPC_SSL_ENABLED:false}"
cert: "${EDGES_RPC_SSL_CERT:certChainFile.pem}"
privateKey: "${EDGES_RPC_SSL_PRIVATE_KEY:privateKeyFile.pem}"
private_key: "${EDGES_RPC_SSL_PRIVATE_KEY:privateKeyFile.pem}"
storage:
max_read_records_count: "${EDGES_RPC_STORAGE_MAX_READ_RECORDS_COUNT:50}"
no_read_records_sleep: "${EDGES_RPC_NO_READ_RECORDS_SLEEP:1000}"
sleep_between_batches: "${EDGES_RPC_SLEEP_BETWEEN_BATCHES:1000}"
swagger:
api_path_regex: "${SWAGGER_API_PATH_REGEX:/api.*}"