From 94db399367c62d3a3885c00c0564e45b4055f424 Mon Sep 17 00:00:00 2001 From: Igor Kulikov Date: Wed, 23 May 2018 17:06:51 +0300 Subject: [PATCH] New system parameters: default cassandra ts key/val ttl; allow system mail service for rules. --- .../server/actors/ActorSystemContext.java | 4 ++++ .../actors/ruleChain/DefaultTbContext.java | 6 +++++- .../service/queue/DefaultMsgQueueService.java | 4 ++-- .../src/main/resources/thingsboard.yml | 20 +++++++++---------- .../dao/queue/db/nosql/CassandraMsgQueue.java | 2 +- .../dao/queue/memory/InMemoryMsgQueue.java | 2 +- .../CassandraBaseTimeseriesDao.java | 16 +++++++++++++++ .../test/resources/cassandra-test.properties | 2 ++ 8 files changed, 41 insertions(+), 15 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index f7c7f1a5a8..4840f2a0e8 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -235,6 +235,10 @@ public class ActorSystemContext { @Getter private boolean tenantComponentsInitEnabled; + @Value("${actors.rule.allow_system_mail_service}") + @Getter + private boolean allowSystemMailService; + @Getter @Setter private ActorSystem actorSystem; diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index b888bc33e4..70509fb875 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -209,7 +209,11 @@ class DefaultTbContext implements TbContext { @Override public MailService getMailService() { - return mainCtx.getMailService(); + if (mainCtx.isAllowSystemMailService()) { + return mainCtx.getMailService(); + } else { + throw new RuntimeException("Access to System Mail Service is forbidden!"); + } } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java index a4558eb0a4..927584789d 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java @@ -40,10 +40,10 @@ import java.util.concurrent.atomic.AtomicLong; @Slf4j public class DefaultMsgQueueService implements MsgQueueService { - @Value("${rule.queue.max_size}") + @Value("${actors.rule.queue.max_size}") private long queueMaxSize; - @Value("${rule.queue.cleanup_period}") + @Value("${actors.rule.queue.cleanup_period}") private long queueCleanUpPeriod; @Autowired diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 9a1089549e..a10ef7d7c0 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -203,6 +203,7 @@ cassandra: default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}" # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}" + ts_key_value_ttl: "${TS_KV_TTL:0}" buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}" concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}" permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}" @@ -236,6 +237,8 @@ actors: js_thread_pool_size: "${ACTORS_RULE_JS_THREAD_POOL_SIZE:10}" # Specify thread pool size for mail sender executor service mail_thread_pool_size: "${ACTORS_RULE_MAIL_THREAD_POOL_SIZE:10}" + # Whether to allow usage of system mail service for rules + allow_system_mail_service: "${ACTORS_RULE_ALLOW_SYSTEM_MAIL_SERVICE:true}" # Specify thread pool size for external call service external_call_thread_pool_size: "${ACTORS_RULE_EXTERNAL_CALL_THREAD_POOL_SIZE:10}" js_sandbox: @@ -253,6 +256,13 @@ actors: node: # Errors for particular actor are persisted once per specified amount of milliseconds error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}" + queue: + # Message queue type (memory or db) + type: "${ACTORS_RULE_QUEUE_TYPE:memory}" + # Message queue maximum size (per tenant) + max_size: "${ACTORS_RULE_QUEUE_MAX_SIZE:100}" + # Message queue cleanup period in seconds + cleanup_period: "${ACTORS_RULE_QUEUE_CLEANUP_PERIOD:3600}" statistics: # Enable/disable actor statistics enabled: "${ACTORS_STATISTICS_ENABLED:true}" @@ -333,16 +343,6 @@ spring: username: "${SPRING_DATASOURCE_USERNAME:sa}" password: "${SPRING_DATASOURCE_PASSWORD:}" -rule: - queue: - #Message queue type (memory or db) - type: "${RULE_QUEUE_TYPE:memory}" - #Message queue maximum size (per tenant) - max_size: "${RULE_QUEUE_MAX_SIZE:100}" - #Message queue cleanup period in seconds - cleanup_period: "${RULE_QUEUE_CLEANUP_PERIOD:3600}" - - # PostgreSQL DAO Configuration #spring: # data: diff --git a/dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/CassandraMsgQueue.java b/dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/CassandraMsgQueue.java index ce481b78b8..9cc87b47b6 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/CassandraMsgQueue.java +++ b/dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/CassandraMsgQueue.java @@ -36,7 +36,7 @@ import java.util.List; import java.util.UUID; @Component -@ConditionalOnProperty(prefix = "rule.queue", value = "type", havingValue = "db") +@ConditionalOnProperty(prefix = "actors.rule.queue", value = "type", havingValue = "db") @Slf4j @NoSqlDao public class CassandraMsgQueue implements MsgQueue { diff --git a/dao/src/main/java/org/thingsboard/server/dao/queue/memory/InMemoryMsgQueue.java b/dao/src/main/java/org/thingsboard/server/dao/queue/memory/InMemoryMsgQueue.java index 4532e023ce..93057784f6 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/queue/memory/InMemoryMsgQueue.java +++ b/dao/src/main/java/org/thingsboard/server/dao/queue/memory/InMemoryMsgQueue.java @@ -40,7 +40,7 @@ import java.util.concurrent.Executors; * Created by ashvayka on 27.04.18. */ @Component -@ConditionalOnProperty(prefix = "rule.queue", value = "type", havingValue = "memory", matchIfMissing = true) +@ConditionalOnProperty(prefix = "actors.rule.queue", value = "type", havingValue = "memory", matchIfMissing = true) @Slf4j public class InMemoryMsgQueue implements MsgQueue { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 0fa9653ad2..7aa317cbf9 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -82,6 +82,9 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @Value("${cassandra.query.ts_key_value_partitioning}") private String partitioning; + @Value("${cassandra.query.ts_key_value_ttl}") + private long systemTtl; + private TsPartitionDate tsFormat; private PreparedStatement partitionInsertStmt; @@ -287,6 +290,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @Override public ListenableFuture save(EntityId entityId, TsKvEntry tsKvEntry, long ttl) { + ttl = computeTtl(ttl); long partition = toPartitionTs(tsKvEntry.getTs()); DataType type = tsKvEntry.getDataType(); BoundStatement stmt = (ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind(); @@ -304,6 +308,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @Override public ListenableFuture savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl) { + ttl = computeTtl(ttl); long partition = toPartitionTs(tsKvEntryTs); log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key); BoundStatement stmt = (ttl == 0 ? getPartitionInsertStmt() : getPartitionInsertTtlStmt()).bind(); @@ -317,6 +322,17 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem return getFuture(executeAsyncWrite(stmt), rs -> null); } + private long computeTtl(long ttl) { + if (systemTtl > 0) { + if (ttl == 0) { + ttl = systemTtl; + } else { + ttl = Math.min(systemTtl, ttl); + } + } + return ttl; + } + @Override public ListenableFuture saveLatest(EntityId entityId, TsKvEntry tsKvEntry) { BoundStatement stmt = getLatestStmt().bind() diff --git a/dao/src/test/resources/cassandra-test.properties b/dao/src/test/resources/cassandra-test.properties index 737687f053..cf07b22e42 100644 --- a/dao/src/test/resources/cassandra-test.properties +++ b/dao/src/test/resources/cassandra-test.properties @@ -46,6 +46,8 @@ cassandra.query.default_fetch_size=2000 cassandra.query.ts_key_value_partitioning=HOURS +cassandra.query.ts_key_value_ttl=0 + cassandra.query.max_limit_per_request=1000 cassandra.query.buffer_size=100000 cassandra.query.concurrent_limit=1000