diff --git a/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java b/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java index c3444d43f7..751bde6303 100644 --- a/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java +++ b/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java @@ -19,6 +19,7 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringBootConfiguration; import org.springframework.context.annotation.ComponentScan; import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.annotation.EnableScheduling; import springfox.documentation.swagger2.annotations.EnableSwagger2; import java.util.Arrays; @@ -26,6 +27,7 @@ import java.util.Arrays; @SpringBootConfiguration @EnableAsync @EnableSwagger2 +@EnableScheduling @ComponentScan({"org.thingsboard.server"}) public class ThingsboardServerApplication { diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java index 6e78e20521..7dfe9a5d81 100644 --- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java @@ -28,9 +28,14 @@ import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.plugin.PluginMetaData; import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; import org.thingsboard.server.common.msg.cluster.ServerAddress; +import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse; +import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg; +import org.thingsboard.server.common.msg.session.MsgType; import org.thingsboard.server.extensions.api.plugins.Plugin; import org.thingsboard.server.extensions.api.plugins.PluginInitializationException; import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse; +import org.thingsboard.server.extensions.api.plugins.msg.ResponsePluginToRuleMsg; +import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg; import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg; import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg; import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg; @@ -98,7 +103,19 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor public void onRuleToPluginMsg(RuleToPluginMsgWrapper msg) throws RuleException { if (state == ComponentLifecycleState.ACTIVE) { - pluginImpl.process(trustedCtx, msg.getRuleTenantId(), msg.getRuleId(), msg.getMsg()); + try { + pluginImpl.process(trustedCtx, msg.getRuleTenantId(), msg.getRuleId(), msg.getMsg()); + } catch (Exception ex) { + RuleToPluginMsg ruleMsg = msg.getMsg(); + MsgType responceMsgType = MsgType.RULE_ENGINE_ERROR; + Integer requestId = 0; + if (ruleMsg.getPayload() instanceof FromDeviceRequestMsg) { + requestId = ((FromDeviceRequestMsg) ruleMsg.getPayload()).getRequestId(); + } + trustedCtx.reply( + new ResponsePluginToRuleMsg(ruleMsg.getUid(), tenantId, msg.getRuleId(), + BasicStatusCodeResponse.onError(responceMsgType, requestId, ex))); + } } else { //TODO: reply with plugin suspended message } diff --git a/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java index 2ebebfca33..fd84fe978f 100644 --- a/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java @@ -15,8 +15,9 @@ */ package org.thingsboard.server.actors.rule; -import java.util.*; - +import akka.actor.ActorContext; +import akka.actor.ActorRef; +import akka.event.LoggingAdapter; import com.fasterxml.jackson.core.JsonProcessingException; import org.springframework.util.StringUtils; import org.thingsboard.server.actors.ActorSystemContext; @@ -29,23 +30,17 @@ import org.thingsboard.server.common.data.plugin.ComponentLifecycleState; import org.thingsboard.server.common.data.plugin.PluginMetaData; import org.thingsboard.server.common.data.rule.RuleMetaData; import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; -import org.thingsboard.server.common.msg.core.BasicRequest; import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse; import org.thingsboard.server.common.msg.core.RuleEngineError; import org.thingsboard.server.common.msg.device.ToDeviceActorMsg; -import org.thingsboard.server.common.msg.session.MsgType; import org.thingsboard.server.common.msg.session.ToDeviceMsg; -import org.thingsboard.server.common.msg.session.ex.ProcessingTimeoutException; -import org.thingsboard.server.extensions.api.rules.*; import org.thingsboard.server.extensions.api.plugins.PluginAction; import org.thingsboard.server.extensions.api.plugins.msg.PluginToRuleMsg; +import org.thingsboard.server.extensions.api.plugins.msg.ResponsePluginToRuleMsg; import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg; +import org.thingsboard.server.extensions.api.rules.*; -import com.fasterxml.jackson.databind.JsonNode; - -import akka.actor.ActorContext; -import akka.actor.ActorRef; -import akka.event.LoggingAdapter; +import java.util.*; class RuleActorMessageProcessor extends ComponentMsgProcessor { @@ -190,18 +185,32 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor { RuleProcessingMsg pendingMsg = pendingMsgMap.remove(msg.getUid()); if (pendingMsg != null) { ChainProcessingContext ctx = pendingMsg.getCtx(); - Optional ruleResponseOptional = action.convert(msg); - if (ruleResponseOptional.isPresent()) { - ctx.mergeResponse(ruleResponseOptional.get()); - pushToNextRule(context, ctx, null); - } else { + if (isErrorResponce(msg)) { pushToNextRule(context, ctx, RuleEngineError.NO_RESPONSE_FROM_ACTIONS); + } else { + Optional ruleResponseOptional = action.convert(msg); + if (ruleResponseOptional.isPresent()) { + ctx.mergeResponse(ruleResponseOptional.get()); + pushToNextRule(context, ctx, null); + } else { + pushToNextRule(context, ctx, RuleEngineError.NO_RESPONSE_FROM_ACTIONS); + } } } else { logger.warning("[{}] Processing timeout detected: [{}]", entityId, msg.getUid()); } } + private boolean isErrorResponce(PluginToRuleMsg msg) { + if (msg instanceof ResponsePluginToRuleMsg) { + if (((ResponsePluginToRuleMsg) msg).getPayload() instanceof BasicStatusCodeResponse) { + BasicStatusCodeResponse responce = (BasicStatusCodeResponse) ((ResponsePluginToRuleMsg) msg).getPayload(); + return !responce.isSuccess(); + } + } + return false; + } + void onTimeoutMsg(ActorContext context, RuleToPluginTimeoutMsg msg) { RuleProcessingMsg pendingMsg = pendingMsgMap.remove(msg.getMsgId()); if (pendingMsg != null) { diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 77f06e92d3..07face1059 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -133,7 +133,7 @@ quota: intervalMin: 2 database: - type: "${DATABASE_TYPE:sql}" # cassandra OR sql + type: "${DATABASE_TYPE:cassandra}" # cassandra OR sql # Cassandra driver configuration parameters cassandra: @@ -181,6 +181,10 @@ cassandra: default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}" # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}" + buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}" + concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}" + permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}" + rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:30000}" # SQL configuration parameters sql: @@ -222,7 +226,7 @@ caffeine: specs: relations: timeToLiveInMinutes: 1440 - maxSize: 100000 + maxSize: 0 deviceCredentials: timeToLiveInMinutes: 1440 maxSize: 100000 diff --git a/dao/src/main/java/org/thingsboard/server/dao/asset/CassandraAssetDao.java b/dao/src/main/java/org/thingsboard/server/dao/asset/CassandraAssetDao.java index 4f923fe808..64ec718fdd 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/asset/CassandraAssetDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/asset/CassandraAssetDao.java @@ -148,7 +148,7 @@ public class CassandraAssetDao extends CassandraAbstractSearchTextDao>() { @Nullable @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java index 932d6b9147..8ae9dc8ad4 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java @@ -147,12 +147,12 @@ public class CassandraBaseAttributesDao extends CassandraAbstractAsyncDao implem .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType)) .and(eq(ATTRIBUTE_KEY_COLUMN, key)); log.debug("Remove request: {}", delete.toString()); - return getFuture(getSession().executeAsync(delete), rs -> null); + return getFuture(executeAsyncWrite(delete), rs -> null); } private PreparedStatement getSaveStmt() { if (saveStmt == null) { - saveStmt = getSession().prepare("INSERT INTO " + ModelConstants.ATTRIBUTES_KV_CF + + saveStmt = prepare("INSERT INTO " + ModelConstants.ATTRIBUTES_KV_CF + "(" + ENTITY_TYPE_COLUMN + "," + ENTITY_ID_COLUMN + "," + ATTRIBUTE_TYPE_COLUMN + diff --git a/dao/src/main/java/org/thingsboard/server/dao/audit/CassandraAuditLogDao.java b/dao/src/main/java/org/thingsboard/server/dao/audit/CassandraAuditLogDao.java index 27f7adc669..fd02b5f880 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/audit/CassandraAuditLogDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/audit/CassandraAuditLogDao.java @@ -244,12 +244,12 @@ public class CassandraAuditLogDao extends CassandraAbstractSearchTimeDao>() { @Nullable @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java index 94299caeb4..5c93066d90 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java @@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.thingsboard.server.dao.cassandra.CassandraCluster; import org.thingsboard.server.dao.model.type.*; +import org.thingsboard.server.dao.util.BufferedRateLimiter; @Slf4j public abstract class CassandraAbstractDao { @@ -28,12 +29,15 @@ public abstract class CassandraAbstractDao { @Autowired protected CassandraCluster cluster; + @Autowired + private BufferedRateLimiter rateLimiter; + private Session session; private ConsistencyLevel defaultReadLevel; private ConsistencyLevel defaultWriteLevel; - protected Session getSession() { + private Session getSession() { if (session == null) { session = cluster.getSession(); defaultReadLevel = cluster.getDefaultReadConsistencyLevel(); @@ -50,6 +54,10 @@ public abstract class CassandraAbstractDao { return session; } + protected PreparedStatement prepare(String query) { + return getSession().prepare(query); + } + private void registerCodecIfNotFound(CodecRegistry registry, TypeCodec codec) { try { registry.codecFor(codec.getCqlType(), codec.getJavaType()); @@ -76,10 +84,7 @@ public abstract class CassandraAbstractDao { private ResultSet execute(Statement statement, ConsistencyLevel level) { log.debug("Execute cassandra statement {}", statement); - if (statement.getConsistencyLevel() == null) { - statement.setConsistencyLevel(level); - } - return getSession().execute(statement); + return executeAsync(statement, level).getUninterruptibly(); } private ResultSetFuture executeAsync(Statement statement, ConsistencyLevel level) { @@ -87,6 +92,6 @@ public abstract class CassandraAbstractDao { if (statement.getConsistencyLevel() == null) { statement.setConsistencyLevel(level); } - return getSession().executeAsync(statement); + return new RateLimitedResultSetFuture(getSession(), rateLimiter, statement); } } \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractModelDao.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractModelDao.java index bad7b9e9b9..000316b4ff 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractModelDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractModelDao.java @@ -60,7 +60,7 @@ public abstract class CassandraAbstractModelDao, D> exte List list = Collections.emptyList(); if (statement != null) { statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); - ResultSet resultSet = getSession().execute(statement); + ResultSet resultSet = executeRead(statement); Result result = getMapper().map(resultSet); if (result != null) { list = result.all(); @@ -72,7 +72,7 @@ public abstract class CassandraAbstractModelDao, D> exte protected ListenableFuture> findListByStatementAsync(Statement statement) { if (statement != null) { statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); - ResultSetFuture resultSetFuture = getSession().executeAsync(statement); + ResultSetFuture resultSetFuture = executeAsyncRead(statement); return Futures.transform(resultSetFuture, new Function>() { @Nullable @Override @@ -94,7 +94,7 @@ public abstract class CassandraAbstractModelDao, D> exte E object = null; if (statement != null) { statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); - ResultSet resultSet = getSession().execute(statement); + ResultSet resultSet = executeRead(statement); Result result = getMapper().map(resultSet); if (result != null) { object = result.one(); @@ -106,7 +106,7 @@ public abstract class CassandraAbstractModelDao, D> exte protected ListenableFuture findOneByStatementAsync(Statement statement) { if (statement != null) { statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); - ResultSetFuture resultSetFuture = getSession().executeAsync(statement); + ResultSetFuture resultSetFuture = executeAsyncRead(statement); return Futures.transform(resultSetFuture, new Function() { @Nullable @Override @@ -181,7 +181,7 @@ public abstract class CassandraAbstractModelDao, D> exte public boolean removeById(UUID key) { Statement delete = QueryBuilder.delete().all().from(getColumnFamilyName()).where(eq(ModelConstants.ID_PROPERTY, key)); log.debug("Remove request: {}", delete.toString()); - return getSession().execute(delete).wasApplied(); + return executeWrite(delete).wasApplied(); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java new file mode 100644 index 0000000000..2674c6ddea --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java @@ -0,0 +1,148 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.nosql; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import com.google.common.base.Function; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.Uninterruptibles; +import org.thingsboard.server.dao.util.AsyncRateLimiter; + +import javax.annotation.Nullable; +import java.util.concurrent.*; + +public class RateLimitedResultSetFuture implements ResultSetFuture { + + private final ListenableFuture originalFuture; + private final ListenableFuture rateLimitFuture; + + public RateLimitedResultSetFuture(Session session, AsyncRateLimiter rateLimiter, Statement statement) { + this.rateLimitFuture = rateLimiter.acquireAsync(); + this.originalFuture = Futures.transform(rateLimitFuture, + (Function) i -> executeAsyncWithRelease(rateLimiter, session, statement)); + } + + @Override + public ResultSet getUninterruptibly() { + return safeGet().getUninterruptibly(); + } + + @Override + public ResultSet getUninterruptibly(long timeout, TimeUnit unit) throws TimeoutException { + long rateLimitStart = System.nanoTime(); + ResultSetFuture resultSetFuture = null; + try { + resultSetFuture = originalFuture.get(timeout, unit); + } catch (InterruptedException | ExecutionException e) { + throw new IllegalStateException(e); + } + long rateLimitDurationNano = System.nanoTime() - rateLimitStart; + long innerTimeoutNano = unit.toNanos(timeout) - rateLimitDurationNano; + if (innerTimeoutNano > 0) { + return resultSetFuture.getUninterruptibly(innerTimeoutNano, TimeUnit.NANOSECONDS); + } + throw new TimeoutException("Timeout waiting for task."); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (originalFuture.isDone()) { + return safeGet().cancel(mayInterruptIfRunning); + } else { + return originalFuture.cancel(mayInterruptIfRunning); + } + } + + @Override + public boolean isCancelled() { + if (originalFuture.isDone()) { + return safeGet().isCancelled(); + } + + return originalFuture.isCancelled(); + } + + @Override + public boolean isDone() { + return originalFuture.isDone() && safeGet().isDone(); + } + + @Override + public ResultSet get() throws InterruptedException, ExecutionException { + return safeGet().get(); + } + + @Override + public ResultSet get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + long rateLimitStart = System.nanoTime(); + ResultSetFuture resultSetFuture = originalFuture.get(timeout, unit); + long rateLimitDurationNano = System.nanoTime() - rateLimitStart; + long innerTimeoutNano = unit.toNanos(timeout) - rateLimitDurationNano; + if (innerTimeoutNano > 0) { + return resultSetFuture.get(innerTimeoutNano, TimeUnit.NANOSECONDS); + } + throw new TimeoutException("Timeout waiting for task."); + } + + @Override + public void addListener(Runnable listener, Executor executor) { + originalFuture.addListener(() -> { + try { + ResultSetFuture resultSetFuture = Uninterruptibles.getUninterruptibly(originalFuture); + resultSetFuture.addListener(listener, executor); + } catch (CancellationException e) { + cancel(false); + return; + } catch (ExecutionException e) { + Futures.immediateFailedFuture(e).addListener(listener, executor); + } + }, executor); + } + + private ResultSetFuture safeGet() { + try { + return originalFuture.get(); + } catch (InterruptedException | ExecutionException e) { + throw new IllegalStateException(e); + } + } + + private ResultSetFuture executeAsyncWithRelease(AsyncRateLimiter rateLimiter, Session session, Statement statement) { + try { + ResultSetFuture resultSetFuture = session.executeAsync(statement); + Futures.addCallback(resultSetFuture, new FutureCallback() { + @Override + public void onSuccess(@Nullable ResultSet result) { + rateLimiter.release(); + } + + @Override + public void onFailure(Throwable t) { + rateLimiter.release(); + } + }); + return resultSetFuture; + } catch (RuntimeException re) { + rateLimiter.release(); + throw re; + } + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java index 9e252412b2..55838d6462 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java @@ -242,7 +242,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati private PreparedStatement getSaveStmt() { if (saveStmt == null) { - saveStmt = getSession().prepare("INSERT INTO " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " + + saveStmt = prepare("INSERT INTO " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " + "(" + ModelConstants.RELATION_FROM_ID_PROPERTY + "," + ModelConstants.RELATION_FROM_TYPE_PROPERTY + "," + ModelConstants.RELATION_TO_ID_PROPERTY + @@ -257,7 +257,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati private PreparedStatement getDeleteStmt() { if (deleteStmt == null) { - deleteStmt = getSession().prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + + deleteStmt = prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + " = ?" + AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + " = ?" + AND + ModelConstants.RELATION_TO_ID_PROPERTY + " = ?" + @@ -270,7 +270,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati private PreparedStatement getDeleteAllByEntityStmt() { if (deleteAllByEntityStmt == null) { - deleteAllByEntityStmt = getSession().prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + + deleteAllByEntityStmt = prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + " = ?" + AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + " = ?"); } @@ -279,7 +279,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati private PreparedStatement getFindAllByFromStmt() { if (findAllByFromStmt == null) { - findAllByFromStmt = getSession().prepare(SELECT_COLUMNS + " " + + findAllByFromStmt = prepare(SELECT_COLUMNS + " " + FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " + WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM + AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM + @@ -290,7 +290,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati private PreparedStatement getFindAllByFromAndTypeStmt() { if (findAllByFromAndTypeStmt == null) { - findAllByFromAndTypeStmt = getSession().prepare(SELECT_COLUMNS + " " + + findAllByFromAndTypeStmt = prepare(SELECT_COLUMNS + " " + FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " + WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM + AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM + @@ -303,7 +303,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati private PreparedStatement getFindAllByToStmt() { if (findAllByToStmt == null) { - findAllByToStmt = getSession().prepare(SELECT_COLUMNS + " " + + findAllByToStmt = prepare(SELECT_COLUMNS + " " + FROM + ModelConstants.RELATION_REVERSE_VIEW_NAME + " " + WHERE + ModelConstants.RELATION_TO_ID_PROPERTY + EQUAL_TO_PARAM + AND + ModelConstants.RELATION_TO_TYPE_PROPERTY + EQUAL_TO_PARAM + @@ -314,7 +314,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati private PreparedStatement getFindAllByToAndTypeStmt() { if (findAllByToAndTypeStmt == null) { - findAllByToAndTypeStmt = getSession().prepare(SELECT_COLUMNS + " " + + findAllByToAndTypeStmt = prepare(SELECT_COLUMNS + " " + FROM + ModelConstants.RELATION_REVERSE_VIEW_NAME + " " + WHERE + ModelConstants.RELATION_TO_ID_PROPERTY + EQUAL_TO_PARAM + AND + ModelConstants.RELATION_TO_TYPE_PROPERTY + EQUAL_TO_PARAM + @@ -327,7 +327,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati private PreparedStatement getCheckRelationStmt() { if (checkRelationStmt == null) { - checkRelationStmt = getSession().prepare(SELECT_COLUMNS + " " + + checkRelationStmt = prepare(SELECT_COLUMNS + " " + FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " + WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM + AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM + diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index d620e11f1d..aba3eefbc9 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -73,7 +73,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem private PreparedStatement partitionInsertStmt; private PreparedStatement partitionInsertTtlStmt; - private PreparedStatement[] latestInsertStmts; + private PreparedStatement latestInsertStmt; private PreparedStatement[] saveStmts; private PreparedStatement[] saveTtlStmts; private PreparedStatement[] fetchStmts; @@ -306,13 +306,15 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @Override public ListenableFuture saveLatest(EntityId entityId, TsKvEntry tsKvEntry) { - DataType type = tsKvEntry.getDataType(); - BoundStatement stmt = getLatestStmt(type).bind() + BoundStatement stmt = getLatestStmt().bind() .setString(0, entityId.getEntityType().name()) .setUUID(1, entityId.getId()) .setString(2, tsKvEntry.getKey()) - .setLong(3, tsKvEntry.getTs()); - addValue(tsKvEntry, stmt, 4); + .setLong(3, tsKvEntry.getTs()) + .set(4, tsKvEntry.getBooleanValue().orElse(null), Boolean.class) + .set(5, tsKvEntry.getStrValue().orElse(null), String.class) + .set(6, tsKvEntry.getLongValue().orElse(null), Long.class) + .set(7, tsKvEntry.getDoubleValue().orElse(null), Double.class); return getFuture(executeAsyncWrite(stmt), rs -> null); } @@ -381,7 +383,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem if (saveStmts == null) { saveStmts = new PreparedStatement[DataType.values().length]; for (DataType type : DataType.values()) { - saveStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_CF + + saveStmts[type.ordinal()] = prepare(INSERT_INTO + ModelConstants.TS_KV_CF + "(" + ModelConstants.ENTITY_TYPE_COLUMN + "," + ModelConstants.ENTITY_ID_COLUMN + "," + ModelConstants.KEY_COLUMN + @@ -398,7 +400,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem if (saveTtlStmts == null) { saveTtlStmts = new PreparedStatement[DataType.values().length]; for (DataType type : DataType.values()) { - saveTtlStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_CF + + saveTtlStmts[type.ordinal()] = prepare(INSERT_INTO + ModelConstants.TS_KV_CF + "(" + ModelConstants.ENTITY_TYPE_COLUMN + "," + ModelConstants.ENTITY_ID_COLUMN + "," + ModelConstants.KEY_COLUMN + @@ -420,7 +422,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem } else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) { fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()]; } else { - fetchStmts[type.ordinal()] = getSession().prepare(SELECT_PREFIX + + fetchStmts[type.ordinal()] = prepare(SELECT_PREFIX + String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM @@ -435,26 +437,29 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem return fetchStmts[aggType.ordinal()]; } - private PreparedStatement getLatestStmt(DataType dataType) { - if (latestInsertStmts == null) { - latestInsertStmts = new PreparedStatement[DataType.values().length]; - for (DataType type : DataType.values()) { - latestInsertStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_LATEST_CF + - "(" + ModelConstants.ENTITY_TYPE_COLUMN + - "," + ModelConstants.ENTITY_ID_COLUMN + - "," + ModelConstants.KEY_COLUMN + - "," + ModelConstants.TS_COLUMN + - "," + getColumnName(type) + ")" + - " VALUES(?, ?, ?, ?, ?)"); - } + private PreparedStatement getLatestStmt() { + if (latestInsertStmt == null) { +// latestInsertStmt = new PreparedStatement[DataType.values().length]; +// for (DataType type : DataType.values()) { + latestInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_LATEST_CF + + "(" + ModelConstants.ENTITY_TYPE_COLUMN + + "," + ModelConstants.ENTITY_ID_COLUMN + + "," + ModelConstants.KEY_COLUMN + + "," + ModelConstants.TS_COLUMN + + "," + ModelConstants.BOOLEAN_VALUE_COLUMN + + "," + ModelConstants.STRING_VALUE_COLUMN + + "," + ModelConstants.LONG_VALUE_COLUMN + + "," + ModelConstants.DOUBLE_VALUE_COLUMN + ")" + + " VALUES(?, ?, ?, ?, ?)"); +// } } - return latestInsertStmts[dataType.ordinal()]; + return latestInsertStmt; } private PreparedStatement getPartitionInsertStmt() { if (partitionInsertStmt == null) { - partitionInsertStmt = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF + + partitionInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF + "(" + ModelConstants.ENTITY_TYPE_COLUMN + "," + ModelConstants.ENTITY_ID_COLUMN + "," + ModelConstants.PARTITION_COLUMN + @@ -466,7 +471,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem private PreparedStatement getPartitionInsertTtlStmt() { if (partitionInsertTtlStmt == null) { - partitionInsertTtlStmt = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF + + partitionInsertTtlStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF + "(" + ModelConstants.ENTITY_TYPE_COLUMN + "," + ModelConstants.ENTITY_ID_COLUMN + "," + ModelConstants.PARTITION_COLUMN + @@ -479,7 +484,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem private PreparedStatement getFindLatestStmt() { if (findLatestStmt == null) { - findLatestStmt = getSession().prepare(SELECT_PREFIX + + findLatestStmt = prepare(SELECT_PREFIX + ModelConstants.KEY_COLUMN + "," + ModelConstants.TS_COLUMN + "," + ModelConstants.STRING_VALUE_COLUMN + "," + @@ -496,7 +501,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem private PreparedStatement getFindAllLatestStmt() { if (findAllLatestStmt == null) { - findAllLatestStmt = getSession().prepare(SELECT_PREFIX + + findAllLatestStmt = prepare(SELECT_PREFIX + ModelConstants.KEY_COLUMN + "," + ModelConstants.TS_COLUMN + "," + ModelConstants.STRING_VALUE_COLUMN + "," + diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/AsyncRateLimiter.java b/dao/src/main/java/org/thingsboard/server/dao/util/AsyncRateLimiter.java new file mode 100644 index 0000000000..6fb21d6adb --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/util/AsyncRateLimiter.java @@ -0,0 +1,25 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.util; + +import com.google.common.util.concurrent.ListenableFuture; + +public interface AsyncRateLimiter { + + ListenableFuture acquireAsync(); + + void release(); +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java new file mode 100644 index 0000000000..de07dbfa47 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java @@ -0,0 +1,160 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.util; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +@Component +@Slf4j +public class BufferedRateLimiter implements AsyncRateLimiter { + + private final ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); + + private final int permitsLimit; + private final int maxPermitWaitTime; + private final AtomicInteger permits; + private final BlockingQueue queue; + + private final AtomicInteger maxQueueSize = new AtomicInteger(); + private final AtomicInteger maxGrantedPermissions = new AtomicInteger(); + + public BufferedRateLimiter(@Value("${cassandra.query.buffer_size}") int queueLimit, + @Value("${cassandra.query.concurrent_limit}") int permitsLimit, + @Value("${cassandra.query.permit_max_wait_time}") int maxPermitWaitTime) { + this.permitsLimit = permitsLimit; + this.maxPermitWaitTime = maxPermitWaitTime; + this.permits = new AtomicInteger(); + this.queue = new LinkedBlockingQueue<>(queueLimit); + } + + @Override + public ListenableFuture acquireAsync() { + if (queue.isEmpty()) { + if (permits.incrementAndGet() <= permitsLimit) { + if (permits.get() > maxGrantedPermissions.get()) { + maxGrantedPermissions.set(permits.get()); + } + return Futures.immediateFuture(null); + } + permits.decrementAndGet(); + } + + return putInQueue(); + } + + @Override + public void release() { + permits.decrementAndGet(); + reprocessQueue(); + } + + private void reprocessQueue() { + while (permits.get() < permitsLimit) { + if (permits.incrementAndGet() <= permitsLimit) { + if (permits.get() > maxGrantedPermissions.get()) { + maxGrantedPermissions.set(permits.get()); + } + LockedFuture lockedFuture = queue.poll(); + if (lockedFuture != null) { + lockedFuture.latch.countDown(); + } else { + permits.decrementAndGet(); + break; + } + } else { + permits.decrementAndGet(); + } + } + } + + private LockedFuture createLockedFuture() { + CountDownLatch latch = new CountDownLatch(1); + ListenableFuture future = pool.submit(() -> { + latch.await(); + return null; + }); + return new LockedFuture(latch, future, System.currentTimeMillis()); + } + + private ListenableFuture putInQueue() { + + int size = queue.size(); + if (size > maxQueueSize.get()) { + maxQueueSize.set(size); + } + + if (queue.remainingCapacity() > 0) { + try { + LockedFuture lockedFuture = createLockedFuture(); + if (!queue.offer(lockedFuture, 1, TimeUnit.SECONDS)) { + lockedFuture.cancelFuture(); + return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject")); + } + return lockedFuture.future; + } catch (InterruptedException e) { + return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Task interrupted. Reject")); + } + } + return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject")); + } + + @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}") + public void printStats() { + int expiredCount = 0; + for (LockedFuture lockedFuture : queue) { + if (lockedFuture.isExpired()) { + lockedFuture.cancelFuture(); + expiredCount++; + } + } + log.info("Permits maxBuffer is [{}] max concurrent [{}] expired [{}]", maxQueueSize.getAndSet(0), + maxGrantedPermissions.getAndSet(0), expiredCount); + } + + private class LockedFuture { + final CountDownLatch latch; + final ListenableFuture future; + final long createTime; + + public LockedFuture(CountDownLatch latch, ListenableFuture future, long createTime) { + this.latch = latch; + this.future = future; + this.createTime = createTime; + } + + void cancelFuture() { + future.cancel(false); + latch.countDown(); + } + + boolean isExpired() { + return (System.currentTimeMillis() - createTime) > maxPermitWaitTime; + } + + } + + +} diff --git a/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java b/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java new file mode 100644 index 0000000000..fa62c2b9b0 --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java @@ -0,0 +1,156 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.nosql; + +import com.datastax.driver.core.*; +import com.datastax.driver.core.exceptions.UnsupportedFeatureException; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; +import org.thingsboard.server.dao.util.AsyncRateLimiter; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class RateLimitedResultSetFutureTest { + + private RateLimitedResultSetFuture resultSetFuture; + + @Mock + private AsyncRateLimiter rateLimiter; + @Mock + private Session session; + @Mock + private Statement statement; + @Mock + private ResultSetFuture realFuture; + @Mock + private ResultSet rows; + @Mock + private Row row; + + @Test + public void doNotReleasePermissionIfRateLimitFutureFailed() throws InterruptedException { + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFailedFuture(new IllegalArgumentException())); + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement); + Thread.sleep(1000L); + verify(rateLimiter).acquireAsync(); + try { + assertTrue(resultSetFuture.isDone()); + fail(); + } catch (Exception e) { + assertTrue(e instanceof IllegalStateException); + Throwable actualCause = e.getCause(); + assertTrue(actualCause instanceof ExecutionException); + } + verifyNoMoreInteractions(session, rateLimiter, statement); + + } + + @Test + public void getUninterruptiblyDelegateToCassandra() throws InterruptedException, ExecutionException { + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null)); + when(session.executeAsync(statement)).thenReturn(realFuture); + Mockito.doAnswer((Answer) invocation -> { + Object[] args = invocation.getArguments(); + Runnable task = (Runnable) args[0]; + task.run(); + return null; + }).when(realFuture).addListener(Mockito.any(), Mockito.any()); + + when(realFuture.getUninterruptibly()).thenReturn(rows); + + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement); + ResultSet actual = resultSetFuture.getUninterruptibly(); + assertSame(rows, actual); + verify(rateLimiter, times(1)).acquireAsync(); + verify(rateLimiter, times(1)).release(); + } + + @Test + public void addListenerAllowsFutureTransformation() throws InterruptedException, ExecutionException { + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null)); + when(session.executeAsync(statement)).thenReturn(realFuture); + Mockito.doAnswer((Answer) invocation -> { + Object[] args = invocation.getArguments(); + Runnable task = (Runnable) args[0]; + task.run(); + return null; + }).when(realFuture).addListener(Mockito.any(), Mockito.any()); + + when(realFuture.get()).thenReturn(rows); + when(rows.one()).thenReturn(row); + + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement); + + ListenableFuture transform = Futures.transform(resultSetFuture, ResultSet::one); + Row actualRow = transform.get(); + + assertSame(row, actualRow); + verify(rateLimiter, times(1)).acquireAsync(); + verify(rateLimiter, times(1)).release(); + } + + @Test + public void immidiateCassandraExceptionReturnsPermit() throws InterruptedException, ExecutionException { + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null)); + when(session.executeAsync(statement)).thenThrow(new UnsupportedFeatureException(ProtocolVersion.V3, "hjg")); + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement); + ListenableFuture transform = Futures.transform(resultSetFuture, ResultSet::one); + try { + transform.get(); + fail(); + } catch (Exception e) { + assertTrue(e instanceof ExecutionException); + } + verify(rateLimiter, times(1)).acquireAsync(); + verify(rateLimiter, times(1)).release(); + } + + @Test + public void queryTimeoutReturnsPermit() throws InterruptedException, ExecutionException { + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null)); + when(session.executeAsync(statement)).thenReturn(realFuture); + Mockito.doAnswer((Answer) invocation -> { + Object[] args = invocation.getArguments(); + Runnable task = (Runnable) args[0]; + task.run(); + return null; + }).when(realFuture).addListener(Mockito.any(), Mockito.any()); + + when(realFuture.get()).thenThrow(new ExecutionException("Fail", new TimeoutException("timeout"))); + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement); + ListenableFuture transform = Futures.transform(resultSetFuture, ResultSet::one); + try { + transform.get(); + fail(); + } catch (Exception e) { + assertTrue(e instanceof ExecutionException); + } + verify(rateLimiter, times(1)).acquireAsync(); + verify(rateLimiter, times(1)).release(); + } + +} \ No newline at end of file diff --git a/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java b/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java new file mode 100644 index 0000000000..5bfc3b6e95 --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java @@ -0,0 +1,134 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.util; + +import com.google.common.util.concurrent.*; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.*; + + +public class BufferedRateLimiterTest { + + @Test + public void finishedFutureReturnedIfPermitsAreGranted() { + BufferedRateLimiter limiter = new BufferedRateLimiter(10, 10, 100); + ListenableFuture actual = limiter.acquireAsync(); + assertTrue(actual.isDone()); + } + + @Test + public void notFinishedFutureReturnedIfPermitsAreNotGranted() { + BufferedRateLimiter limiter = new BufferedRateLimiter(10, 1, 100); + ListenableFuture actual1 = limiter.acquireAsync(); + ListenableFuture actual2 = limiter.acquireAsync(); + assertTrue(actual1.isDone()); + assertFalse(actual2.isDone()); + } + + @Test + public void failedFutureReturnedIfQueueIsfull() { + BufferedRateLimiter limiter = new BufferedRateLimiter(1, 1, 100); + ListenableFuture actual1 = limiter.acquireAsync(); + ListenableFuture actual2 = limiter.acquireAsync(); + ListenableFuture actual3 = limiter.acquireAsync(); + + assertTrue(actual1.isDone()); + assertFalse(actual2.isDone()); + assertTrue(actual3.isDone()); + try { + actual3.get(); + fail(); + } catch (Exception e) { + assertTrue(e instanceof ExecutionException); + Throwable actualCause = e.getCause(); + assertTrue(actualCause instanceof IllegalStateException); + assertEquals("Rate Limit Buffer is full. Reject", actualCause.getMessage()); + } + } + + @Test + public void releasedPermitTriggerTasksFromQueue() throws InterruptedException { + BufferedRateLimiter limiter = new BufferedRateLimiter(10, 2, 100); + ListenableFuture actual1 = limiter.acquireAsync(); + ListenableFuture actual2 = limiter.acquireAsync(); + ListenableFuture actual3 = limiter.acquireAsync(); + ListenableFuture actual4 = limiter.acquireAsync(); + assertTrue(actual1.isDone()); + assertTrue(actual2.isDone()); + assertFalse(actual3.isDone()); + assertFalse(actual4.isDone()); + limiter.release(); + TimeUnit.MILLISECONDS.sleep(100L); + assertTrue(actual3.isDone()); + assertFalse(actual4.isDone()); + limiter.release(); + TimeUnit.MILLISECONDS.sleep(100L); + assertTrue(actual4.isDone()); + } + + @Test + public void permitsReleasedInConcurrentMode() throws InterruptedException { + BufferedRateLimiter limiter = new BufferedRateLimiter(10, 2, 100); + AtomicInteger actualReleased = new AtomicInteger(); + AtomicInteger actualRejected = new AtomicInteger(); + ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); + for (int i = 0; i < 100; i++) { + ListenableFuture> submit = pool.submit(limiter::acquireAsync); + Futures.addCallback(submit, new FutureCallback>() { + @Override + public void onSuccess(@Nullable ListenableFuture result) { + Futures.addCallback(result, new FutureCallback() { + @Override + public void onSuccess(@Nullable Void result) { + try { + TimeUnit.MILLISECONDS.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + limiter.release(); + actualReleased.incrementAndGet(); + } + + @Override + public void onFailure(Throwable t) { + actualRejected.incrementAndGet(); + } + }); + } + + @Override + public void onFailure(Throwable t) { + } + }); + } + + TimeUnit.SECONDS.sleep(2); + assertTrue("Unexpected released count " + actualReleased.get(), + actualReleased.get() > 10 && actualReleased.get() < 20); + assertTrue("Unexpected rejected count " + actualRejected.get(), + actualRejected.get() > 80 && actualRejected.get() < 90); + + } + + +} \ No newline at end of file