Merge with master
This commit is contained in:
		
						commit
						95f1b8f835
					
				@ -19,6 +19,7 @@ import org.springframework.boot.SpringApplication;
 | 
			
		||||
import org.springframework.boot.SpringBootConfiguration;
 | 
			
		||||
import org.springframework.context.annotation.ComponentScan;
 | 
			
		||||
import org.springframework.scheduling.annotation.EnableAsync;
 | 
			
		||||
import org.springframework.scheduling.annotation.EnableScheduling;
 | 
			
		||||
import springfox.documentation.swagger2.annotations.EnableSwagger2;
 | 
			
		||||
 | 
			
		||||
import java.util.Arrays;
 | 
			
		||||
@ -26,6 +27,7 @@ import java.util.Arrays;
 | 
			
		||||
@SpringBootConfiguration
 | 
			
		||||
@EnableAsync
 | 
			
		||||
@EnableSwagger2
 | 
			
		||||
@EnableScheduling
 | 
			
		||||
@ComponentScan({"org.thingsboard.server"})
 | 
			
		||||
public class ThingsboardServerApplication {
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -28,9 +28,14 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.PluginMetaData;
 | 
			
		||||
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.cluster.ServerAddress;
 | 
			
		||||
import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;
 | 
			
		||||
import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.session.MsgType;
 | 
			
		||||
import org.thingsboard.server.extensions.api.plugins.Plugin;
 | 
			
		||||
import org.thingsboard.server.extensions.api.plugins.PluginInitializationException;
 | 
			
		||||
import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
 | 
			
		||||
import org.thingsboard.server.extensions.api.plugins.msg.ResponsePluginToRuleMsg;
 | 
			
		||||
import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg;
 | 
			
		||||
import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg;
 | 
			
		||||
import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg;
 | 
			
		||||
import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
 | 
			
		||||
@ -98,7 +103,19 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId>
 | 
			
		||||
 | 
			
		||||
    public void onRuleToPluginMsg(RuleToPluginMsgWrapper msg) throws RuleException {
 | 
			
		||||
        if (state == ComponentLifecycleState.ACTIVE) {
 | 
			
		||||
            pluginImpl.process(trustedCtx, msg.getRuleTenantId(), msg.getRuleId(), msg.getMsg());
 | 
			
		||||
            try {
 | 
			
		||||
                pluginImpl.process(trustedCtx, msg.getRuleTenantId(), msg.getRuleId(), msg.getMsg());
 | 
			
		||||
            } catch (Exception ex) {
 | 
			
		||||
                RuleToPluginMsg ruleMsg = msg.getMsg();
 | 
			
		||||
                MsgType responceMsgType = MsgType.RULE_ENGINE_ERROR;
 | 
			
		||||
                Integer requestId = 0;
 | 
			
		||||
                if (ruleMsg.getPayload() instanceof FromDeviceRequestMsg) {
 | 
			
		||||
                    requestId = ((FromDeviceRequestMsg) ruleMsg.getPayload()).getRequestId();
 | 
			
		||||
                }
 | 
			
		||||
                trustedCtx.reply(
 | 
			
		||||
                        new ResponsePluginToRuleMsg(ruleMsg.getUid(), tenantId, msg.getRuleId(),
 | 
			
		||||
                                BasicStatusCodeResponse.onError(responceMsgType, requestId, ex)));
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            //TODO: reply with plugin suspended message
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@ -133,7 +133,7 @@ quota:
 | 
			
		||||
    intervalMin: 2
 | 
			
		||||
 | 
			
		||||
database:
 | 
			
		||||
  type: "${DATABASE_TYPE:sql}" # cassandra OR sql
 | 
			
		||||
  type: "${DATABASE_TYPE:cassandra}" # cassandra OR sql
 | 
			
		||||
 | 
			
		||||
# Cassandra driver configuration parameters
 | 
			
		||||
cassandra:
 | 
			
		||||
@ -181,6 +181,10 @@ cassandra:
 | 
			
		||||
    default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}"
 | 
			
		||||
    # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS
 | 
			
		||||
    ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
 | 
			
		||||
    buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}"
 | 
			
		||||
    concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}"
 | 
			
		||||
    permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}"
 | 
			
		||||
    rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:30000}"
 | 
			
		||||
 | 
			
		||||
  queue:
 | 
			
		||||
    msg.ttl: 604800 # 7 days
 | 
			
		||||
@ -234,7 +238,7 @@ caffeine:
 | 
			
		||||
  specs:
 | 
			
		||||
    relations:
 | 
			
		||||
      timeToLiveInMinutes: 1440
 | 
			
		||||
      maxSize: 100000
 | 
			
		||||
      maxSize: 0
 | 
			
		||||
    deviceCredentials:
 | 
			
		||||
      timeToLiveInMinutes: 1440
 | 
			
		||||
      maxSize: 100000
 | 
			
		||||
 | 
			
		||||
@ -16,16 +16,16 @@
 | 
			
		||||
package org.thingsboard.server.common.msg.core;
 | 
			
		||||
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.session.MsgType;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * @author Andrew Shvayka
 | 
			
		||||
 */
 | 
			
		||||
@Data
 | 
			
		||||
public class ToServerRpcRequestMsg implements FromDeviceMsg {
 | 
			
		||||
public class ToServerRpcRequestMsg implements FromDeviceRequestMsg {
 | 
			
		||||
 | 
			
		||||
    private final int requestId;
 | 
			
		||||
    private final Integer requestId;
 | 
			
		||||
    private final String method;
 | 
			
		||||
    private final String params;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -148,7 +148,7 @@ public class CassandraAssetDao extends CassandraAbstractSearchTextDao<AssetEntit
 | 
			
		||||
        query.and(eq(ENTITY_SUBTYPE_TENANT_ID_PROPERTY, tenantId));
 | 
			
		||||
        query.and(eq(ENTITY_SUBTYPE_ENTITY_TYPE_PROPERTY, EntityType.ASSET));
 | 
			
		||||
        query.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
 | 
			
		||||
        ResultSetFuture resultSetFuture = getSession().executeAsync(query);
 | 
			
		||||
        ResultSetFuture resultSetFuture = executeAsyncRead(query);
 | 
			
		||||
        return Futures.transform(resultSetFuture, new Function<ResultSet, List<EntitySubtype>>() {
 | 
			
		||||
            @Nullable
 | 
			
		||||
            @Override
 | 
			
		||||
 | 
			
		||||
@ -147,12 +147,12 @@ public class CassandraBaseAttributesDao extends CassandraAbstractAsyncDao implem
 | 
			
		||||
                .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType))
 | 
			
		||||
                .and(eq(ATTRIBUTE_KEY_COLUMN, key));
 | 
			
		||||
        log.debug("Remove request: {}", delete.toString());
 | 
			
		||||
        return getFuture(getSession().executeAsync(delete), rs -> null);
 | 
			
