diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java index 51ca652d6f..f6bf54d349 100644 --- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java @@ -106,6 +106,7 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor try { pluginImpl.process(trustedCtx, msg.getRuleTenantId(), msg.getRuleId(), msg.getMsg()); } catch (Exception ex) { + logger.debug("[{}] Failed to process RuleToPlugin msg: [{}] [{}]", tenantId, msg.getMsg(), ex); RuleToPluginMsg ruleMsg = msg.getMsg(); MsgType responceMsgType = MsgType.RULE_ENGINE_ERROR; Integer requestId = 0; diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index c08de4f1b8..fc04a193e1 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -238,7 +238,7 @@ caffeine: specs: relations: timeToLiveInMinutes: 1440 - maxSize: 0 + maxSize: 100000 deviceCredentials: timeToLiveInMinutes: 1440 maxSize: 100000 diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index cf141711a5..cda4b1669b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -439,8 +439,6 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem private PreparedStatement getLatestStmt() { if (latestInsertStmt == null) { -// latestInsertStmt = new PreparedStatement[DataType.values().length]; -// for (DataType type : DataType.values()) { latestInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_LATEST_CF + "(" + ModelConstants.ENTITY_TYPE_COLUMN + "," + ModelConstants.ENTITY_ID_COLUMN + @@ -451,7 +449,6 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem "," + ModelConstants.LONG_VALUE_COLUMN + "," + ModelConstants.DOUBLE_VALUE_COLUMN + ")" + " VALUES(?, ?, ?, ?, ?, ?, ?, ?)"); -// } } return latestInsertStmt; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java index de07dbfa47..2acd623a37 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java +++ b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger; @Component @Slf4j +@NoSqlDao public class BufferedRateLimiter implements AsyncRateLimiter { private final ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); @@ -113,6 +114,9 @@ public class BufferedRateLimiter implements AsyncRateLimiter { lockedFuture.cancelFuture(); return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject")); } + if(permits.get() < permitsLimit) { + reprocessQueue(); + } return lockedFuture.future; } catch (InterruptedException e) { return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Task interrupted. Reject")); @@ -130,8 +134,8 @@ public class BufferedRateLimiter implements AsyncRateLimiter { expiredCount++; } } - log.info("Permits maxBuffer is [{}] max concurrent [{}] expired [{}]", maxQueueSize.getAndSet(0), - maxGrantedPermissions.getAndSet(0), expiredCount); + log.info("Permits maxBuffer is [{}] max concurrent [{}] expired [{}] current granted [{}]", maxQueueSize.getAndSet(0), + maxGrantedPermissions.getAndSet(0), expiredCount, permits.get()); } private class LockedFuture { diff --git a/dao/src/test/resources/cassandra-test.properties b/dao/src/test/resources/cassandra-test.properties index 82fcbe1949..737687f053 100644 --- a/dao/src/test/resources/cassandra-test.properties +++ b/dao/src/test/resources/cassandra-test.properties @@ -47,3 +47,8 @@ cassandra.query.default_fetch_size=2000 cassandra.query.ts_key_value_partitioning=HOURS cassandra.query.max_limit_per_request=1000 +cassandra.query.buffer_size=100000 +cassandra.query.concurrent_limit=1000 +cassandra.query.permit_max_wait_time=20000 +cassandra.query.rate_limit_print_interval_ms=30000 +