From 94039f943b46caa0dc86ab722fbf0006b9c4acab Mon Sep 17 00:00:00 2001 From: vparomskiy Date: Fri, 23 Mar 2018 16:56:37 +0200 Subject: [PATCH] Squashed commit of the following: commit 8b637c9e9456963e1e24ec0e6306903d2e765c1f Author: vparomskiy Date: Fri Mar 23 16:54:48 2018 +0200 add cassandra properties in test suite --- .../server/actors/plugin/PluginActorMessageProcessor.java | 1 + application/src/main/resources/thingsboard.yml | 4 ++-- .../server/dao/timeseries/CassandraBaseTimeseriesDao.java | 3 --- .../thingsboard/server/dao/util/BufferedRateLimiter.java | 8 ++++++-- dao/src/test/resources/cassandra-test.properties | 5 +++++ 5 files changed, 14 insertions(+), 7 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java index 51ca652d6f..f6bf54d349 100644 --- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java @@ -106,6 +106,7 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor try { pluginImpl.process(trustedCtx, msg.getRuleTenantId(), msg.getRuleId(), msg.getMsg()); } catch (Exception ex) { + logger.debug("[{}] Failed to process RuleToPlugin msg: [{}] [{}]", tenantId, msg.getMsg(), ex); RuleToPluginMsg ruleMsg = msg.getMsg(); MsgType responceMsgType = MsgType.RULE_ENGINE_ERROR; Integer requestId = 0; diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index ff5824dbfd..fc04a193e1 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -133,7 +133,7 @@ quota: intervalMin: 2 database: - type: "${DATABASE_TYPE:cassandra}" # cassandra OR sql + type: "${DATABASE_TYPE:sql}" # cassandra OR sql # Cassandra driver configuration parameters cassandra: @@ -238,7 +238,7 @@ caffeine: specs: relations: timeToLiveInMinutes: 1440 - maxSize: 0 + maxSize: 100000 deviceCredentials: timeToLiveInMinutes: 1440 maxSize: 100000 diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index cf141711a5..cda4b1669b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -439,8 +439,6 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem private PreparedStatement getLatestStmt() { if (latestInsertStmt == null) { -// latestInsertStmt = new PreparedStatement[DataType.values().length]; -// for (DataType type : DataType.values()) { latestInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_LATEST_CF + "(" + ModelConstants.ENTITY_TYPE_COLUMN + "," + ModelConstants.ENTITY_ID_COLUMN + @@ -451,7 +449,6 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem "," + ModelConstants.LONG_VALUE_COLUMN + "," + ModelConstants.DOUBLE_VALUE_COLUMN + ")" + " VALUES(?, ?, ?, ?, ?, ?, ?, ?)"); -// } } return latestInsertStmt; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java index de07dbfa47..2acd623a37 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java +++ b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger; @Component @Slf4j +@NoSqlDao public class BufferedRateLimiter implements AsyncRateLimiter { private final ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); @@ -113,6 +114,9 @@ public class BufferedRateLimiter implements AsyncRateLimiter { lockedFuture.cancelFuture(); return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject")); } + if(permits.get() < permitsLimit) { + reprocessQueue(); + } return lockedFuture.future; } catch (InterruptedException e) { return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Task interrupted. Reject")); @@ -130,8 +134,8 @@ public class BufferedRateLimiter implements AsyncRateLimiter { expiredCount++; } } - log.info("Permits maxBuffer is [{}] max concurrent [{}] expired [{}]", maxQueueSize.getAndSet(0), - maxGrantedPermissions.getAndSet(0), expiredCount); + log.info("Permits maxBuffer is [{}] max concurrent [{}] expired [{}] current granted [{}]", maxQueueSize.getAndSet(0), + maxGrantedPermissions.getAndSet(0), expiredCount, permits.get()); } private class LockedFuture { diff --git a/dao/src/test/resources/cassandra-test.properties b/dao/src/test/resources/cassandra-test.properties index 82fcbe1949..737687f053 100644 --- a/dao/src/test/resources/cassandra-test.properties +++ b/dao/src/test/resources/cassandra-test.properties @@ -47,3 +47,8 @@ cassandra.query.default_fetch_size=2000 cassandra.query.ts_key_value_partitioning=HOURS cassandra.query.max_limit_per_request=1000 +cassandra.query.buffer_size=100000 +cassandra.query.concurrent_limit=1000 +cassandra.query.permit_max_wait_time=20000 +cassandra.query.rate_limit_print_interval_ms=30000 +