		||||
        return getFuture(executeAsyncWrite(delete), rs -> null);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private PreparedStatement getSaveStmt() {
 | 
			
		||||
        if (saveStmt == null) {
 | 
			
		||||
            saveStmt = getSession().prepare("INSERT INTO " + ModelConstants.ATTRIBUTES_KV_CF +
 | 
			
		||||
            saveStmt = prepare("INSERT INTO " + ModelConstants.ATTRIBUTES_KV_CF +
 | 
			
		||||
                    "(" + ENTITY_TYPE_COLUMN +
 | 
			
		||||
                    "," + ENTITY_ID_COLUMN +
 | 
			
		||||
                    "," + ATTRIBUTE_TYPE_COLUMN +
 | 
			
		||||
 | 
			
		||||
@ -244,12 +244,12 @@ public class CassandraAuditLogDao extends CassandraAbstractSearchTimeDao<AuditLo
 | 
			
		||||
            values.add("?");
 | 
			
		||||
        }
 | 
			
		||||
        String statementString = INSERT_INTO + cfName + " (" + String.join(",", columnsList) + ") VALUES (" + values.toString() + ")";
 | 
			
		||||
        return getSession().prepare(statementString);
 | 
			
		||||
        return prepare(statementString);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private PreparedStatement getPartitionInsertStmt() {
 | 
			
		||||
        if (partitionInsertStmt == null) {
 | 
			
		||||
            partitionInsertStmt = getSession().prepare(INSERT_INTO + ModelConstants.AUDIT_LOG_BY_TENANT_ID_PARTITIONS_CF +
 | 
			
		||||
            partitionInsertStmt = prepare(INSERT_INTO + ModelConstants.AUDIT_LOG_BY_TENANT_ID_PARTITIONS_CF +
 | 
			
		||||
                    "(" + ModelConstants.AUDIT_LOG_TENANT_ID_PROPERTY +
 | 
			
		||||
                    "," + ModelConstants.AUDIT_LOG_PARTITION_PROPERTY + ")" +
 | 
			
		||||
                    " VALUES(?, ?)");
 | 
			
		||||
@ -343,7 +343,7 @@ public class CassandraAuditLogDao extends CassandraAbstractSearchTimeDao<AuditLo
 | 
			
		||||
                .where(eq(ModelConstants.AUDIT_LOG_TENANT_ID_PROPERTY, tenantId));
 | 
			
		||||
        select.and(QueryBuilder.gte(ModelConstants.PARTITION_COLUMN, minPartition));
 | 
			
		||||
        select.and(QueryBuilder.lte(ModelConstants.PARTITION_COLUMN, maxPartition));
 | 
			
		||||
        return getSession().execute(select);
 | 
			
		||||
        return executeRead(select);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -130,7 +130,7 @@ public class CassandraBaseComponentDescriptorDao extends CassandraAbstractSearch
 | 
			
		||||
    public boolean removeById(UUID key) {
 | 
			
		||||
        Statement delete = QueryBuilder.delete().all().from(ModelConstants.COMPONENT_DESCRIPTOR_BY_ID).where(eq(ModelConstants.ID_PROPERTY, key));
 | 
			
		||||
        log.debug("Remove request: {}", delete.toString());
 | 
			
		||||
        return getSession().execute(delete).wasApplied();
 | 
			
		||||
        return executeWrite(delete).wasApplied();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
@ -145,7 +145,7 @@ public class CassandraBaseComponentDescriptorDao extends CassandraAbstractSearch
 | 
			
		||||
        log.debug("Delete plugin meta-data entity by id [{}]", clazz);
 | 
			
		||||
        Statement delete = QueryBuilder.delete().all().from(getColumnFamilyName()).where(eq(ModelConstants.COMPONENT_DESCRIPTOR_CLASS_PROPERTY, clazz));
 | 
			
		||||
        log.debug("Remove request: {}", delete.toString());
 | 
			
		||||
        ResultSet resultSet = getSession().execute(delete);
 | 
			
		||||
        ResultSet resultSet = executeWrite(delete);
 | 
			
		||||
        log.debug("Delete result: [{}]", resultSet.wasApplied());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -148,7 +148,7 @@ public class CassandraDeviceDao extends CassandraAbstractSearchTextDao<DeviceEnt
 | 
			
		||||
        query.and(eq(ENTITY_SUBTYPE_TENANT_ID_PROPERTY, tenantId));
 | 
			
		||||
        query.and(eq(ENTITY_SUBTYPE_ENTITY_TYPE_PROPERTY, EntityType.DEVICE));
 | 
			
		||||
        query.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
 | 
			
		||||
        ResultSetFuture resultSetFuture = getSession().executeAsync(query);
 | 
			
		||||
        ResultSetFuture resultSetFuture = executeAsyncRead(query);
 | 
			
		||||
        return Futures.transform(resultSetFuture, new Function<ResultSet, List<EntitySubtype>>() {
 | 
			
		||||
            @Nullable
 | 
			
		||||
            @Override
 | 
			
		||||
 | 
			
		||||
@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.thingsboard.server.dao.cassandra.CassandraCluster;
 | 
			
		||||
import org.thingsboard.server.dao.model.type.*;
 | 
			
		||||
import org.thingsboard.server.dao.util.BufferedRateLimiter;
 | 
			
		||||
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
import java.util.concurrent.ConcurrentMap;
 | 
			
		||||
@ -33,16 +34,15 @@ public abstract class CassandraAbstractDao {
 | 
			
		||||
 | 
			
		||||
    private ConcurrentMap<String, PreparedStatement> preparedStatementMap = new ConcurrentHashMap<>();
 | 
			
		||||
 | 
			
		||||
    protected PreparedStatement prepare(String query) {
 | 
			
		||||
        return preparedStatementMap.computeIfAbsent(query, i -> getSession().prepare(i));
 | 
			
		||||
    }
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private BufferedRateLimiter rateLimiter;
 | 
			
		||||
 | 
			
		||||
    private Session session;
 | 
			
		||||
 | 
			
		||||
    private ConsistencyLevel defaultReadLevel;
 | 
			
		||||
    private ConsistencyLevel defaultWriteLevel;
 | 
			
		||||
 | 
			
		||||
    protected Session getSession() {
 | 
			
		||||
    private Session getSession() {
 | 
			
		||||
        if (session == null) {
 | 
			
		||||
            session = cluster.getSession();
 | 
			
		||||
            defaultReadLevel = cluster.getDefaultReadConsistencyLevel();
 | 
			
		||||
@ -59,6 +59,10 @@ public abstract class CassandraAbstractDao {
 | 
			
		||||
        return session;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected PreparedStatement prepare(String query) {
 | 
			
		||||
        return preparedStatementMap.computeIfAbsent(query, i -> getSession().prepare(i));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void registerCodecIfNotFound(CodecRegistry registry, TypeCodec<?> codec) {
 | 
			
		||||
        try {
 | 
			
		||||
            registry.codecFor(codec.getCqlType(), codec.getJavaType());
 | 
			
		||||
@ -85,10 +89,7 @@ public abstract class CassandraAbstractDao {
 | 
			
		||||
 | 
			
		||||
    private ResultSet execute(Statement statement, ConsistencyLevel level) {
 | 
			
		||||
        log.debug("Execute cassandra statement {}", statement);
 | 
			
		||||
        if (statement.getConsistencyLevel() == null) {
 | 
			
		||||
            statement.setConsistencyLevel(level);
 | 
			
		||||
        }
 | 
			
		||||
        return getSession().execute(statement);
 | 
			
		||||
        return executeAsync(statement, level).getUninterruptibly();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ResultSetFuture executeAsync(Statement statement, ConsistencyLevel level) {
 | 
			
		||||
@ -96,6 +97,6 @@ public abstract class CassandraAbstractDao {
 | 
			
		||||
        if (statement.getConsistencyLevel() == null) {
 | 
			
		||||
            statement.setConsistencyLevel(level);
 | 
			
		||||
        }
 | 
			
		||||
        return getSession().executeAsync(statement);
 | 
			
		||||
        return new RateLimitedResultSetFuture(getSession(), rateLimiter, statement);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -63,7 +63,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
 | 
			
		||||
        List<E> list = Collections.emptyList();
 | 
			
		||||
        if (statement != null) {
 | 
			
		||||
            statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
 | 
			
		||||
            ResultSet resultSet = getSession().execute(statement);
 | 
			
		||||
            ResultSet resultSet = executeRead(statement);
 | 
			
		||||
            Result<E> result = getMapper().map(resultSet);
 | 
			
		||||
            if (result != null) {
 | 
			
		||||
                list = result.all();
 | 
			
		||||
@ -75,7 +75,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
 | 
			
		||||
    protected ListenableFuture<List<D>> findListByStatementAsync(Statement statement) {
 | 
			
		||||
        if (statement != null) {
 | 
			
		||||
            statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
 | 
			
		||||
            ResultSetFuture resultSetFuture = getSession().executeAsync(statement);
 | 
			
		||||
            ResultSetFuture resultSetFuture = executeAsyncRead(statement);
 | 
			
		||||
            return Futures.transform(resultSetFuture, new Function<ResultSet, List<D>>() {
 | 
			
		||||
                @Nullable
 | 
			
		||||
                @Override
 | 
			
		||||
@ -97,7 +97,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
 | 
			
		||||
        E object = null;
 | 
			
		||||
        if (statement != null) {
 | 
			
		||||
            statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
 | 
			
		||||
            ResultSet resultSet = getSession().execute(statement);
 | 
			
		||||
            ResultSet resultSet = executeRead(statement);
 | 
			
		||||
            Result<E> result = getMapper().map(resultSet);
 | 
			
		||||
            if (result != null) {
 | 
			
		||||
                object = result.one();
 | 
			
		||||
@ -109,7 +109,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
 | 
			
		||||
    protected ListenableFuture<D> findOneByStatementAsync(Statement statement) {
 | 
			
		||||
        if (statement != null) {
 | 
			
		||||
            statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
 | 
			
		||||
            ResultSetFuture resultSetFuture = getSession().executeAsync(statement);
 | 
			
		||||
            ResultSetFuture resultSetFuture = executeAsyncRead(statement);
 | 
			
		||||
            return Futures.transform(resultSetFuture, new Function<ResultSet, D>() {
 | 
			
		||||
                @Nullable
 | 
			
		||||
                @Override
 | 
			
		||||
@ -184,7 +184,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
 | 
			
		||||
    public boolean removeById(UUID key) {
 | 
			
		||||
        Statement delete = QueryBuilder.delete().all().from(getColumnFamilyName()).where(eq(ModelConstants.ID_PROPERTY, key));
 | 
			
		||||
        log.debug("Remove request: {}", delete.toString());
 | 
			
		||||
        return getSession().execute(delete).wasApplied();
 | 
			
		||||
        return executeWrite(delete).wasApplied();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,148 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2018 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.dao.nosql;
 | 
			
		||||
 | 
			
		||||
import com.datastax.driver.core.ResultSet;
 | 
			
		||||
import com.datastax.driver.core.ResultSetFuture;
 | 
			
		||||
import com.datastax.driver.core.Session;
 | 
			
		||||
import com.datastax.driver.core.Statement;
 | 
			
		||||
import com.google.common.base.Function;
 | 
			
		||||
import com.google.common.util.concurrent.FutureCallback;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import com.google.common.util.concurrent.Uninterruptibles;
 | 
			
		||||
import org.thingsboard.server.dao.util.AsyncRateLimiter;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.Nullable;
 | 
			
		||||
import java.util.concurrent.*;
 | 
			
		||||
 | 
			
		||||
public class RateLimitedResultSetFuture implements ResultSetFuture {
 | 
			
		||||
 | 
			
		||||
    private final ListenableFuture<ResultSetFuture> originalFuture;
 | 
			
		||||
    private final ListenableFuture<Void> rateLimitFuture;
 | 
			
		||||
 | 
			
		||||
    public RateLimitedResultSetFuture(Session session, AsyncRateLimiter rateLimiter, Statement statement) {
 | 
			
		||||
        this.rateLimitFuture = rateLimiter.acquireAsync();
 | 
			
		||||
        this.originalFuture = Futures.transform(rateLimitFuture,
 | 
			
		||||
                (Function<Void, ResultSetFuture>) i -> executeAsyncWithRelease(rateLimiter, session, statement));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ResultSet getUninterruptibly() {
 | 
			
		||||
        return safeGet().getUninterruptibly();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ResultSet getUninterruptibly(long timeout, TimeUnit unit) throws TimeoutException {
 | 
			
		||||
        long rateLimitStart = System.nanoTime();
 | 
			
		||||
        ResultSetFuture resultSetFuture = null;
 | 
			
		||||
        try {
 | 
			
		||||
            resultSetFuture = originalFuture.get(timeout, unit);
 | 
			
		||||
        } catch (InterruptedException | ExecutionException e) {
 | 
			
		||||
            throw new IllegalStateException(e);
 | 
			
		||||
        }
 | 
			
		||||
        long rateLimitDurationNano = System.nanoTime() - rateLimitStart;
 | 
			
		||||
        long innerTimeoutNano = unit.toNanos(timeout) - rateLimitDurationNano;
 | 
			
		||||
        if (innerTimeoutNano > 0) {
 | 
			
		||||
            return resultSetFuture.getUninterruptibly(innerTimeoutNano, TimeUnit.NANOSECONDS);
 | 
			
		||||
        }
 | 
			
		||||
        throw new TimeoutException("Timeout waiting for task.");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean cancel(boolean mayInterruptIfRunning) {
 | 
			
		||||
        if (originalFuture.isDone()) {
 | 
			
		||||
            return safeGet().cancel(mayInterruptIfRunning);
 | 
			
		||||
        } else {
 | 
			
		||||
            return originalFuture.cancel(mayInterruptIfRunning);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean isCancelled() {
 | 
			
		||||
        if (originalFuture.isDone()) {
 | 
			
		||||
            return safeGet().isCancelled();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        return originalFuture.isCancelled();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean isDone() {
 | 
			
		||||
        return originalFuture.isDone() && safeGet().isDone();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ResultSet get() throws InterruptedException, ExecutionException {
 | 
			
		||||
        return safeGet().get();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ResultSet get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
 | 
			
		||||
        long rateLimitStart = System.nanoTime();
 | 
			
		||||
        ResultSetFuture resultSetFuture = originalFuture.get(timeout, unit);
 | 
			
		||||
        long rateLimitDurationNano = System.nanoTime() - rateLimitStart;
 | 
			
		||||
        long innerTimeoutNano = unit.toNanos(timeout) - rateLimitDurationNano;
 | 
			
		||||
        if (innerTimeoutNano > 0) {
 | 
			
		||||
            return resultSetFuture.get(innerTimeoutNano, TimeUnit.NANOSECONDS);
 | 
			
		||||
        }
 | 
			
		||||
        throw new TimeoutException("Timeout waiting for task.");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void addListener(Runnable listener, Executor executor) {
 | 
			
		||||
        originalFuture.addListener(() -> {
 | 
			
		||||
            try {
 | 
			
		||||
                ResultSetFuture resultSetFuture = Uninterruptibles.getUninterruptibly(originalFuture);
 | 
			
		||||
                resultSetFuture.addListener(listener, executor);
 | 
			
		||||
            } catch (CancellationException e) {
 | 
			
		||||
                cancel(false);
 | 
			
		||||
                return;
 | 
			
		||||
            } catch (ExecutionException e) {
 | 
			
		||||
                Futures.immediateFailedFuture(e).addListener(listener, executor);
 | 
			
		||||
            }
 | 
			
		||||
        }, executor);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ResultSetFuture safeGet() {
 | 
			
		||||
        try {
 | 
			
		||||
            return originalFuture.get();
 | 
			
		||||
        } catch (InterruptedException | ExecutionException e) {
 | 
			
		||||
            throw new IllegalStateException(e);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ResultSetFuture executeAsyncWithRelease(AsyncRateLimiter rateLimiter, Session session, Statement statement) {
 | 
			
		||||
        try {
 | 
			
		||||
            ResultSetFuture resultSetFuture = session.executeAsync(statement);
 | 
			
		||||
            Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
 | 
			
		||||
                @Override
 | 
			
		||||
                public void onSuccess(@Nullable ResultSet result) {
 | 
			
		||||
                    rateLimiter.release();
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                @Override
 | 
			
		||||
                public void onFailure(Throwable t) {
 | 
			
		||||
                    rateLimiter.release();
 | 
			
		||||
                }
 | 
			
		||||
            });
 | 
			
		||||
            return resultSetFuture;
 | 
			
		||||
        } catch (RuntimeException re) {
 | 
			
		||||
            rateLimiter.release();
 | 
			
		||||
            throw re;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -242,7 +242,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
 | 
			
		||||
 | 
			
		||||
    private PreparedStatement getSaveStmt() {
 | 
			
		||||
        if (saveStmt == null) {
 | 
			
		||||
            saveStmt = getSession().prepare("INSERT INTO " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
 | 
			
		||||
            saveStmt = prepare("INSERT INTO " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
 | 
			
		||||
                    "(" + ModelConstants.RELATION_FROM_ID_PROPERTY +
 | 
			
		||||
                    "," + ModelConstants.RELATION_FROM_TYPE_PROPERTY +
 | 
			
		||||
                    "," + ModelConstants.RELATION_TO_ID_PROPERTY +
 | 
			
		||||
@ -257,7 +257,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
 | 
			
		||||
 | 
			
		||||
    private PreparedStatement getDeleteStmt() {
 | 
			
		||||
        if (deleteStmt == null) {
 | 
			
		||||
            deleteStmt = getSession().prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME +
 | 
			
		||||
            deleteStmt = prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME +
 | 
			
		||||
                    WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + " = ?" +
 | 
			
		||||
                    AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + " = ?" +
 | 
			
		||||
                    AND + ModelConstants.RELATION_TO_ID_PROPERTY + " = ?" +
 | 
			
		||||
@ -270,7 +270,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
 | 
			
		||||
 | 
			
		||||
    private PreparedStatement getDeleteAllByEntityStmt() {
 | 
			
		||||
        if (deleteAllByEntityStmt == null) {
 | 
			
		||||
            deleteAllByEntityStmt = getSession().prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME +
 | 
			
		||||
            deleteAllByEntityStmt = prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME +
 | 
			
		||||
                    WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + " = ?" +
 | 
			
		||||
                    AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + " = ?");
 | 
			
		||||
        }
 | 
			
		||||
@ -279,7 +279,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
 | 
			
		||||
 | 
			
		||||
    private PreparedStatement getFindAllByFromStmt() {
 | 
			
		||||
        if (findAllByFromStmt == null) {
 | 
			
		||||
            findAllByFromStmt = getSession().prepare(SELECT_COLUMNS + " " +
 | 
			
		||||
            findAllByFromStmt = prepare(SELECT_COLUMNS + " " +
 | 
			
		||||
                    FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
 | 
			
		||||
                    WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM +
 | 
			
		||||
                    AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM +
 | 
			
		||||
@ -290,7 +290,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
 | 
			
		||||
 | 
			
		||||
    private PreparedStatement getFindAllByFromAndTypeStmt() {
 | 
			
		||||
        if (findAllByFromAndTypeStmt == null) {
 | 
			
		||||
            findAllByFromAndTypeStmt = getSession().prepare(SELECT_COLUMNS + " " +
 | 
			
		||||
            findAllByFromAndTypeStmt = prepare(SELECT_COLUMNS + " " +
 | 
			
		||||
                    FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
 | 
			
		||||
                    WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM +
 | 
			
		||||
                    AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM +
 | 
			
		||||
@ -303,7 +303,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
 | 
			
		||||
 | 
			
		||||
    private PreparedStatement getFindAllByToStmt() {
 | 
			
		||||
        if (findAllByToStmt == null) {
 | 
			
		||||
            findAllByToStmt = getSession().prepare(SELECT_COLUMNS + " " +
 | 
			
		||||
            findAllByToStmt = prepare(SELECT_COLUMNS + " " +
 | 
			
		||||
                    FROM + ModelConstants.RELATION_REVERSE_VIEW_NAME + " " +
 | 
			
		||||
                    WHERE + ModelConstants.RELATION_TO_ID_PROPERTY + EQUAL_TO_PARAM +
 | 
			
		||||
                    AND + ModelConstants.RELATION_TO_TYPE_PROPERTY + EQUAL_TO_PARAM +
 | 
			
		||||
@ -314,7 +314,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
 | 
			
		||||
 | 
			
		||||
    private PreparedStatement getFindAllByToAndTypeStmt() {
 | 
			
		||||
        if (findAllByToAndTypeStmt == null) {
 | 
			
		||||
            findAllByToAndTypeStmt = getSession().prepare(SELECT_COLUMNS + " " +
 | 
			
		||||
            findAllByToAndTypeStmt = prepare(SELECT_COLUMNS + " " +
 | 
			
		||||
                    FROM + ModelConstants.RELATION_REVERSE_VIEW_NAME + " " +
 | 
			
		||||
                    WHERE + ModelConstants.RELATION_TO_ID_PROPERTY + EQUAL_TO_PARAM +
 | 
			
		||||
                    AND + ModelConstants.RELATION_TO_TYPE_PROPERTY + EQUAL_TO_PARAM +
 | 
			
		||||
@ -327,7 +327,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
 | 
			
		||||
 | 
			
		||||
    private PreparedStatement getCheckRelationStmt() {
 | 
			
		||||
        if (checkRelationStmt == null) {
 | 
			
		||||
            checkRelationStmt = getSession().prepare(SELECT_COLUMNS + " " +
 | 
			
		||||
            checkRelationStmt = prepare(SELECT_COLUMNS + " " +
 | 
			
		||||
                    FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
 | 
			
		||||
                    WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM +
 | 
			
		||||
                    AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM +
 | 
			
		||||
 | 
			
		||||
@ -82,8 +82,9 @@ public class BaseRelationService implements RelationService {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Caching(evict = {
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}")
 | 
			
		||||
    })
 | 
			
		||||
@ -95,8 +96,9 @@ public class BaseRelationService implements RelationService {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Caching(evict = {
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}")
 | 
			
		||||
    })
 | 
			
		||||
@ -108,11 +110,11 @@ public class BaseRelationService implements RelationService {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Caching(evict = {
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}")
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}")
 | 
			
		||||
    })
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean deleteRelation(EntityRelation relation) {
 | 
			
		||||
@ -122,11 +124,11 @@ public class BaseRelationService implements RelationService {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Caching(evict = {
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.from"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.to"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}")
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}")
 | 
			
		||||
    })
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Boolean> deleteRelationAsync(EntityRelation relation) {
 | 
			
		||||
@ -136,11 +138,11 @@ public class BaseRelationService implements RelationService {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Caching(evict = {
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#from"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#to"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType}")
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType, #typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType, #typeGroup}")
 | 
			
		||||
    })
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean deleteRelation(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
 | 
			
		||||
@ -150,11 +152,11 @@ public class BaseRelationService implements RelationService {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Caching(evict = {
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#from"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#to"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType}")
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType, #typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #typeGroup}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType, #typeGroup}")
 | 
			
		||||
    })
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Boolean> deleteRelationAsync(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
 | 
			
		||||
 | 
			
		||||
@ -73,7 +73,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 | 
			
		||||
 | 
			
		||||
    private PreparedStatement partitionInsertStmt;
 | 
			
		||||
    private PreparedStatement partitionInsertTtlStmt;
 | 
			
		||||
    private PreparedStatement[] latestInsertStmts;
 | 
			
		||||
    private PreparedStatement latestInsertStmt;
 | 
			
		||||
    private PreparedStatement[] saveStmts;
 | 
			
		||||
    private PreparedStatement[] saveTtlStmts;
 | 
			
		||||
    private PreparedStatement[] fetchStmts;
 | 
			
		||||
@ -306,13 +306,15 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry) {
 | 
			
		||||
        DataType type = tsKvEntry.getDataType();
 | 
			
		||||
        BoundStatement stmt = getLatestStmt(type).bind()
 | 
			
		||||
        BoundStatement stmt = getLatestStmt().bind()
 | 
			
		||||
                .setString(0, entityId.getEntityType().name())
 | 
			
		||||
                .setUUID(1, entityId.getId())
 | 
			
		||||
                .setString(2, tsKvEntry.getKey())
 | 
			
		||||
                .setLong(3, tsKvEntry.getTs());
 | 
			
		||||
        addValue(tsKvEntry, stmt, 4);
 | 
			
		||||
                .setLong(3, tsKvEntry.getTs())
 | 
			
		||||
                .set(4, tsKvEntry.getBooleanValue().orElse(null), Boolean.class)
 | 
			
		||||
                .set(5, tsKvEntry.getStrValue().orElse(null), String.class)
 | 
			
		||||
                .set(6, tsKvEntry.getLongValue().orElse(null), Long.class)
 | 
			
		||||
                .set(7, tsKvEntry.getDoubleValue().orElse(null), Double.class);
 | 
			
		||||
        return getFuture(executeAsyncWrite(stmt), rs -> null);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -381,7 +383,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 | 
			
		||||
        if (saveStmts == null) {
 | 
			
		||||
            saveStmts = new PreparedStatement[DataType.values().length];
 | 
			
		||||
            for (DataType type : DataType.values()) {
 | 
			
		||||
                saveStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_CF +
 | 
			
		||||
                saveStmts[type.ordinal()] = prepare(INSERT_INTO + ModelConstants.TS_KV_CF +
 | 
			
		||||
                        "(" + ModelConstants.ENTITY_TYPE_COLUMN +
 | 
			
		||||
                        "," + ModelConstants.ENTITY_ID_COLUMN +
 | 
			
		||||
                        "," + ModelConstants.KEY_COLUMN +
 | 
			
		||||
@ -398,7 +400,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 | 
			
		||||
        if (saveTtlStmts == null) {
 | 
			
		||||
            saveTtlStmts = new PreparedStatement[DataType.values().length];
 | 
			
		||||
            for (DataType type : DataType.values()) {
 | 
			
		||||
                saveTtlStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_CF +
 | 
			
		||||
                saveTtlStmts[type.ordinal()] = prepare(INSERT_INTO + ModelConstants.TS_KV_CF +
 | 
			
		||||
                        "(" + ModelConstants.ENTITY_TYPE_COLUMN +
 | 
			
		||||
                        "," + ModelConstants.ENTITY_ID_COLUMN +
 | 
			
		||||
                        "," + ModelConstants.KEY_COLUMN +
 | 
			
		||||
@ -420,7 +422,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 | 
			
		||||
                } else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) {
 | 
			
		||||
                    fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()];
 | 
			
		||||
                } else {
 | 
			
		||||
                    fetchStmts[type.ordinal()] = getSession().prepare(SELECT_PREFIX +
 | 
			
		||||
                    fetchStmts[type.ordinal()] = prepare(SELECT_PREFIX +
 | 
			
		||||
                            String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
 | 
			
		||||
                            + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
 | 
			
		||||
                            + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
 | 
			
		||||
@ -435,26 +437,29 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 | 
			
		||||
        return fetchStmts[aggType.ordinal()];
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private PreparedStatement getLatestStmt(DataType dataType) {
 | 
			
		||||
        if (latestInsertStmts == null) {
 | 
			
		||||
            latestInsertStmts = new PreparedStatement[DataType.values().length];
 | 
			
		||||
            for (DataType type : DataType.values()) {
 | 
			
		||||
                latestInsertStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_LATEST_CF +
 | 
			
		||||
                        "(" + ModelConstants.ENTITY_TYPE_COLUMN +
 | 
			
		||||
                        "," + ModelConstants.ENTITY_ID_COLUMN +
 | 
			
		||||
                        "," + ModelConstants.KEY_COLUMN +
 | 
			
		||||
                        "," + ModelConstants.TS_COLUMN +
 | 
			
		||||
                        "," + getColumnName(type) + ")" +
 | 
			
		||||
                        " VALUES(?, ?, ?, ?, ?)");
 | 
			
		||||
            }
 | 
			
		||||
    private PreparedStatement getLatestStmt() {
 | 
			
		||||
        if (latestInsertStmt == null) {
 | 
			
		||||
//            latestInsertStmt = new PreparedStatement[DataType.values().length];
 | 
			
		||||
//            for (DataType type : DataType.values()) {
 | 
			
		||||
            latestInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_LATEST_CF +
 | 
			
		||||
                    "(" + ModelConstants.ENTITY_TYPE_COLUMN +
 | 
			
		||||
                    "," + ModelConstants.ENTITY_ID_COLUMN +
 | 
			
		||||
                    "," + ModelConstants.KEY_COLUMN +
 | 
			
		||||
                    "," + ModelConstants.TS_COLUMN +
 | 
			
		||||
                    "," + ModelConstants.BOOLEAN_VALUE_COLUMN +
 | 
			
		||||
                    "," + ModelConstants.STRING_VALUE_COLUMN +
 | 
			
		||||
                    "," + ModelConstants.LONG_VALUE_COLUMN +
 | 
			
		||||
                    "," + ModelConstants.DOUBLE_VALUE_COLUMN + ")" +
 | 
			
		||||
                    " VALUES(?, ?, ?, ?, ?, ?, ?, ?)");
 | 
			
		||||
//            }
 | 
			
		||||
        }
 | 
			
		||||
        return latestInsertStmts[dataType.ordinal()];
 | 
			
		||||
        return latestInsertStmt;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    private PreparedStatement getPartitionInsertStmt() {
 | 
			
		||||
        if (partitionInsertStmt == null) {
 | 
			
		||||
            partitionInsertStmt = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF +
 | 
			
		||||
            partitionInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF +
 | 
			
		||||
                    "(" + ModelConstants.ENTITY_TYPE_COLUMN +
 | 
			
		||||
                    "," + ModelConstants.ENTITY_ID_COLUMN +
 | 
			
		||||
                    "," + ModelConstants.PARTITION_COLUMN +
 | 
			
		||||
@ -466,7 +471,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 | 
			
		||||
 | 
			
		||||
    private PreparedStatement getPartitionInsertTtlStmt() {
 | 
			
		||||
        if (partitionInsertTtlStmt == null) {
 | 
			
		||||
            partitionInsertTtlStmt = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF +
 | 
			
		||||
            partitionInsertTtlStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF +
 | 
			
		||||
                    "(" + ModelConstants.ENTITY_TYPE_COLUMN +
 | 
			
		||||
                    "," + ModelConstants.ENTITY_ID_COLUMN +
 | 
			
		||||
                    "," + ModelConstants.PARTITION_COLUMN +
 | 
			
		||||
@ -479,7 +484,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 | 
			
		||||
 | 
			
		||||
    private PreparedStatement getFindLatestStmt() {
 | 
			
		||||
        if (findLatestStmt == null) {
 | 
			
		||||
            findLatestStmt = getSession().prepare(SELECT_PREFIX +
 | 
			
		||||
            findLatestStmt = prepare(SELECT_PREFIX +
 | 
			
		||||
                    ModelConstants.KEY_COLUMN + "," +
 | 
			
		||||
                    ModelConstants.TS_COLUMN + "," +
 | 
			
		||||
                    ModelConstants.STRING_VALUE_COLUMN + "," +
 | 
			
		||||
@ -496,7 +501,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 | 
			
		||||
 | 
			
		||||
    private PreparedStatement getFindAllLatestStmt() {
 | 
			
		||||
        if (findAllLatestStmt == null) {
 | 
			
		||||
            findAllLatestStmt = getSession().prepare(SELECT_PREFIX +
 | 
			
		||||
            findAllLatestStmt = prepare(SELECT_PREFIX +
 | 
			
		||||
                    ModelConstants.KEY_COLUMN + "," +
 | 
			
		||||
                    ModelConstants.TS_COLUMN + "," +
 | 
			
		||||
                    ModelConstants.STRING_VALUE_COLUMN + "," +
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,25 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2018 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.dao.util;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
 | 
			
		||||
public interface AsyncRateLimiter {
 | 
			
		||||
 | 
			
		||||
    ListenableFuture<Void> acquireAsync();
 | 
			
		||||
 | 
			
		||||
    void release();
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,160 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2018 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.dao.util;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import com.google.common.util.concurrent.ListeningExecutorService;
 | 
			
		||||
import com.google.common.util.concurrent.MoreExecutors;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.scheduling.annotation.Scheduled;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
 | 
			
		||||
import java.util.concurrent.*;
 | 
			
		||||
import java.util.concurrent.atomic.AtomicInteger;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class BufferedRateLimiter implements AsyncRateLimiter {
 | 
			
		||||
 | 
			
		||||
    private final ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
 | 
			
		||||
 | 
			
		||||
    private final int permitsLimit;
 | 
			
		||||
    private final int maxPermitWaitTime;
 | 
			
		||||
    private final AtomicInteger permits;
 | 
			
		||||
    private final BlockingQueue<LockedFuture> queue;
 | 
			
		||||
 | 
			
		||||
    private final AtomicInteger maxQueueSize = new AtomicInteger();
 | 
			
		||||
    private final AtomicInteger maxGrantedPermissions = new AtomicInteger();
 | 
			
		||||
 | 
			
		||||
    public BufferedRateLimiter(@Value("${cassandra.query.buffer_size}") int queueLimit,
 | 
			
		||||
                               @Value("${cassandra.query.concurrent_limit}") int permitsLimit,
 | 
			
		||||
                               @Value("${cassandra.query.permit_max_wait_time}") int maxPermitWaitTime) {
 | 
			
		||||
        this.permitsLimit = permitsLimit;
 | 
			
		||||
        this.maxPermitWaitTime = maxPermitWaitTime;
 | 
			
		||||
        this.permits = new AtomicInteger();
 | 
			
		||||
        this.queue = new LinkedBlockingQueue<>(queueLimit);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Void> acquireAsync() {
 | 
			
		||||
        if (queue.isEmpty()) {
 | 
			
		||||
            if (permits.incrementAndGet() <= permitsLimit) {
 | 
			
		||||
                if (permits.get() > maxGrantedPermissions.get()) {
 | 
			
		||||
                    maxGrantedPermissions.set(permits.get());
 | 
			
		||||
                }
 | 
			
		||||
                return Futures.immediateFuture(null);
 | 
			
		||||
            }
 | 
			
		||||
            permits.decrementAndGet();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        return putInQueue();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void release() {
 | 
			
		||||
        permits.decrementAndGet();
 | 
			
		||||
        reprocessQueue();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void reprocessQueue() {
 | 
			
		||||
        while (permits.get() < permitsLimit) {
 | 
			
		||||
            if (permits.incrementAndGet() <= permitsLimit) {
 | 
			
		||||
                if (permits.get() > maxGrantedPermissions.get()) {
 | 
			
		||||
                    maxGrantedPermissions.set(permits.get());
 | 
			
		||||
                }
 | 
			
		||||
                LockedFuture lockedFuture = queue.poll();
 | 
			
		||||
                if (lockedFuture != null) {
 | 
			
		||||
                    lockedFuture.latch.countDown();
 | 
			
		||||
                } else {
 | 
			
		||||
                    permits.decrementAndGet();
 | 
			
		||||
                    break;
 | 
			
		||||
                }
 | 
			
		||||
            } else {
 | 
			
		||||
                permits.decrementAndGet();
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private LockedFuture createLockedFuture() {
 | 
			
		||||
        CountDownLatch latch = new CountDownLatch(1);
 | 
			
		||||
        ListenableFuture<Void> future = pool.submit(() -> {
 | 
			
		||||
            latch.await();
 | 
			
		||||
            return null;
 | 
			
		||||
        });
 | 
			
		||||
        return new LockedFuture(latch, future, System.currentTimeMillis());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<Void> putInQueue() {
 | 
			
		||||
 | 
			
		||||
        int size = queue.size();
 | 
			
		||||
        if (size > maxQueueSize.get()) {
 | 
			
		||||
            maxQueueSize.set(size);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (queue.remainingCapacity() > 0) {
 | 
			
		||||
            try {
 | 
			
		||||
                LockedFuture lockedFuture = createLockedFuture();
 | 
			
		||||
                if (!queue.offer(lockedFuture, 1, TimeUnit.SECONDS)) {
 | 
			
		||||
                    lockedFuture.cancelFuture();
 | 
			
		||||
                    return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject"));
 | 
			
		||||
                }
 | 
			
		||||
                return lockedFuture.future;
 | 
			
		||||
            } catch (InterruptedException e) {
 | 
			
		||||
                return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Task interrupted. Reject"));
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject"));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
 | 
			
		||||
    public void printStats() {
 | 
			
		||||
        int expiredCount = 0;
 | 
			
		||||
        for (LockedFuture lockedFuture : queue) {
 | 
			
		||||
            if (lockedFuture.isExpired()) {
 | 
			
		||||
                lockedFuture.cancelFuture();
 | 
			
		||||
                expiredCount++;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        log.info("Permits maxBuffer is [{}] max concurrent [{}] expired [{}]", maxQueueSize.getAndSet(0),
 | 
			
		||||
                maxGrantedPermissions.getAndSet(0), expiredCount);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private class LockedFuture {
 | 
			
		||||
        final CountDownLatch latch;
 | 
			
		||||
        final ListenableFuture<Void> future;
 | 
			
		||||
        final long createTime;
 | 
			
		||||
 | 
			
		||||
        public LockedFuture(CountDownLatch latch, ListenableFuture<Void> future, long createTime) {
 | 
			
		||||
            this.latch = latch;
 | 
			
		||||
            this.future = future;
 | 
			
		||||
            this.createTime = createTime;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        void cancelFuture() {
 | 
			
		||||
            future.cancel(false);
 | 
			
		||||
            latch.countDown();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        boolean isExpired() {
 | 
			
		||||
            return (System.currentTimeMillis() - createTime) > maxPermitWaitTime;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,156 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2018 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.dao.nosql;
 | 
			
		||||
 | 
			
		||||
import com.datastax.driver.core.*;
 | 
			
		||||
import com.datastax.driver.core.exceptions.UnsupportedFeatureException;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import org.junit.Test;
 | 
			
		||||
import org.junit.runner.RunWith;
 | 
			
		||||
import org.mockito.Mock;
 | 
			
		||||
import org.mockito.Mockito;
 | 
			
		||||
import org.mockito.runners.MockitoJUnitRunner;
 | 
			
		||||
import org.mockito.stubbing.Answer;
 | 
			
		||||
import org.thingsboard.server.dao.util.AsyncRateLimiter;
 | 
			
		||||
 | 
			
		||||
import java.util.concurrent.ExecutionException;
 | 
			
		||||
import java.util.concurrent.TimeoutException;
 | 
			
		||||
 | 
			
		||||
import static org.junit.Assert.*;
 | 
			
		||||
import static org.mockito.Mockito.*;
 | 
			
		||||
 | 
			
		||||
@RunWith(MockitoJUnitRunner.class)
 | 
			
		||||
public class RateLimitedResultSetFutureTest {
 | 
			
		||||
 | 
			
		||||
    private RateLimitedResultSetFuture resultSetFuture;
 | 
			
		||||
 | 
			
		||||
    @Mock
 | 
			
		||||
    private AsyncRateLimiter rateLimiter;
 | 
			
		||||
    @Mock
 | 
			
		||||
    private Session session;
 | 
			
		||||
    @Mock
 | 
			
		||||
    private Statement statement;
 | 
			
		||||
    @Mock
 | 
			
		||||
    private ResultSetFuture realFuture;
 | 
			
		||||
    @Mock
 | 
			
		||||
    private ResultSet rows;
 | 
			
		||||
    @Mock
 | 
			
		||||
    private Row row;
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void doNotReleasePermissionIfRateLimitFutureFailed() throws InterruptedException {
 | 
			
		||||
        when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFailedFuture(new IllegalArgumentException()));
 | 
			
		||||
        resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
 | 
			
		||||
        Thread.sleep(1000L);
 | 
			
		||||
        verify(rateLimiter).acquireAsync();
 | 
			
		||||
        try {
 | 
			
		||||
            assertTrue(resultSetFuture.isDone());
 | 
			
		||||
            fail();
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            assertTrue(e instanceof IllegalStateException);
 | 
			
		||||
            Throwable actualCause = e.getCause();
 | 
			
		||||
            assertTrue(actualCause instanceof ExecutionException);
 | 
			
		||||
        }
 | 
			
		||||
        verifyNoMoreInteractions(session, rateLimiter, statement);
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void getUninterruptiblyDelegateToCassandra() throws InterruptedException, ExecutionException {
 | 
			
		||||
        when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null));
 | 
			
		||||
        when(session.executeAsync(statement)).thenReturn(realFuture);
 | 
			
		||||
        Mockito.doAnswer((Answer<Void>) invocation -> {
 | 
			
		||||
            Object[] args = invocation.getArguments();
 | 
			
		||||
            Runnable task = (Runnable) args[0];
 | 
			
		||||
            task.run();
 | 
			
		||||
            return null;
 | 
			
		||||
        }).when(realFuture).addListener(Mockito.any(), Mockito.any());
 | 
			
		||||
 | 
			
		||||
        when(realFuture.getUninterruptibly()).thenReturn(rows);
 | 
			
		||||
 | 
			
		||||
        resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
 | 
			
		||||
        ResultSet actual = resultSetFuture.getUninterruptibly();
 | 
			
		||||
        assertSame(rows, actual);
 | 
			
		||||
        verify(rateLimiter, times(1)).acquireAsync();
 | 
			
		||||
        verify(rateLimiter, times(1)).release();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void addListenerAllowsFutureTransformation() throws InterruptedException, ExecutionException {
 | 
			
		||||
        when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null));
 | 
			
		||||
        when(session.executeAsync(statement)).thenReturn(realFuture);
 | 
			
		||||
        Mockito.doAnswer((Answer<Void>) invocation -> {
 | 
			
		||||
            Object[] args = invocation.getArguments();
 | 
			
		||||
            Runnable task = (Runnable) args[0];
 | 
			
		||||
            task.run();
 | 
			
		||||
            return null;
 | 
			
		||||
        }).when(realFuture).addListener(Mockito.any(), Mockito.any());
 | 
			
		||||
 | 
			
		||||
        when(realFuture.get()).thenReturn(rows);
 | 
			
		||||
        when(rows.one()).thenReturn(row);
 | 
			
		||||
 | 
			
		||||
        resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
 | 
			
		||||
 | 
			
		||||
        ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one);
 | 
			
		||||
        Row actualRow = transform.get();
 | 
			
		||||
 | 
			
		||||
        assertSame(row, actualRow);
 | 
			
		||||
        verify(rateLimiter, times(1)).acquireAsync();
 | 
			
		||||
        verify(rateLimiter, times(1)).release();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void immidiateCassandraExceptionReturnsPermit() throws InterruptedException, ExecutionException {
 | 
			
		||||
        when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null));
 | 
			
		||||
        when(session.executeAsync(statement)).thenThrow(new UnsupportedFeatureException(ProtocolVersion.V3, "hjg"));
 | 
			
		||||
        resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
 | 
			
		||||
        ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one);
 | 
			
		||||
        try {
 | 
			
		||||
            transform.get();
 | 
			
		||||
            fail();
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            assertTrue(e instanceof ExecutionException);
 | 
			
		||||
        }
 | 
			
		||||
        verify(rateLimiter, times(1)).acquireAsync();
 | 
			
		||||
        verify(rateLimiter, times(1)).release();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void queryTimeoutReturnsPermit() throws InterruptedException, ExecutionException {
 | 
			
		||||
        when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null));
 | 
			
		||||
        when(session.executeAsync(statement)).thenReturn(realFuture);
 | 
			
		||||
        Mockito.doAnswer((Answer<Void>) invocation -> {
 | 
			
		||||
            Object[] args = invocation.getArguments();
 | 
			
		||||
            Runnable task = (Runnable) args[0];
 | 
			
		||||
            task.run();
 | 
			
		||||
            return null;
 | 
			
		||||
        }).when(realFuture).addListener(Mockito.any(), Mockito.any());
 | 
			
		||||
 | 
			
		||||
        when(realFuture.get()).thenThrow(new ExecutionException("Fail", new TimeoutException("timeout")));
 | 
			
		||||
        resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
 | 
			
		||||
        ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one);
 | 
			
		||||
        try {
 | 
			
		||||
            transform.get();
 | 
			
		||||
            fail();
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            assertTrue(e instanceof ExecutionException);
 | 
			
		||||
        }
 | 
			
		||||
        verify(rateLimiter, times(1)).acquireAsync();
 | 
			
		||||
        verify(rateLimiter, times(1)).release();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,134 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2018 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.dao.util;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.*;
 | 
			
		||||
import org.junit.Test;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.Nullable;
 | 
			
		||||
import java.util.concurrent.ExecutionException;
 | 
			
		||||
import java.util.concurrent.Executors;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
import java.util.concurrent.atomic.AtomicInteger;
 | 
			
		||||
 | 
			
		||||
import static org.junit.Assert.*;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
public class BufferedRateLimiterTest {
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void finishedFutureReturnedIfPermitsAreGranted() {
 | 
			
		||||
        BufferedRateLimiter limiter = new BufferedRateLimiter(10, 10, 100);
 | 
			
		||||
        ListenableFuture<Void> actual = limiter.acquireAsync();
 | 
			
		||||
        assertTrue(actual.isDone());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void notFinishedFutureReturnedIfPermitsAreNotGranted() {
 | 
			
		||||
        BufferedRateLimiter limiter = new BufferedRateLimiter(10, 1, 100);
 | 
			
		||||
        ListenableFuture<Void> actual1 = limiter.acquireAsync();
 | 
			
		||||
        ListenableFuture<Void> actual2 = limiter.acquireAsync();
 | 
			
		||||
        assertTrue(actual1.isDone());
 | 
			
		||||
        assertFalse(actual2.isDone());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void failedFutureReturnedIfQueueIsfull() {
 | 
			
		||||
        BufferedRateLimiter limiter = new BufferedRateLimiter(1, 1, 100);
 | 
			
		||||
        ListenableFuture<Void> actual1 = limiter.acquireAsync();
 | 
			
		||||
        ListenableFuture<Void> actual2 = limiter.acquireAsync();
 | 
			
		||||
        ListenableFuture<Void> actual3 = limiter.acquireAsync();
 | 
			
		||||
 | 
			
		||||
        assertTrue(actual1.isDone());
 | 
			
		||||
        assertFalse(actual2.isDone());
 | 
			
		||||
        assertTrue(actual3.isDone());
 | 
			
		||||
        try {
 | 
			
		||||
            actual3.get();
 | 
			
		||||
            fail();
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            assertTrue(e instanceof ExecutionException);
 | 
			
		||||
            Throwable actualCause = e.getCause();
 | 
			
		||||
            assertTrue(actualCause instanceof IllegalStateException);
 | 
			
		||||
            assertEquals("Rate Limit Buffer is full. Reject", actualCause.getMessage());
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void releasedPermitTriggerTasksFromQueue() throws InterruptedException {
 | 
			
		||||
        BufferedRateLimiter limiter = new BufferedRateLimiter(10, 2, 100);
 | 
			
		||||
        ListenableFuture<Void> actual1 = limiter.acquireAsync();
 | 
			
		||||
        ListenableFuture<Void> actual2 = limiter.acquireAsync();
 | 
			
		||||
        ListenableFuture<Void> actual3 = limiter.acquireAsync();
 | 
			
		||||
        ListenableFuture<Void> actual4 = limiter.acquireAsync();
 | 
			
		||||
        assertTrue(actual1.isDone());
 | 
			
		||||
        assertTrue(actual2.isDone());
 | 
			
		||||
        assertFalse(actual3.isDone());
 | 
			
		||||
        assertFalse(actual4.isDone());
 | 
			
		||||
        limiter.release();
 | 
			
		||||
        TimeUnit.MILLISECONDS.sleep(100L);
 | 
			
		||||
        assertTrue(actual3.isDone());
 | 
			
		||||
        assertFalse(actual4.isDone());
 | 
			
		||||
        limiter.release();
 | 
			
		||||
        TimeUnit.MILLISECONDS.sleep(100L);
 | 
			
		||||
        assertTrue(actual4.isDone());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void permitsReleasedInConcurrentMode() throws InterruptedException {
 | 
			
		||||
        BufferedRateLimiter limiter = new BufferedRateLimiter(10, 2, 100);
 | 
			
		||||
        AtomicInteger actualReleased = new AtomicInteger();
 | 
			
		||||
        AtomicInteger actualRejected = new AtomicInteger();
 | 
			
		||||
        ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));
 | 
			
		||||
        for (int i = 0; i < 100; i++) {
 | 
			
		||||
            ListenableFuture<ListenableFuture<Void>> submit = pool.submit(limiter::acquireAsync);
 | 
			
		||||
            Futures.addCallback(submit, new FutureCallback<ListenableFuture<Void>>() {
 | 
			
		||||
                @Override
 | 
			
		||||
                public void onSuccess(@Nullable ListenableFuture<Void> result) {
 | 
			
		||||
                    Futures.addCallback(result, new FutureCallback<Void>() {
 | 
			
		||||
                        @Override
 | 
			
		||||
                        public void onSuccess(@Nullable Void result) {
 | 
			
		||||
                            try {
 | 
			
		||||
                                TimeUnit.MILLISECONDS.sleep(100);
 | 
			
		||||
                            } catch (InterruptedException e) {
 | 
			
		||||
                                e.printStackTrace();
 | 
			
		||||
                            }
 | 
			
		||||
                            limiter.release();
 | 
			
		||||
                            actualReleased.incrementAndGet();
 | 
			
		||||
                        }
 | 
			
		||||
 | 
			
		||||
                        @Override
 | 
			
		||||
                        public void onFailure(Throwable t) {
 | 
			
		||||
                            actualRejected.incrementAndGet();
 | 
			
		||||
                        }
 | 
			
		||||
                    });
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                @Override
 | 
			
		||||
                public void onFailure(Throwable t) {
 | 
			
		||||
                }
 | 
			
		||||
            });
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        TimeUnit.SECONDS.sleep(2);
 | 
			
		||||
        assertTrue("Unexpected released count " + actualReleased.get(),
 | 
			
		||||
                actualReleased.get() > 10 && actualReleased.get() < 20);
 | 
			
		||||
        assertTrue("Unexpected rejected count " + actualRejected.get(),
 | 
			
		||||
                actualRejected.get() > 80 && actualRejected.get() < 90);
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -128,8 +128,8 @@ export default function ExtensionFormOpcDirective($compile, $templateCache, $tra
 | 
			
		||||
                        let addedFile = event.target.result;
 | 
			
		||||
 | 
			
		||||
                        if (addedFile && addedFile.length > 0) {
 | 
			
		||||
                            model[options.fileName] = $file.name;
 | 
			
		||||
                            model[options.file] = addedFile.replace(/^data.*base64,/, "");
 | 
			
		||||
                            model[options.location] = $file.name;
 | 
			
		||||
                            model[options.fileContent] = addedFile.replace(/^data.*base64,/, "");
 | 
			
		||||
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
@ -142,8 +142,8 @@ export default function ExtensionFormOpcDirective($compile, $templateCache, $tra
 | 
			
		||||
        scope.clearFile = function(model, options) {
 | 
			
		||||
            scope.theForm.$setDirty();
 | 
			
		||||
 | 
			
		||||
            model[options.fileName] = null;
 | 
			
		||||
            model[options.file] = null;
 | 
			
		||||
            model[options.location] = null;
 | 
			
		||||
            model[options.fileContent] = null;
 | 
			
		||||
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -212,8 +212,8 @@
 | 
			
		||||
                                                    </md-input-container>
 | 
			
		||||
 | 
			
		||||
                                                    <section class="dropdown-section">
 | 
			
		||||
                                                        <div class="tb-container" ng-class="{'ng-invalid':!server.keystore.file}">
 | 
			
		||||
                                                            <span ng-init='fieldsToFill = {"fileName":"fileName", "file":"file"}'></span>
 | 
			
		||||
                                                        <div class="tb-container" ng-class="{'ng-invalid':!server.keystore.fileContent}">
 | 
			
		||||
                                                            <span ng-init='fieldsToFill = {"location":"location", "fileContent":"fileContent"}'></span>
 | 
			
		||||
                                                            <label class="tb-label" translate>extension.opc-keystore-location</label>
 | 
			
		||||
                                                            <div flow-init="{singleFile:true}" flow-file-added='fileAdded($file, server.keystore, fieldsToFill)' class="tb-file-select-container">
 | 
			
		||||
                                                                <div class="tb-file-clear-container">
 | 
			
		||||
@ -231,14 +231,14 @@
 | 
			
		||||
                                                                           class="file-input"
 | 
			
		||||
                                                                           flow-btn id="dropFileKeystore_{{serverIndex}}"
 | 
			
		||||
                                                                           name="keystoreFile"
 | 
			
		||||
                                                                           ng-model="server.keystore.file"
 | 
			
		||||
                                                                           ng-model="server.keystore.fileContent"
 | 
			
		||||
                                                                    >
 | 
			
		||||
                                                                </div>
 | 
			
		||||
                                                            </div>
 | 
			
		||||
                                                        </div>
 | 
			
		||||
                                                        <div class="dropdown-messages">
 | 
			
		||||
                                                            <div ng-if="!server.keystore[fieldsToFill.fileName]" class="tb-error-message" translate>extension.no-file</div>
 | 
			
		||||
                                                            <div ng-if="server.keystore[fieldsToFill.fileName]">{{server.keystore[fieldsToFill.fileName]}}</div>
 | 
			
		||||
                                                            <div ng-if="!server.keystore[fieldsToFill.location]" class="tb-error-message" translate>extension.no-file</div>
 | 
			
		||||
                                                            <div ng-if="server.keystore[fieldsToFill.location]">{{server.keystore[fieldsToFill.location]}}</div>
 | 
			
		||||
                                                        </div>
 | 
			
		||||
                                                    </section>
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user