Query refactoring and improvements
This commit is contained in:
parent
9764f27a10
commit
070e5b5b62
@ -16,11 +16,11 @@
|
||||
|
||||
CREATE TABLE IF NOT EXISTS thingsboard.msg_queue (
|
||||
node_id timeuuid,
|
||||
clustered_hash bigint,
|
||||
partition bigint,
|
||||
cluster_partition bigint,
|
||||
ts_partition bigint,
|
||||
ts bigint,
|
||||
msg blob,
|
||||
PRIMARY KEY ((node_id, clustered_hash, partition), ts))
|
||||
PRIMARY KEY ((node_id, cluster_partition, ts_partition), ts))
|
||||
WITH CLUSTERING ORDER BY (ts DESC)
|
||||
AND compaction = {
|
||||
'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
|
||||
@ -33,10 +33,10 @@ AND compaction = {
|
||||
|
||||
CREATE TABLE IF NOT EXISTS thingsboard.msg_ack_queue (
|
||||
node_id timeuuid,
|
||||
clustered_hash bigint,
|
||||
partition bigint,
|
||||
cluster_partition bigint,
|
||||
ts_partition bigint,
|
||||
msg_id timeuuid,
|
||||
PRIMARY KEY ((node_id, clustered_hash, partition), msg_id))
|
||||
PRIMARY KEY ((node_id, cluster_partition, ts_partition), msg_id))
|
||||
WITH CLUSTERING ORDER BY (msg_id DESC)
|
||||
AND compaction = {
|
||||
'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
|
||||
@ -49,10 +49,10 @@ AND compaction = {
|
||||
|
||||
CREATE TABLE IF NOT EXISTS thingsboard.processed_msg_partitions (
|
||||
node_id timeuuid,
|
||||
clustered_hash bigint,
|
||||
partition bigint,
|
||||
PRIMARY KEY ((node_id, clustered_hash), partition))
|
||||
WITH CLUSTERING ORDER BY (partition DESC)
|
||||
cluster_partition bigint,
|
||||
ts_partition bigint,
|
||||
PRIMARY KEY ((node_id, cluster_partition), ts_partition))
|
||||
WITH CLUSTERING ORDER BY (ts_partition DESC)
|
||||
AND compaction = {
|
||||
'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
|
||||
'min_threshold': '5',
|
||||
|
||||
@ -182,6 +182,12 @@ cassandra:
|
||||
# Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS
|
||||
ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
|
||||
|
||||
queue:
|
||||
msg.ttl: 604800 # 7 days
|
||||
ack.ttl: 604800 # 7 days
|
||||
partitions.ttl: 604800 # 7 days
|
||||
partitioning: "HOURS"
|
||||
|
||||
# SQL configuration parameters
|
||||
sql:
|
||||
# Specify executor service type used to perform timeseries insert tasks: SINGLE FIXED CACHED
|
||||
|
||||
@ -56,6 +56,11 @@
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-classic</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.protobuf</groupId>
|
||||
<artifactId>protobuf-java</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
@ -70,6 +75,10 @@
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.xolstice.maven.plugins</groupId>
|
||||
<artifactId>protobuf-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
|
||||
@ -0,0 +1,80 @@
|
||||
/**
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.common.msg;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import lombok.Data;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
||||
import org.thingsboard.server.common.msg.gen.MsgProtos;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* Created by ashvayka on 13.01.18.
|
||||
*/
|
||||
@Data
|
||||
public final class TbMsg implements Serializable {
|
||||
|
||||
private final UUID id;
|
||||
private final String type;
|
||||
private final EntityId originator;
|
||||
private final TbMsgMetaData metaData;
|
||||
|
||||
private final byte[] data;
|
||||
|
||||
public static ByteBuffer toBytes(TbMsg msg) {
|
||||
MsgProtos.TbMsgProto.Builder builder = MsgProtos.TbMsgProto.newBuilder();
|
||||
builder.setId(msg.getId().toString());
|
||||
builder.setType(msg.getType());
|
||||
if (msg.getOriginator() != null) {
|
||||
builder.setEntityType(msg.getOriginator().getEntityType().name());
|
||||
builder.setEntityId(msg.getOriginator().getId().toString());
|
||||
}
|
||||
|
||||
if (msg.getMetaData() != null) {
|
||||
MsgProtos.TbMsgProto.TbMsgMetaDataProto.Builder metadataBuilder = MsgProtos.TbMsgProto.TbMsgMetaDataProto.newBuilder();
|
||||
metadataBuilder.putAllData(msg.getMetaData().getData());
|
||||
builder.addMetaData(metadataBuilder.build());
|
||||
}
|
||||
|
||||
builder.setData(ByteString.copyFrom(msg.getData()));
|
||||
byte[] bytes = builder.build().toByteArray();
|
||||
return ByteBuffer.wrap(bytes);
|
||||
}
|
||||
|
||||
public static TbMsg fromBytes(ByteBuffer buffer) {
|
||||
try {
|
||||
MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(buffer.array());
|
||||
TbMsgMetaData metaData = new TbMsgMetaData();
|
||||
if (proto.getMetaDataCount() > 0) {
|
||||
metaData.setData(proto.getMetaData(0).getDataMap());
|
||||
}
|
||||
|
||||
EntityId entityId = null;
|
||||
if (proto.getEntityId() != null) {
|
||||
entityId = EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId());
|
||||
}
|
||||
|
||||
return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, proto.getData().toByteArray());
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.api;
|
||||
package org.thingsboard.server.common.msg;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@ -16,8 +16,8 @@
|
||||
syntax = "proto3";
|
||||
package msgqueue;
|
||||
|
||||
option java_package = "org.thingsboard.rule.engine.queue.cassandra.repository.gen";
|
||||
option java_outer_classname = "MsgQueueProtos";
|
||||
option java_package = "org.thingsboard.server.common.msg.gen";
|
||||
option java_outer_classname = "MsgProtos";
|
||||
|
||||
|
||||
message TbMsgProto {
|
||||
15
dao/pom.xml
15
dao/pom.xml
@ -38,7 +38,11 @@
|
||||
<dependency>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
<artifactId>data</artifactId>
|
||||
</dependency>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
<artifactId>message</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
@ -140,6 +144,10 @@
|
||||
</exclusions>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.protobuf</groupId>
|
||||
<artifactId>protobuf-java</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.curator</groupId>
|
||||
<artifactId>curator-x-discovery</artifactId>
|
||||
@ -198,6 +206,11 @@
|
||||
<groupId>org.elasticsearch.client</groupId>
|
||||
<artifactId>rest</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter-api</artifactId>
|
||||
<version>RELEASE</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<build>
|
||||
<plugins>
|
||||
|
||||
@ -22,12 +22,21 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.thingsboard.server.dao.cassandra.CassandraCluster;
|
||||
import org.thingsboard.server.dao.model.type.*;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
@Slf4j
|
||||
public abstract class CassandraAbstractDao {
|
||||
|
||||
@Autowired
|
||||
protected CassandraCluster cluster;
|
||||
|
||||
private ConcurrentMap<String, PreparedStatement> preparedStatementMap = new ConcurrentHashMap<>();
|
||||
|
||||
protected PreparedStatement prepare(String query) {
|
||||
return preparedStatementMap.computeIfAbsent(query, i -> getSession().prepare(i));
|
||||
}
|
||||
|
||||
private Session session;
|
||||
|
||||
private ConsistencyLevel defaultReadLevel;
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
*/
|
||||
package org.thingsboard.server.dao.nosql;
|
||||
|
||||
import com.datastax.driver.core.PreparedStatement;
|
||||
import com.datastax.driver.core.ResultSet;
|
||||
import com.datastax.driver.core.ResultSetFuture;
|
||||
import com.datastax.driver.core.Statement;
|
||||
@ -37,6 +38,8 @@ import javax.annotation.Nullable;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
|
||||
import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@ -13,17 +13,18 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.api;
|
||||
package org.thingsboard.server.dao.queue;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
public interface MsqQueue {
|
||||
|
||||
ListenableFuture<Void> put(TbMsg msg, UUID nodeId, long clusteredHash);
|
||||
ListenableFuture<Void> put(TbMsg msg, UUID nodeId, long clusterPartition);
|
||||
|
||||
ListenableFuture<Void> ack(TbMsg msg, UUID nodeId, long clusteredHash);
|
||||
ListenableFuture<Void> ack(TbMsg msg, UUID nodeId, long clusterPartition);
|
||||
|
||||
Iterable<TbMsg> findUnprocessed(UUID nodeId, long clusteredHash);
|
||||
Iterable<TbMsg> findUnprocessed(UUID nodeId, long clusterPartition);
|
||||
}
|
||||
@ -1,4 +1,19 @@
|
||||
package org.thingsboard.rule.engine.tool;
|
||||
/**
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.queue;
|
||||
|
||||
import com.datastax.driver.core.Cluster;
|
||||
import com.datastax.driver.core.HostDistance;
|
||||
@ -16,9 +31,8 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
import org.thingsboard.rule.engine.api.MsqQueue;
|
||||
import org.thingsboard.rule.engine.api.TbMsg;
|
||||
import org.thingsboard.rule.engine.api.TbMsgMetaData;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.net.InetSocketAddress;
|
||||
@ -29,9 +43,9 @@ import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
@SpringBootApplication
|
||||
@EnableAutoConfiguration
|
||||
@ComponentScan({"org.thingsboard.rule.engine"})
|
||||
//@SpringBootApplication
|
||||
//@EnableAutoConfiguration
|
||||
//@ComponentScan({"org.thingsboard.rule.engine"})
|
||||
//@PropertySource("classpath:processing-pipeline.properties")
|
||||
@Slf4j
|
||||
public class QueueBenchmark implements CommandLineRunner {
|
||||
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.queue.jpa;
|
||||
package org.thingsboard.server.dao.queue.jpa;
|
||||
|
||||
//@todo-vp: implement
|
||||
public class SqlMsgQueue {
|
||||
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@ -13,60 +13,57 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.queue.cassandra;
|
||||
package org.thingsboard.server.dao.service.queue.cassandra;
|
||||
|
||||
import com.datastax.driver.core.utils.UUIDs;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.rule.engine.api.MsqQueue;
|
||||
import org.thingsboard.rule.engine.api.TbMsg;
|
||||
import org.thingsboard.rule.engine.queue.cassandra.repository.AckRepository;
|
||||
import org.thingsboard.rule.engine.queue.cassandra.repository.MsgRepository;
|
||||
import org.thingsboard.server.common.data.UUIDConverter;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.dao.queue.MsqQueue;
|
||||
import org.thingsboard.server.dao.service.queue.cassandra.repository.AckRepository;
|
||||
import org.thingsboard.server.dao.service.queue.cassandra.repository.MsgRepository;
|
||||
import org.thingsboard.server.dao.util.NoSqlDao;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@NoSqlDao
|
||||
public class CassandraMsqQueue implements MsqQueue {
|
||||
|
||||
private final MsgRepository msgRepository;
|
||||
private final AckRepository ackRepository;
|
||||
private final UnprocessedMsgFilter unprocessedMsgFilter;
|
||||
private final QueuePartitioner queuePartitioner;
|
||||
|
||||
public CassandraMsqQueue(MsgRepository msgRepository, AckRepository ackRepository,
|
||||
UnprocessedMsgFilter unprocessedMsgFilter, QueuePartitioner queuePartitioner) {
|
||||
this.msgRepository = msgRepository;
|
||||
this.ackRepository = ackRepository;
|
||||
this.unprocessedMsgFilter = unprocessedMsgFilter;
|
||||
this.queuePartitioner = queuePartitioner;
|
||||
}
|
||||
|
||||
@Autowired
|
||||
private MsgRepository msgRepository;
|
||||
@Autowired
|
||||
private AckRepository ackRepository;
|
||||
@Autowired
|
||||
private UnprocessedMsgFilter unprocessedMsgFilter;
|
||||
@Autowired
|
||||
private QueuePartitioner queuePartitioner;
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> put(TbMsg msg, UUID nodeId, long clusteredHash) {
|
||||
public ListenableFuture<Void> put(TbMsg msg, UUID nodeId, long clusterPartition) {
|
||||
long msgTime = getMsgTime(msg);
|
||||
long partition = queuePartitioner.getPartition(msgTime);
|
||||
return msgRepository.save(msg, nodeId, clusteredHash, partition, msgTime);
|
||||
long tsPartition = queuePartitioner.getPartition(msgTime);
|
||||
return msgRepository.save(msg, nodeId, clusterPartition, tsPartition, msgTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> ack(TbMsg msg, UUID nodeId, long clusteredHash) {
|
||||
long partition = queuePartitioner.getPartition(getMsgTime(msg));
|
||||
MsgAck ack = new MsgAck(msg.getId(), nodeId, clusteredHash, partition);
|
||||
public ListenableFuture<Void> ack(TbMsg msg, UUID nodeId, long clusterPartition) {
|
||||
long tsPartition = queuePartitioner.getPartition(getMsgTime(msg));
|
||||
MsgAck ack = new MsgAck(msg.getId(), nodeId, clusterPartition, tsPartition);
|
||||
return ackRepository.ack(ack);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterable<TbMsg> findUnprocessed(UUID nodeId, long clusteredHash) {
|
||||
public Iterable<TbMsg> findUnprocessed(UUID nodeId, long clusterPartition) {
|
||||
List<TbMsg> unprocessedMsgs = Lists.newArrayList();
|
||||
for (Long partition : queuePartitioner.findUnprocessedPartitions(nodeId, clusteredHash)) {
|
||||
List<TbMsg> msgs = msgRepository.findMsgs(nodeId, clusteredHash, partition);
|
||||
List<MsgAck> acks = ackRepository.findAcks(nodeId, clusteredHash, partition);
|
||||
for (Long tsPartition : queuePartitioner.findUnprocessedPartitions(nodeId, clusterPartition)) {
|
||||
List<TbMsg> msgs = msgRepository.findMsgs(nodeId, clusterPartition, tsPartition);
|
||||
List<MsgAck> acks = ackRepository.findAcks(nodeId, clusterPartition, tsPartition);
|
||||
unprocessedMsgs.addAll(unprocessedMsgFilter.filter(msgs, acks));
|
||||
}
|
||||
return unprocessedMsgs;
|
||||
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@ -13,13 +13,10 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.queue.cassandra;
|
||||
package org.thingsboard.server.dao.service.queue.cassandra;
|
||||
|
||||
import com.datastax.driver.core.utils.UUIDs;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import org.thingsboard.rule.engine.api.TbMsg;
|
||||
import org.thingsboard.server.common.data.UUIDConverter;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
@ -29,7 +26,7 @@ public class MsgAck {
|
||||
|
||||
private final UUID msgId;
|
||||
private final UUID nodeId;
|
||||
private final long clusteredHash;
|
||||
private final long partition;
|
||||
private final long clusteredPartition;
|
||||
private final long tsPartition;
|
||||
|
||||
}
|
||||
@ -1,26 +1,27 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* <p>
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.queue.cassandra;
|
||||
package org.thingsboard.server.dao.service.queue.cassandra;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.rule.engine.queue.cassandra.repository.ProcessedPartitionRepository;
|
||||
import org.thingsboard.server.dao.service.queue.cassandra.repository.ProcessedPartitionRepository;
|
||||
import org.thingsboard.server.dao.timeseries.TsPartitionDate;
|
||||
import org.thingsboard.server.dao.util.NoSqlDao;
|
||||
|
||||
import java.time.Clock;
|
||||
import java.time.Instant;
|
||||
@ -32,26 +33,27 @@ import java.util.UUID;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@NoSqlDao
|
||||
public class QueuePartitioner {
|
||||
|
||||
private ProcessedPartitionRepository processedPartitionRepository;
|
||||
|
||||
private final TsPartitionDate tsFormat;
|
||||
private ProcessedPartitionRepository processedPartitionRepository;
|
||||
private Clock clock = Clock.systemUTC();
|
||||
|
||||
public QueuePartitioner(@Value("${rule.queue.msg_partitioning}") String partitioning,
|
||||
public QueuePartitioner(@Value("${cassandra.queue.partitioning}") String partitioning,
|
||||
ProcessedPartitionRepository processedPartitionRepository) {
|
||||
this.processedPartitionRepository = processedPartitionRepository;
|
||||
Optional<TsPartitionDate> partition = TsPartitionDate.parse(partitioning);
|
||||
if (partition.isPresent()) {
|
||||
tsFormat = partition.get();
|
||||
} else {
|
||||
log.warn("Incorrect configuration of partitioning {}", "MINUTES");
|
||||
throw new RuntimeException("Failed to parse partitioning property: " + "MINUTES" + "!");
|
||||
log.warn("Incorrect configuration of partitioning {}", partitioning);
|
||||
throw new RuntimeException("Failed to parse partitioning property: " + partitioning + "!");
|
||||
}
|
||||
}
|
||||
|
||||
public long getPartition(long ts) {
|
||||
//TODO: use TsPartitionDate.truncateTo?
|
||||
LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
|
||||
return tsFormat.truncatedTo(time).toInstant(ZoneOffset.UTC).toEpochMilli();
|
||||
}
|
||||
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@ -13,10 +13,10 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.queue.cassandra;
|
||||
package org.thingsboard.server.dao.service.queue.cassandra;
|
||||
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.rule.engine.api.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@ -13,10 +13,10 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.queue.cassandra.repository;
|
||||
package org.thingsboard.server.dao.service.queue.cassandra.repository;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.thingsboard.rule.engine.queue.cassandra.MsgAck;
|
||||
import org.thingsboard.server.dao.service.queue.cassandra.MsgAck;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
@ -25,5 +25,5 @@ public interface AckRepository {
|
||||
|
||||
ListenableFuture<Void> ack(MsgAck msgAck);
|
||||
|
||||
List<MsgAck> findAcks(UUID nodeId, long clusteredHash, long partition);
|
||||
List<MsgAck> findAcks(UUID nodeId, long clusterPartition, long tsPartition);
|
||||
}
|
||||
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@ -13,18 +13,18 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.queue.cassandra.repository;
|
||||
package org.thingsboard.server.dao.service.queue.cassandra.repository;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.thingsboard.rule.engine.api.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
public interface MsgRepository {
|
||||
|
||||
ListenableFuture<Void> save(TbMsg msg, UUID nodeId, long clusteredHash, long partition, long msgTs);
|
||||
ListenableFuture<Void> save(TbMsg msg, UUID nodeId, long clusterPartition, long tsPartition, long msgTs);
|
||||
|
||||
List<TbMsg> findMsgs(UUID nodeId, long clusteredHash, long partition);
|
||||
List<TbMsg> findMsgs(UUID nodeId, long clusterPartition, long tsPartition);
|
||||
|
||||
}
|
||||
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.queue.cassandra.repository;
|
||||
package org.thingsboard.server.dao.service.queue.cassandra.repository;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
|
||||
@ -1,62 +1,62 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* <p>
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
|
||||
package org.thingsboard.server.dao.service.queue.cassandra.repository.impl;
|
||||
|
||||
import com.datastax.driver.core.*;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.rule.engine.queue.cassandra.MsgAck;
|
||||
import org.thingsboard.rule.engine.queue.cassandra.repository.AckRepository;
|
||||
import org.thingsboard.server.dao.nosql.CassandraAbstractDao;
|
||||
import org.thingsboard.server.dao.service.queue.cassandra.MsgAck;
|
||||
import org.thingsboard.server.dao.service.queue.cassandra.repository.AckRepository;
|
||||
import org.thingsboard.server.dao.util.NoSqlDao;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
@Component
|
||||
public class CassandraAckRepository extends SimpleAbstractCassandraDao implements AckRepository {
|
||||
@NoSqlDao
|
||||
public class CassandraAckRepository extends CassandraAbstractDao implements AckRepository {
|
||||
|
||||
private final int ackQueueTtl;
|
||||
|
||||
public CassandraAckRepository(Session session, int ackQueueTtl) {
|
||||
super(session);
|
||||
this.ackQueueTtl = ackQueueTtl;
|
||||
}
|
||||
@Value("${cassandra.queue.ack.ttl}")
|
||||
private int ackQueueTtl;
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> ack(MsgAck msgAck) {
|
||||
String insert = "INSERT INTO msg_ack_queue (node_id, clustered_hash, partition, msg_id) VALUES (?, ?, ?, ?) USING TTL ?";
|
||||
String insert = "INSERT INTO msg_ack_queue (node_id, cluster_partition, ts_partition, msg_id) VALUES (?, ?, ?, ?) USING TTL ?";
|
||||
PreparedStatement statement = prepare(insert);
|
||||
BoundStatement boundStatement = statement.bind(msgAck.getNodeId(), msgAck.getClusteredHash(),
|
||||
msgAck.getPartition(), msgAck.getMsgId(), ackQueueTtl);
|
||||
BoundStatement boundStatement = statement.bind(msgAck.getNodeId(), msgAck.getClusteredPartition(),
|
||||
msgAck.getTsPartition(), msgAck.getMsgId(), ackQueueTtl);
|
||||
ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement);
|
||||
return Futures.transform(resultSetFuture, (Function<ResultSet, Void>) input -> null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<MsgAck> findAcks(UUID nodeId, long clusteredHash, long partition) {
|
||||
public List<MsgAck> findAcks(UUID nodeId, long clusterPartition, long tsPartition) {
|
||||
String select = "SELECT msg_id FROM msg_ack_queue WHERE " +
|
||||
"node_id = ? AND clustered_hash = ? AND partition = ?";
|
||||
"node_id = ? AND cluster_partition = ? AND ts_partition = ?";
|
||||
PreparedStatement statement = prepare(select);
|
||||
BoundStatement boundStatement = statement.bind(nodeId, clusteredHash, partition);
|
||||
BoundStatement boundStatement = statement.bind(nodeId, clusterPartition, tsPartition);
|
||||
ResultSet rows = executeRead(boundStatement);
|
||||
List<MsgAck> msgs = new ArrayList<>();
|
||||
for (Row row : rows) {
|
||||
msgs.add(new MsgAck(row.getUUID("msg_id"), nodeId, clusteredHash, partition));
|
||||
msgs.add(new MsgAck(row.getUUID("msg_id"), nodeId, clusterPartition, tsPartition));
|
||||
}
|
||||
return msgs;
|
||||
}
|
||||
@ -0,0 +1,63 @@
|
||||
/**
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.service.queue.cassandra.repository.impl;
|
||||
|
||||
import com.datastax.driver.core.*;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.dao.nosql.CassandraAbstractDao;
|
||||
import org.thingsboard.server.dao.service.queue.cassandra.repository.MsgRepository;
|
||||
import org.thingsboard.server.dao.util.NoSqlDao;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
@Component
|
||||
@NoSqlDao
|
||||
public class CassandraMsgRepository extends CassandraAbstractDao implements MsgRepository {
|
||||
|
||||
@Value("${cassandra.queue.msg.ttl}")
|
||||
private int msqQueueTtl;
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> save(TbMsg msg, UUID nodeId, long clusterPartition, long tsPartition, long msgTs) {
|
||||
String insert = "INSERT INTO msg_queue (node_id, cluster_partition, ts_partition, ts, msg) VALUES (?, ?, ?, ?, ?) USING TTL ?";
|
||||
PreparedStatement statement = prepare(insert);
|
||||
BoundStatement boundStatement = statement.bind(nodeId, clusterPartition, tsPartition, msgTs, TbMsg.toBytes(msg), msqQueueTtl);
|
||||
ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement);
|
||||
return Futures.transform(resultSetFuture, (Function<ResultSet, Void>) input -> null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TbMsg> findMsgs(UUID nodeId, long clusterPartition, long tsPartition) {
|
||||
String select = "SELECT node_id, cluster_partition, ts_partition, ts, msg FROM msg_queue WHERE " +
|
||||
"node_id = ? AND cluster_partition = ? AND ts_partition = ?";
|
||||
PreparedStatement statement = prepare(select);
|
||||
BoundStatement boundStatement = statement.bind(nodeId, clusterPartition, tsPartition);
|
||||
ResultSet rows = executeRead(boundStatement);
|
||||
List<TbMsg> msgs = new ArrayList<>();
|
||||
for (Row row : rows) {
|
||||
msgs.add(TbMsg.fromBytes(row.getBytes("msg")));
|
||||
}
|
||||
return msgs;
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,53 +1,53 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* <p>
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
|
||||
package org.thingsboard.server.dao.service.queue.cassandra.repository.impl;
|
||||
|
||||
import com.datastax.driver.core.*;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.rule.engine.queue.cassandra.repository.ProcessedPartitionRepository;
|
||||
import org.thingsboard.server.dao.nosql.CassandraAbstractDao;
|
||||
import org.thingsboard.server.dao.service.queue.cassandra.repository.ProcessedPartitionRepository;
|
||||
import org.thingsboard.server.dao.util.NoSqlDao;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
||||
@Component
|
||||
public class CassandraProcessedPartitionRepository extends SimpleAbstractCassandraDao implements ProcessedPartitionRepository {
|
||||
@NoSqlDao
|
||||
public class CassandraProcessedPartitionRepository extends CassandraAbstractDao implements ProcessedPartitionRepository {
|
||||
|
||||
private final int repositoryTtl;
|
||||
|
||||
public CassandraProcessedPartitionRepository(Session session, int repositoryTtl) {
|
||||
super(session);
|
||||
this.repositoryTtl = repositoryTtl;
|
||||
}
|
||||
@Value("${cassandra.queue.partitions.ttl}")
|
||||
private int partitionsTtl;
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> partitionProcessed(UUID nodeId, long clusteredHash, long partition) {
|
||||
String insert = "INSERT INTO processed_msg_partitions (node_id, clustered_hash, partition) VALUES (?, ?, ?) USING TTL ?";
|
||||
public ListenableFuture<Void> partitionProcessed(UUID nodeId, long clusterPartition, long tsPartition) {
|
||||
String insert = "INSERT INTO processed_msg_partitions (node_id, cluster_partition, ts_partition) VALUES (?, ?, ?) USING TTL ?";
|
||||
PreparedStatement prepared = prepare(insert);
|
||||
BoundStatement boundStatement = prepared.bind(nodeId, clusteredHash, partition, repositoryTtl);
|
||||
BoundStatement boundStatement = prepared.bind(nodeId, clusterPartition, tsPartition, partitionsTtl);
|
||||
ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement);
|
||||
return Futures.transform(resultSetFuture, (Function<ResultSet, Void>) input -> null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<Long> findLastProcessedPartition(UUID nodeId, long clusteredHash) {
|
||||
String select = "SELECT partition FROM processed_msg_partitions WHERE " +
|
||||
"node_id = ? AND clustered_hash = ?";
|
||||
String select = "SELECT ts_partition FROM processed_msg_partitions WHERE " +
|
||||
"node_id = ? AND cluster_partition = ?";
|
||||
PreparedStatement prepared = prepare(select);
|
||||
BoundStatement boundStatement = prepared.bind(nodeId, clusteredHash);
|
||||
Row row = executeRead(boundStatement).one();
|
||||
@ -55,6 +55,6 @@ public class CassandraProcessedPartitionRepository extends SimpleAbstractCassand
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
return Optional.of(row.getLong("partition"));
|
||||
return Optional.of(row.getLong("ts_partition"));
|
||||
}
|
||||
}
|
||||
@ -616,11 +616,11 @@ AND compaction = { 'class' : 'LeveledCompactionStrategy' };
|
||||
|
||||
CREATE TABLE IF NOT EXISTS thingsboard.msg_queue (
|
||||
node_id timeuuid,
|
||||
clustered_hash bigint,
|
||||
partition bigint,
|
||||
cluster_partition bigint,
|
||||
ts_partition bigint,
|
||||
ts bigint,
|
||||
msg blob,
|
||||
PRIMARY KEY ((node_id, clustered_hash, partition), ts))
|
||||
PRIMARY KEY ((node_id, cluster_partition, ts_partition), ts))
|
||||
WITH CLUSTERING ORDER BY (ts DESC)
|
||||
AND compaction = {
|
||||
'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
|
||||
@ -633,10 +633,10 @@ AND compaction = {
|
||||
|
||||
CREATE TABLE IF NOT EXISTS thingsboard.msg_ack_queue (
|
||||
node_id timeuuid,
|
||||
clustered_hash bigint,
|
||||
partition bigint,
|
||||
cluster_partition bigint,
|
||||
ts_partition bigint,
|
||||
msg_id timeuuid,
|
||||
PRIMARY KEY ((node_id, clustered_hash, partition), msg_id))
|
||||
PRIMARY KEY ((node_id, cluster_partition, ts_partition), msg_id))
|
||||
WITH CLUSTERING ORDER BY (msg_id DESC)
|
||||
AND compaction = {
|
||||
'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
|
||||
@ -649,10 +649,10 @@ AND compaction = {
|
||||
|
||||
CREATE TABLE IF NOT EXISTS thingsboard.processed_msg_partitions (
|
||||
node_id timeuuid,
|
||||
clustered_hash bigint,
|
||||
partition bigint,
|
||||
PRIMARY KEY ((node_id, clustered_hash), partition))
|
||||
WITH CLUSTERING ORDER BY (partition DESC)
|
||||
cluster_partition bigint,
|
||||
ts_partition bigint,
|
||||
PRIMARY KEY ((node_id, cluster_partition), ts_partition))
|
||||
WITH CLUSTERING ORDER BY (ts_partition DESC)
|
||||
AND compaction = {
|
||||
'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
|
||||
'min_threshold': '5',
|
||||
|
||||
@ -26,7 +26,9 @@ import java.util.Arrays;
|
||||
|
||||
@RunWith(ClasspathSuite.class)
|
||||
@ClassnameFilters({
|
||||
"org.thingsboard.server.dao.service.*ServiceNoSqlTest"
|
||||
"org.thingsboard.server.dao.service.*ServiceNoSqlTest",
|
||||
"org.thingsboard.server.dao.service.queue.cassandra.*.*.*Test",
|
||||
"org.thingsboard.server.dao.service.queue.cassandra.*Test"
|
||||
})
|
||||
public class NoSqlDaoServiceTestSuite {
|
||||
|
||||
|
||||
@ -1,19 +1,19 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* <p>
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.queue.cassandra;
|
||||
package org.thingsboard.server.dao.service.queue.cassandra;
|
||||
|
||||
|
||||
import org.junit.Before;
|
||||
@ -21,7 +21,7 @@ import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
import org.thingsboard.rule.engine.queue.cassandra.repository.ProcessedPartitionRepository;
|
||||
import org.thingsboard.server.dao.service.queue.cassandra.repository.ProcessedPartitionRepository;
|
||||
|
||||
import java.time.Clock;
|
||||
import java.time.Instant;
|
||||
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@ -13,11 +13,11 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.queue.cassandra;
|
||||
package org.thingsboard.server.dao.service.queue.cassandra;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.junit.Test;
|
||||
import org.thingsboard.rule.engine.api.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@ -13,14 +13,17 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
|
||||
package org.thingsboard.server.dao.service.queue.cassandra.repository.impl;
|
||||
|
||||
import com.datastax.driver.core.utils.UUIDs;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.thingsboard.rule.engine.queue.cassandra.MsgAck;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.thingsboard.server.dao.service.AbstractServiceTest;
|
||||
import org.thingsboard.server.dao.service.DaoNoSqlTest;
|
||||
import org.thingsboard.server.dao.service.queue.cassandra.MsgAck;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
@ -30,15 +33,12 @@ import java.util.concurrent.TimeUnit;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class CassandraAckRepositoryTest extends SimpleAbstractCassandraDaoTest {
|
||||
@DaoNoSqlTest
|
||||
public class CassandraAckRepositoryTest extends AbstractServiceTest {
|
||||
|
||||
@Autowired
|
||||
private CassandraAckRepository ackRepository;
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
ackRepository = new CassandraAckRepository(cassandraUnit.session, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void acksInPartitionCouldBeFound() {
|
||||
UUID nodeId = UUID.fromString("055eee50-1883-11e8-b380-65b5d5335ba9");
|
||||
@ -1,19 +1,19 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* <p>
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
|
||||
package org.thingsboard.server.dao.service.queue.cassandra.repository.impl;
|
||||
|
||||
//import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
@ -21,9 +21,12 @@ import com.datastax.driver.core.utils.UUIDs;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.thingsboard.rule.engine.api.TbMsg;
|
||||
import org.thingsboard.rule.engine.api.TbMsgMetaData;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
import org.thingsboard.server.dao.service.AbstractServiceTest;
|
||||
import org.thingsboard.server.dao.service.DaoNoSqlTest;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
@ -33,15 +36,12 @@ import java.util.concurrent.TimeUnit;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class CassandraMsgRepositoryTest extends SimpleAbstractCassandraDaoTest {
|
||||
@DaoNoSqlTest
|
||||
public class CassandraMsgRepositoryTest extends AbstractServiceTest {
|
||||
|
||||
@Autowired
|
||||
private CassandraMsgRepository msgRepository;
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
msgRepository = new CassandraMsgRepository(cassandraUnit.session, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void msgCanBeSavedAndRead() throws ExecutionException, InterruptedException {
|
||||
TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, new byte[4]);
|
||||
@ -58,8 +58,6 @@ public class CassandraMsgRepositoryTest extends SimpleAbstractCassandraDaoTest {
|
||||
UUID nodeId = UUIDs.timeBased();
|
||||
ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 2L, 2L, 2L);
|
||||
future.get();
|
||||
List<TbMsg> msgs = msgRepository.findMsgs(nodeId, 2L, 2L);
|
||||
assertEquals(1, msgs.size());
|
||||
TimeUnit.SECONDS.sleep(2);
|
||||
assertTrue(msgRepository.findMsgs(nodeId, 2L, 2L).isEmpty());
|
||||
}
|
||||
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@ -13,13 +13,16 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
|
||||
package org.thingsboard.server.dao.service.queue.cassandra.repository.impl;
|
||||
|
||||
import com.datastax.driver.core.utils.UUIDs;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.thingsboard.server.dao.service.AbstractServiceTest;
|
||||
import org.thingsboard.server.dao.service.DaoNoSqlTest;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
@ -29,15 +32,12 @@ import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class CassandraProcessedPartitionRepositoryTest extends SimpleAbstractCassandraDaoTest {
|
||||
@DaoNoSqlTest
|
||||
public class CassandraProcessedPartitionRepositoryTest extends AbstractServiceTest {
|
||||
|
||||
@Autowired
|
||||
private CassandraProcessedPartitionRepository partitionRepository;
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
partitionRepository = new CassandraProcessedPartitionRepository(cassandraUnit.session, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void lastProcessedPartitionCouldBeFound() {
|
||||
UUID nodeId = UUID.fromString("055eee50-1883-11e8-b380-65b5d5335ba9");
|
||||
@ -1,2 +1,27 @@
|
||||
TRUNCATE thingsboard.plugin;
|
||||
TRUNCATE thingsboard.rule;
|
||||
TRUNCATE thingsboard.rule;
|
||||
|
||||
-- msg_queue dataset
|
||||
|
||||
INSERT INTO thingsboard.msg_queue (node_id, cluster_partition, ts_partition, ts, msg)
|
||||
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 200, 201, null);
|
||||
INSERT INTO thingsboard.msg_queue (node_id, cluster_partition, ts_partition, ts, msg)
|
||||
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 200, 202, null);
|
||||
INSERT INTO thingsboard.msg_queue (node_id, cluster_partition, ts_partition, ts, msg)
|
||||
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 300, 301, null);
|
||||
|
||||
-- ack_queue dataset
|
||||
INSERT INTO thingsboard.msg_ack_queue (node_id, cluster_partition, ts_partition, msg_id)
|
||||
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 300, bebaeb60-1888-11e8-bf21-65b5d5335ba9);
|
||||
INSERT INTO thingsboard.msg_ack_queue (node_id, cluster_partition, ts_partition, msg_id)
|
||||
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 300, 12baeb60-1888-11e8-bf21-65b5d5335ba9);
|
||||
INSERT INTO thingsboard.msg_ack_queue (node_id, cluster_partition, ts_partition, msg_id)
|
||||
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 200, 32baeb60-1888-11e8-bf21-65b5d5335ba9);
|
||||
|
||||
-- processed partition dataset
|
||||
INSERT INTO thingsboard.processed_msg_partitions (node_id, cluster_partition, ts_partition)
|
||||
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 100);
|
||||
INSERT INTO thingsboard.processed_msg_partitions (node_id, cluster_partition, ts_partition)
|
||||
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 777);
|
||||
INSERT INTO thingsboard.processed_msg_partitions (node_id, cluster_partition, ts_partition)
|
||||
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 202, 200);
|
||||
@ -1 +1,6 @@
|
||||
database.type=cassandra
|
||||
database.type=cassandra
|
||||
|
||||
cassandra.queue.partitioning=HOURS
|
||||
cassandra.queue.ack.ttl=1
|
||||
cassandra.queue.msg.ttl=1
|
||||
cassandra.queue.partitions.ttl=1
|
||||
13
pom.xml
13
pom.xml
@ -35,7 +35,7 @@
|
||||
<spring-data-redis.version>1.8.10.RELEASE</spring-data-redis.version>
|
||||
<jedis.version>2.9.0</jedis.version>
|
||||
<jjwt.version>0.7.0</jjwt.version>
|
||||
<json-path.version>2.2.0</json-path.version>
|
||||
<json-path.version>2.2.0</json-path.version>
|
||||
<junit.version>4.12</junit.version>
|
||||
<slf4j.version>1.7.7</slf4j.version>
|
||||
<logback.version>1.2.3</logback.version>
|
||||
@ -79,17 +79,19 @@
|
||||
<dbunit.version>2.5.3</dbunit.version>
|
||||
<spring-test-dbunit.version>1.2.1</spring-test-dbunit.version>
|
||||
<postgresql.driver.version>9.4.1211</postgresql.driver.version>
|
||||
<sonar.exclusions>org/thingsboard/server/gen/**/*, org/thingsboard/server/extensions/core/plugin/telemetry/gen/**/*</sonar.exclusions>
|
||||
<sonar.exclusions>org/thingsboard/server/gen/**/*,
|
||||
org/thingsboard/server/extensions/core/plugin/telemetry/gen/**/*
|
||||
</sonar.exclusions>
|
||||
<elasticsearch.version>5.0.2</elasticsearch.version>
|
||||
</properties>
|
||||
|
||||
<modules>
|
||||
<module>common</module>
|
||||
<module>rule-engine</module>
|
||||
<module>dao</module>
|
||||
<module>extensions-api</module>
|
||||
<module>extensions-core</module>
|
||||
<module>extensions</module>
|
||||
<module>rule-engine</module>
|
||||
<module>transport</module>
|
||||
<module>ui</module>
|
||||
<module>tools</module>
|
||||
@ -371,6 +373,11 @@
|
||||
<artifactId>data</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.thingsboard.rule-engine</groupId>
|
||||
<artifactId>rule-engine-api</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
<artifactId>message</artifactId>
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
<!--
|
||||
|
||||
Copyright © 2016-2017 The Thingsboard Authors
|
||||
Copyright © 2016-2018 The Thingsboard Authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@ -20,10 +20,9 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<version>1.4.0-SNAPSHOT</version>
|
||||
<version>1.4.1-SNAPSHOT</version>
|
||||
<artifactId>thingsboard</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<artifactId>rule-engine</artifactId>
|
||||
<packaging>pom</packaging>
|
||||
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
|
||||
Copyright © 2016-2017 The Thingsboard Authors
|
||||
Copyright © 2016-2018 The Thingsboard Authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@ -22,7 +22,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<version>1.4.0-SNAPSHOT</version>
|
||||
<version>1.4.1-SNAPSHOT</version>
|
||||
<artifactId>rule-engine</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.rule-engine</groupId>
|
||||
@ -53,6 +53,11 @@
|
||||
<artifactId>dao</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-core</artifactId>
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@ -15,6 +15,7 @@
|
||||
*/
|
||||
package org.thingsboard.rule.engine.api;
|
||||
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.cluster.ServerAddress;
|
||||
import org.thingsboard.server.dao.attributes.AttributesService;
|
||||
|
||||
|
||||
@ -1,37 +0,0 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.api;
|
||||
|
||||
import lombok.Data;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* Created by ashvayka on 13.01.18.
|
||||
*/
|
||||
@Data
|
||||
public final class TbMsg implements Serializable {
|
||||
|
||||
private final UUID id;
|
||||
private final String type;
|
||||
private final EntityId originator;
|
||||
private final TbMsgMetaData metaData;
|
||||
|
||||
private final byte[] data;
|
||||
|
||||
}
|
||||
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@ -15,6 +15,8 @@
|
||||
*/
|
||||
package org.thingsboard.rule.engine.api;
|
||||
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
/**
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
|
||||
Copyright © 2016-2017 The Thingsboard Authors
|
||||
Copyright © 2016-2018 The Thingsboard Authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@ -22,7 +22,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<version>1.4.0-SNAPSHOT</version>
|
||||
<version>1.4.1-SNAPSHOT</version>
|
||||
<artifactId>rule-engine</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.rule-engine</groupId>
|
||||
@ -61,30 +61,11 @@
|
||||
<dependency>
|
||||
<groupId>org.thingsboard.rule-engine</groupId>
|
||||
<artifactId>rule-engine-api</artifactId>
|
||||
<version>1.4.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.protobuf</groupId>
|
||||
<artifactId>protobuf-java</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.datastax.cassandra</groupId>
|
||||
<artifactId>cassandra-driver-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.datastax.cassandra</groupId>
|
||||
<artifactId>cassandra-driver-mapping</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.datastax.cassandra</groupId>
|
||||
<artifactId>cassandra-driver-extras</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
@ -126,23 +107,15 @@
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-dependency-plugin</artifactId>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.xolstice.maven.plugins</groupId>
|
||||
<artifactId>protobuf-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.codehaus.mojo</groupId>
|
||||
<artifactId>build-helper-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
|
||||
|
||||
|
||||
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
<configuration>
|
||||
<mainClass>org.thingsboard.rule.engine.tool.QueueBenchmark</mainClass>
|
||||
<mainClass>org.thingsboard.server.dao.queue.QueueBenchmark</mainClass>
|
||||
<classifier>boot</classifier>
|
||||
<layout>ZIP</layout>
|
||||
<executable>true</executable>
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@ -18,12 +18,7 @@ package org.thingsboard.rule.engine.filter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.rule.engine.TbNodeUtils;
|
||||
import org.thingsboard.rule.engine.api.*;
|
||||
import org.thingsboard.rule.engine.metadata.TbGetAttributesNodeConfiguration;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||
import org.thingsboard.server.dao.attributes.AttributesService;
|
||||
|
||||
import java.util.List;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
|
||||
/**
|
||||
* Created by ashvayka on 19.01.18.
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@ -20,6 +20,7 @@ import org.thingsboard.rule.engine.TbNodeUtils;
|
||||
import org.thingsboard.rule.engine.api.*;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.dao.attributes.AttributesService;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
||||
@ -1,109 +0,0 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* <p>
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
|
||||
|
||||
import com.datastax.driver.core.*;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.rule.engine.api.TbMsg;
|
||||
import org.thingsboard.rule.engine.api.TbMsgMetaData;
|
||||
import org.thingsboard.rule.engine.queue.cassandra.repository.MsgRepository;
|
||||
import org.thingsboard.rule.engine.queue.cassandra.repository.gen.MsgQueueProtos;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
@Component
|
||||
public class CassandraMsgRepository extends SimpleAbstractCassandraDao implements MsgRepository {
|
||||
|
||||
private final int msqQueueTtl;
|
||||
|
||||
|
||||
public CassandraMsgRepository(Session session, int msqQueueTtl) {
|
||||
super(session);
|
||||
this.msqQueueTtl = msqQueueTtl;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> save(TbMsg msg, UUID nodeId, long clusteredHash, long partition, long msgTs) {
|
||||
String insert = "INSERT INTO msg_queue (node_id, clustered_hash, partition, ts, msg) VALUES (?, ?, ?, ?, ?) USING TTL ?";
|
||||
PreparedStatement statement = prepare(insert);
|
||||
BoundStatement boundStatement = statement.bind(nodeId, clusteredHash, partition, msgTs, toBytes(msg), msqQueueTtl);
|
||||
ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement);
|
||||
return Futures.transform(resultSetFuture, (Function<ResultSet, Void>) input -> null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TbMsg> findMsgs(UUID nodeId, long clusteredHash, long partition) {
|
||||
String select = "SELECT node_id, clustered_hash, partition, ts, msg FROM msg_queue WHERE " +
|
||||
"node_id = ? AND clustered_hash = ? AND partition = ?";
|
||||
PreparedStatement statement = prepare(select);
|
||||
BoundStatement boundStatement = statement.bind(nodeId, clusteredHash, partition);
|
||||
ResultSet rows = executeRead(boundStatement);
|
||||
List<TbMsg> msgs = new ArrayList<>();
|
||||
for (Row row : rows) {
|
||||
msgs.add(fromBytes(row.getBytes("msg")));
|
||||
}
|
||||
return msgs;
|
||||
}
|
||||
|
||||
private ByteBuffer toBytes(TbMsg msg) {
|
||||
MsgQueueProtos.TbMsgProto.Builder builder = MsgQueueProtos.TbMsgProto.newBuilder();
|
||||
builder.setId(msg.getId().toString());
|
||||
builder.setType(msg.getType());
|
||||
if (msg.getOriginator() != null) {
|
||||
builder.setEntityType(msg.getOriginator().getEntityType().name());
|
||||
builder.setEntityId(msg.getOriginator().getId().toString());
|
||||
}
|
||||
|
||||
if (msg.getMetaData() != null) {
|
||||
MsgQueueProtos.TbMsgProto.TbMsgMetaDataProto.Builder metadataBuilder = MsgQueueProtos.TbMsgProto.TbMsgMetaDataProto.newBuilder();
|
||||
metadataBuilder.putAllData(msg.getMetaData().getData());
|
||||
builder.addMetaData(metadataBuilder.build());
|
||||
}
|
||||
|
||||
builder.setData(ByteString.copyFrom(msg.getData()));
|
||||
byte[] bytes = builder.build().toByteArray();
|
||||
return ByteBuffer.wrap(bytes);
|
||||
}
|
||||
|
||||
private TbMsg fromBytes(ByteBuffer buffer) {
|
||||
try {
|
||||
MsgQueueProtos.TbMsgProto proto = MsgQueueProtos.TbMsgProto.parseFrom(buffer.array());
|
||||
TbMsgMetaData metaData = new TbMsgMetaData();
|
||||
if (proto.getMetaDataCount() > 0) {
|
||||
metaData.setData(proto.getMetaData(0).getDataMap());
|
||||
}
|
||||
|
||||
EntityId entityId = null;
|
||||
if (proto.getEntityId() != null) {
|
||||
entityId = EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId());
|
||||
}
|
||||
|
||||
return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, proto.getData().toByteArray());
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,77 +0,0 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* <p>
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
|
||||
|
||||
import com.datastax.driver.core.*;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
public abstract class SimpleAbstractCassandraDao {
|
||||
|
||||
private ConsistencyLevel defaultReadLevel = ConsistencyLevel.QUORUM;
|
||||
private ConsistencyLevel defaultWriteLevel = ConsistencyLevel.QUORUM;
|
||||
private Session session;
|
||||
private Map<String, PreparedStatement> preparedStatementMap = new ConcurrentHashMap<>();
|
||||
|
||||
public SimpleAbstractCassandraDao(Session session) {
|
||||
this.session = session;
|
||||
}
|
||||
|
||||
protected Session getSession() {
|
||||
return session;
|
||||
}
|
||||
|
||||
protected ResultSet executeRead(Statement statement) {
|
||||
return execute(statement, defaultReadLevel);
|
||||
}
|
||||
|
||||
protected ResultSet executeWrite(Statement statement) {
|
||||
return execute(statement, defaultWriteLevel);
|
||||
}
|
||||
|
||||
protected ResultSetFuture executeAsyncRead(Statement statement) {
|
||||
return executeAsync(statement, defaultReadLevel);
|
||||
}
|
||||
|
||||
protected ResultSetFuture executeAsyncWrite(Statement statement) {
|
||||
return executeAsync(statement, defaultWriteLevel);
|
||||
}
|
||||
|
||||
protected PreparedStatement prepare(String query) {
|
||||
return preparedStatementMap.computeIfAbsent(query, i -> getSession().prepare(i));
|
||||
}
|
||||
|
||||
private ResultSet execute(Statement statement, ConsistencyLevel level) {
|
||||
log.debug("Execute cassandra statement {}", statement);
|
||||
if (statement.getConsistencyLevel() == null) {
|
||||
statement.setConsistencyLevel(level);
|
||||
}
|
||||
return getSession().execute(statement);
|
||||
}
|
||||
|
||||
private ResultSetFuture executeAsync(Statement statement, ConsistencyLevel level) {
|
||||
log.debug("Execute cassandra async statement {}", statement);
|
||||
if (statement.getConsistencyLevel() == null) {
|
||||
statement.setConsistencyLevel(level);
|
||||
}
|
||||
return getSession().executeAsync(statement);
|
||||
}
|
||||
}
|
||||
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@ -21,6 +21,7 @@ import org.thingsboard.rule.engine.api.*;
|
||||
import org.thingsboard.rule.engine.metadata.TbGetAttributesNodeConfiguration;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.dao.attributes.AttributesService;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@ -1,48 +0,0 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.queue.cassandra;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mock;
|
||||
import org.thingsboard.rule.engine.queue.cassandra.repository.AckRepository;
|
||||
import org.thingsboard.rule.engine.queue.cassandra.repository.MsgRepository;
|
||||
|
||||
public class CassandraMsqQueueTest {
|
||||
|
||||
private CassandraMsqQueue msqQueue;
|
||||
|
||||
@Mock
|
||||
private MsgRepository msgRepository;
|
||||
@Mock
|
||||
private AckRepository ackRepository;
|
||||
@Mock
|
||||
private UnprocessedMsgFilter unprocessedMsgFilter;
|
||||
@Mock
|
||||
private QueuePartitioner queuePartitioner;
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
msqQueue = new CassandraMsqQueue(msgRepository, ackRepository, unprocessedMsgFilter, queuePartitioner);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void msgCanBeSaved() {
|
||||
// todo-vp: implement
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@ -1,30 +0,0 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
|
||||
|
||||
import org.cassandraunit.CassandraCQLUnit;
|
||||
import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
|
||||
import org.junit.ClassRule;
|
||||
|
||||
|
||||
public abstract class SimpleAbstractCassandraDaoTest {
|
||||
|
||||
@ClassRule
|
||||
public static CassandraCQLUnit cassandraUnit = new CassandraCQLUnit(
|
||||
new ClassPathCQLDataSet("cassandra/system-test.cql", "thingsboard"));
|
||||
|
||||
|
||||
}
|
||||
@ -1,75 +0,0 @@
|
||||
CREATE TABLE IF NOT EXISTS thingsboard.msg_queue (
|
||||
node_id timeuuid,
|
||||
clustered_hash bigint,
|
||||
partition bigint,
|
||||
ts bigint,
|
||||
msg blob,
|
||||
PRIMARY KEY ((node_id, clustered_hash, partition), ts))
|
||||
WITH CLUSTERING ORDER BY (ts DESC)
|
||||
AND compaction = {
|
||||
'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
|
||||
'min_threshold': '5',
|
||||
'base_time_seconds': '43200',
|
||||
'max_window_size_seconds': '43200',
|
||||
'tombstone_threshold': '0.9',
|
||||
'unchecked_tombstone_compaction': 'true'
|
||||
};
|
||||
|
||||
|
||||
CREATE TABLE IF NOT EXISTS thingsboard.msg_ack_queue (
|
||||
node_id timeuuid,
|
||||
clustered_hash bigint,
|
||||
partition bigint,
|
||||
msg_id timeuuid,
|
||||
PRIMARY KEY ((node_id, clustered_hash, partition), msg_id))
|
||||
WITH CLUSTERING ORDER BY (msg_id DESC)
|
||||
AND compaction = {
|
||||
'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
|
||||
'min_threshold': '5',
|
||||
'base_time_seconds': '43200',
|
||||
'max_window_size_seconds': '43200',
|
||||
'tombstone_threshold': '0.9',
|
||||
'unchecked_tombstone_compaction': 'true'
|
||||
};
|
||||
|
||||
CREATE TABLE IF NOT EXISTS thingsboard.processed_msg_partitions (
|
||||
node_id timeuuid,
|
||||
clustered_hash bigint,
|
||||
partition bigint,
|
||||
PRIMARY KEY ((node_id, clustered_hash), partition))
|
||||
WITH CLUSTERING ORDER BY (partition DESC)
|
||||
AND compaction = {
|
||||
'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
|
||||
'min_threshold': '5',
|
||||
'base_time_seconds': '43200',
|
||||
'max_window_size_seconds': '43200',
|
||||
'tombstone_threshold': '0.9',
|
||||
'unchecked_tombstone_compaction': 'true'
|
||||
};
|
||||
|
||||
|
||||
|
||||
-- msg_queue dataset
|
||||
|
||||
INSERT INTO thingsboard.msg_queue (node_id, clustered_hash, partition, ts, msg)
|
||||
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 200, 201, null);
|
||||
INSERT INTO thingsboard.msg_queue (node_id, clustered_hash, partition, ts, msg)
|
||||
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 200, 202, null);
|
||||
INSERT INTO thingsboard.msg_queue (node_id, clustered_hash, partition, ts, msg)
|
||||
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 300, 301, null);
|
||||
|
||||
-- ack_queue dataset
|
||||
INSERT INTO msg_ack_queue (node_id, clustered_hash, partition, msg_id)
|
||||
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 300, bebaeb60-1888-11e8-bf21-65b5d5335ba9);
|
||||
INSERT INTO msg_ack_queue (node_id, clustered_hash, partition, msg_id)
|
||||
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 300, 12baeb60-1888-11e8-bf21-65b5d5335ba9);
|
||||
INSERT INTO msg_ack_queue (node_id, clustered_hash, partition, msg_id)
|
||||
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 200, 32baeb60-1888-11e8-bf21-65b5d5335ba9);
|
||||
|
||||
-- processed partition dataset
|
||||
INSERT INTO processed_msg_partitions (node_id, clustered_hash, partition)
|
||||
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 100);
|
||||
INSERT INTO processed_msg_partitions (node_id, clustered_hash, partition)
|
||||
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 777);
|
||||
INSERT INTO processed_msg_partitions (node_id, clustered_hash, partition)
|
||||
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 202, 200);
|
||||
Loading…
x
Reference in New Issue
Block a user