Cassandra MsqQueue initial implementation

This commit is contained in:
vparomskiy 2018-03-02 13:27:59 +02:00
parent 1da32793e1
commit 3b5a4941ff
25 changed files with 1267 additions and 80 deletions

View File

@ -251,6 +251,9 @@ spring:
username: "${SPRING_DATASOURCE_USERNAME:sa}"
password: "${SPRING_DATASOURCE_PASSWORD:}"
rule:
queue:
msg_partitioning: "${QUEUE_MSG_PARTITIONING:HOURS}"
# PostgreSQL DAO Configuration
#spring:

View File

@ -555,48 +555,45 @@ CREATE TABLE IF NOT EXISTS thingsboard.msg_queue (
partition bigint,
ts bigint,
msg blob,
PRIMARY KEY ((node_id, cluster_hash, partition), ts)
WITH CLUSTERING ORDER BY (ts DESC)
AND compaction = {
'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
'min_threshold': '5',
'base_time_seconds': '43200',
'max_window_size_seconds': '43200'
'tombstone_threshold': '0.9',
'unchecked_tombstone_compaction': 'true',
};
);
PRIMARY KEY ((node_id, clustered_hash, partition), ts))
WITH CLUSTERING ORDER BY (ts DESC)
AND compaction = {
'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
'min_threshold': '5',
'base_time_seconds': '43200',
'max_window_size_seconds': '43200',
'tombstone_threshold': '0.9',
'unchecked_tombstone_compaction': 'true'
};
CREATE TABLE IF NOT EXISTS thingsboard.msg_ack_queue (
node_id timeuuid,
clustered_hash bigint,
partition bigint,
ts bigint,
msg_id timeuuid,
PRIMARY KEY ((node_id, cluster_hash, partition), ts)
WITH CLUSTERING ORDER BY (ts DESC)
AND compaction = {
'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
'min_threshold': '5',
'base_time_seconds': '43200',
'max_window_size_seconds': '43200'
'tombstone_threshold': '0.9',
'unchecked_tombstone_compaction': 'true',
};
);
PRIMARY KEY ((node_id, clustered_hash, partition), msg_id))
WITH CLUSTERING ORDER BY (msg_id DESC)
AND compaction = {
'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
'min_threshold': '5',
'base_time_seconds': '43200',
'max_window_size_seconds': '43200',
'tombstone_threshold': '0.9',
'unchecked_tombstone_compaction': 'true'
};
CREATE TABLE IF NOT EXISTS thingsboard.processed_msg_partitions (
node_id timeuuid,
clustered_hash bigint,
partition bigint,
PRIMARY KEY ((node_id, cluster_hash), partition)
WITH CLUSTERING ORDER BY (partition DESC)
AND compaction = {
'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
'min_threshold': '5',
'base_time_seconds': '43200',
'max_window_size_seconds': '43200'
'tombstone_threshold': '0.9',
'unchecked_tombstone_compaction': 'true',
};
);
PRIMARY KEY ((node_id, clustered_hash), partition))
WITH CLUSTERING ORDER BY (partition DESC)
AND compaction = {
'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
'min_threshold': '5',
'base_time_seconds': '43200',
'max_window_size_seconds': '43200',
'tombstone_threshold': '0.9',
'unchecked_tombstone_compaction': 'true'
};

View File

@ -19,6 +19,7 @@ import lombok.Data;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Created by ashvayka on 13.01.18.
@ -26,7 +27,7 @@ import java.util.Map;
@Data
public final class TbMsgMetaData implements Serializable {
private Map<String, String> data;
private Map<String, String> data = new ConcurrentHashMap<>();
public String getValue(String key) {
return data.get(key);

View File

@ -63,5 +63,107 @@
<artifactId>rule-engine-api</artifactId>
<version>1.4.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-mapping</artifactId>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-extras</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.cassandraunit</groupId>
<artifactId>cassandra-unit</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>RELEASE</version>
</dependency>
<!--<dependency>-->
<!--<groupId>org.springframework.boot</groupId>-->
<!--<artifactId>spring-boot-starter-web</artifactId>-->
<!--</dependency>-->
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<mainClass>org.thingsboard.rule.engine.tool.QueueBenchmark</mainClass>
<classifier>boot</classifier>
<layout>ZIP</layout>
<executable>true</executable>
<excludeDevtools>true</excludeDevtools>
<!--<embeddedLaunchScriptProperties>-->
<!--<confFolder>${pkg.installFolder}/conf</confFolder>-->
<!--<logFolder>${pkg.unixLogFolder}</logFolder>-->
<!--<logFilename>${pkg.name}.out</logFilename>-->
<!--<initInfoProvides>${pkg.name}</initInfoProvides>-->
<!--</embeddedLaunchScriptProperties>-->
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -15,68 +15,65 @@
*/
package org.thingsboard.rule.engine.queue.cassandra;
import com.datastax.driver.core.utils.UUIDs;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import org.springframework.beans.factory.annotation.Autowired;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.rule.engine.api.MsqQueue;
import org.thingsboard.rule.engine.api.TbMsg;
import org.thingsboard.rule.engine.queue.cassandra.repository.AckRepository;
import org.thingsboard.rule.engine.queue.cassandra.repository.MsgRepository;
import org.thingsboard.rule.engine.queue.cassandra.repository.ProcessedPartitionRepository;
import org.thingsboard.server.common.data.UUIDConverter;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@Component
@Slf4j
public class CassandraMsqQueue implements MsqQueue {
@Autowired
private MsgRepository msgRepository;
private final MsgRepository msgRepository;
private final AckRepository ackRepository;
private final UnprocessedMsgFilter unprocessedMsgFilter;
private final QueuePartitioner queuePartitioner;
@Autowired
private AckRepository ackRepository;
public CassandraMsqQueue(MsgRepository msgRepository, AckRepository ackRepository,
UnprocessedMsgFilter unprocessedMsgFilter, QueuePartitioner queuePartitioner) {
this.msgRepository = msgRepository;
this.ackRepository = ackRepository;
this.unprocessedMsgFilter = unprocessedMsgFilter;
this.queuePartitioner = queuePartitioner;
}
@Autowired
private AckBuilder ackBuilder;
@Autowired
private UnprocessedMsgFilter unprocessedMsgFilter;
@Autowired
private ProcessedPartitionRepository processedPartitionRepository;
@Override
public ListenableFuture<Void> put(TbMsg msg, UUID nodeId, long clusteredHash) {
return msgRepository.save(msg, nodeId, clusteredHash, getPartition(msg));
long msgTime = getMsgTime(msg);
long partition = queuePartitioner.getPartition(msgTime);
return msgRepository.save(msg, nodeId, clusteredHash, partition, msgTime);
}
@Override
public ListenableFuture<Void> ack(TbMsg msg, UUID nodeId, long clusteredHash) {
MsgAck ack = ackBuilder.build(msg, nodeId, clusteredHash);
long partition = queuePartitioner.getPartition(getMsgTime(msg));
MsgAck ack = new MsgAck(msg.getId(), nodeId, clusteredHash, partition);
return ackRepository.ack(ack);
}
@Override
public Iterable<TbMsg> findUnprocessed(UUID nodeId, long clusteredHash) {
List<TbMsg> unprocessedMsgs = Lists.newArrayList();
for (Long partition : findUnprocessedPartitions(nodeId, clusteredHash)) {
Iterable<TbMsg> msgs = msgRepository.findMsgs(nodeId, clusteredHash, partition);
Iterable<MsgAck> acks = ackRepository.findAcks(nodeId, clusteredHash, partition);
for (Long partition : queuePartitioner.findUnprocessedPartitions(nodeId, clusteredHash)) {
List<TbMsg> msgs = msgRepository.findMsgs(nodeId, clusteredHash, partition);
List<MsgAck> acks = ackRepository.findAcks(nodeId, clusteredHash, partition);
unprocessedMsgs.addAll(unprocessedMsgFilter.filter(msgs, acks));
}
return unprocessedMsgs;
}
private List<Long> findUnprocessedPartitions(UUID nodeId, long clusteredHash) {
Optional<Long> lastPartition = processedPartitionRepository.findLastProcessedPartition(nodeId, clusteredHash);
return Collections.emptyList();
}
private long getPartition(TbMsg msg) {
return Long.MIN_VALUE;
private long getMsgTime(TbMsg msg) {
return UUIDs.unixTimestamp(msg.getId());
}
}

View File

@ -15,17 +15,21 @@
*/
package org.thingsboard.rule.engine.queue.cassandra;
import com.datastax.driver.core.utils.UUIDs;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.thingsboard.rule.engine.api.TbMsg;
import org.thingsboard.server.common.data.UUIDConverter;
import java.util.UUID;
@Data
@EqualsAndHashCode
public class MsgAck {
private final UUID msgId;
private final UUID nodeId;
private final long clusteredHash;
private final long partition;
private final long ts;
}

View File

@ -0,0 +1,83 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.queue.cassandra;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.thingsboard.rule.engine.queue.cassandra.repository.ProcessedPartitionRepository;
import org.thingsboard.server.dao.timeseries.TsPartitionDate;
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@Component
@Slf4j
public class QueuePartitioner {
private ProcessedPartitionRepository processedPartitionRepository;
private final TsPartitionDate tsFormat;
private Clock clock = Clock.systemUTC();
public QueuePartitioner(@Value("${rule.queue.msg_partitioning}") String partitioning,
ProcessedPartitionRepository processedPartitionRepository) {
this.processedPartitionRepository = processedPartitionRepository;
Optional<TsPartitionDate> partition = TsPartitionDate.parse(partitioning);
if (partition.isPresent()) {
tsFormat = partition.get();
} else {
log.warn("Incorrect configuration of partitioning {}", "MINUTES");
throw new RuntimeException("Failed to parse partitioning property: " + "MINUTES" + "!");
}
}
public long getPartition(long ts) {
LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
return tsFormat.truncatedTo(time).toInstant(ZoneOffset.UTC).toEpochMilli();
}
public List<Long> findUnprocessedPartitions(UUID nodeId, long clusteredHash) {
Optional<Long> lastPartitionOption = processedPartitionRepository.findLastProcessedPartition(nodeId, clusteredHash);
long lastPartition = lastPartitionOption.orElse(System.currentTimeMillis() - 7 * 24 * 60 * 60 * 100);
List<Long> unprocessedPartitions = Lists.newArrayList();
LocalDateTime current = LocalDateTime.ofInstant(Instant.ofEpochMilli(lastPartition), ZoneOffset.UTC);
LocalDateTime end = LocalDateTime.ofInstant(Instant.now(clock), ZoneOffset.UTC)
.plus(1L, tsFormat.getTruncateUnit());
while (current.isBefore(end)) {
current = current.plus(1L, tsFormat.getTruncateUnit());
unprocessedPartitions.add(tsFormat.truncatedTo(current).toInstant(ZoneOffset.UTC).toEpochMilli());
}
return unprocessedPartitions;
}
public void setClock(Clock clock) {
this.clock = clock;
}
public void checkProcessedPartitions() {
//todo-vp: we need to implement this
}
}

View File

@ -15,14 +15,20 @@
*/
package org.thingsboard.rule.engine.queue.cassandra;
import org.springframework.stereotype.Component;
import org.thingsboard.rule.engine.api.TbMsg;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
@Component
public class UnprocessedMsgFilter {
public Collection<TbMsg> filter(Iterable<TbMsg> msgs, Iterable<MsgAck> acks) {
return Collections.emptyList();
public Collection<TbMsg> filter(List<TbMsg> msgs, List<MsgAck> acks) {
Set<UUID> processedIds = acks.stream().map(MsgAck::getMsgId).collect(Collectors.toSet());
return msgs.stream().filter(i -> !processedIds.contains(i.getId())).collect(Collectors.toList());
}
}

View File

@ -18,11 +18,12 @@ package org.thingsboard.rule.engine.queue.cassandra.repository;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.rule.engine.queue.cassandra.MsgAck;
import java.util.List;
import java.util.UUID;
public interface AckRepository {
ListenableFuture<Void> ack(MsgAck msgAck);
Iterable<MsgAck> findAcks(UUID nodeId, long clusteredHash, long partition);
List<MsgAck> findAcks(UUID nodeId, long clusteredHash, long partition);
}

View File

@ -18,12 +18,13 @@ package org.thingsboard.rule.engine.queue.cassandra.repository;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.rule.engine.api.TbMsg;
import java.util.List;
import java.util.UUID;
public interface MsgRepository {
ListenableFuture<Void> save(TbMsg msg, UUID nodeId, long clusteredHash, long partition);
ListenableFuture<Void> save(TbMsg msg, UUID nodeId, long clusteredHash, long partition, long msgTs);
Iterable<TbMsg> findMsgs(UUID nodeId, long clusteredHash, long partition);
List<TbMsg> findMsgs(UUID nodeId, long clusteredHash, long partition);
}

View File

@ -1,11 +1,28 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.queue.cassandra.repository;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Optional;
import java.util.UUID;
public interface ProcessedPartitionRepository {
void partitionProcessed(UUID nodeId, long clusteredHash, long partition);
ListenableFuture<Void> partitionProcessed(UUID nodeId, long clusteredHash, long partition);
Optional<Long> findLastProcessedPartition(UUID nodeId, long clusteredHash);

View File

@ -0,0 +1,64 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
import com.datastax.driver.core.*;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.springframework.stereotype.Component;
import org.thingsboard.rule.engine.queue.cassandra.MsgAck;
import org.thingsboard.rule.engine.queue.cassandra.repository.AckRepository;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@Component
public class CassandraAckRepository extends SimpleAbstractCassandraDao implements AckRepository {
private final int ackQueueTtl;
public CassandraAckRepository(Session session, int ackQueueTtl) {
super(session);
this.ackQueueTtl = ackQueueTtl;
}
@Override
public ListenableFuture<Void> ack(MsgAck msgAck) {
String insert = "INSERT INTO msg_ack_queue (node_id, clustered_hash, partition, msg_id) VALUES (?, ?, ?, ?) USING TTL ?";
PreparedStatement statement = prepare(insert);
BoundStatement boundStatement = statement.bind(msgAck.getNodeId(), msgAck.getClusteredHash(),
msgAck.getPartition(), msgAck.getMsgId(), ackQueueTtl);
ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement);
return Futures.transform(resultSetFuture, (Function<ResultSet, Void>) input -> null);
}
@Override
public List<MsgAck> findAcks(UUID nodeId, long clusteredHash, long partition) {
String select = "SELECT msg_id FROM msg_ack_queue WHERE " +
"node_id = ? AND clustered_hash = ? AND partition = ?";
PreparedStatement statement = prepare(select);
BoundStatement boundStatement = statement.bind(nodeId, clusteredHash, partition);
ResultSet rows = executeRead(boundStatement);
List<MsgAck> msgs = new ArrayList<>();
for (Row row : rows) {
msgs.add(new MsgAck(row.getUUID("msg_id"), nodeId, clusteredHash, partition));
}
return msgs;
}
}

View File

@ -0,0 +1,109 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
import com.datastax.driver.core.*;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.springframework.stereotype.Component;
import org.thingsboard.rule.engine.api.TbMsg;
import org.thingsboard.rule.engine.api.TbMsgMetaData;
import org.thingsboard.rule.engine.queue.cassandra.repository.MsgRepository;
import org.thingsboard.rule.engine.queue.cassandra.repository.gen.MsgQueueProtos;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@Component
public class CassandraMsgRepository extends SimpleAbstractCassandraDao implements MsgRepository {
private final int msqQueueTtl;
public CassandraMsgRepository(Session session, int msqQueueTtl) {
super(session);
this.msqQueueTtl = msqQueueTtl;
}
@Override
public ListenableFuture<Void> save(TbMsg msg, UUID nodeId, long clusteredHash, long partition, long msgTs) {
String insert = "INSERT INTO msg_queue (node_id, clustered_hash, partition, ts, msg) VALUES (?, ?, ?, ?, ?) USING TTL ?";
PreparedStatement statement = prepare(insert);
BoundStatement boundStatement = statement.bind(nodeId, clusteredHash, partition, msgTs, toBytes(msg), msqQueueTtl);
ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement);
return Futures.transform(resultSetFuture, (Function<ResultSet, Void>) input -> null);
}
@Override
public List<TbMsg> findMsgs(UUID nodeId, long clusteredHash, long partition) {
String select = "SELECT node_id, clustered_hash, partition, ts, msg FROM msg_queue WHERE " +
"node_id = ? AND clustered_hash = ? AND partition = ?";
PreparedStatement statement = prepare(select);
BoundStatement boundStatement = statement.bind(nodeId, clusteredHash, partition);
ResultSet rows = executeRead(boundStatement);
List<TbMsg> msgs = new ArrayList<>();
for (Row row : rows) {
msgs.add(fromBytes(row.getBytes("msg")));
}
return msgs;
}
private ByteBuffer toBytes(TbMsg msg) {
MsgQueueProtos.TbMsgProto.Builder builder = MsgQueueProtos.TbMsgProto.newBuilder();
builder.setId(msg.getId().toString());
builder.setType(msg.getType());
if (msg.getOriginator() != null) {
builder.setEntityType(msg.getOriginator().getEntityType().name());
builder.setEntityId(msg.getOriginator().getId().toString());
}
if (msg.getMetaData() != null) {
MsgQueueProtos.TbMsgProto.TbMsgMetaDataProto.Builder metadataBuilder = MsgQueueProtos.TbMsgProto.TbMsgMetaDataProto.newBuilder();
metadataBuilder.putAllData(msg.getMetaData().getData());
builder.addMetaData(metadataBuilder.build());
}
builder.setData(ByteString.copyFrom(msg.getData()));
byte[] bytes = builder.build().toByteArray();
return ByteBuffer.wrap(bytes);
}
private TbMsg fromBytes(ByteBuffer buffer) {
try {
MsgQueueProtos.TbMsgProto proto = MsgQueueProtos.TbMsgProto.parseFrom(buffer.array());
TbMsgMetaData metaData = new TbMsgMetaData();
if (proto.getMetaDataCount() > 0) {
metaData.setData(proto.getMetaData(0).getDataMap());
}
EntityId entityId = null;
if (proto.getEntityId() != null) {
entityId = EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId());
}
return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, proto.getData().toByteArray());
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
}
}
}

View File

@ -0,0 +1,60 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
import com.datastax.driver.core.*;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.springframework.stereotype.Component;
import org.thingsboard.rule.engine.queue.cassandra.repository.ProcessedPartitionRepository;
import java.util.Optional;
import java.util.UUID;
@Component
public class CassandraProcessedPartitionRepository extends SimpleAbstractCassandraDao implements ProcessedPartitionRepository {
private final int repositoryTtl;
public CassandraProcessedPartitionRepository(Session session, int repositoryTtl) {
super(session);
this.repositoryTtl = repositoryTtl;
}
@Override
public ListenableFuture<Void> partitionProcessed(UUID nodeId, long clusteredHash, long partition) {
String insert = "INSERT INTO processed_msg_partitions (node_id, clustered_hash, partition) VALUES (?, ?, ?) USING TTL ?";
PreparedStatement prepared = prepare(insert);
BoundStatement boundStatement = prepared.bind(nodeId, clusteredHash, partition, repositoryTtl);
ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement);
return Futures.transform(resultSetFuture, (Function<ResultSet, Void>) input -> null);
}
@Override
public Optional<Long> findLastProcessedPartition(UUID nodeId, long clusteredHash) {
String select = "SELECT partition FROM processed_msg_partitions WHERE " +
"node_id = ? AND clustered_hash = ?";
PreparedStatement prepared = prepare(select);
BoundStatement boundStatement = prepared.bind(nodeId, clusteredHash);
Row row = executeRead(boundStatement).one();
if (row == null) {
return Optional.empty();
}
return Optional.of(row.getLong("partition"));
}
}

View File

@ -0,0 +1,77 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
import com.datastax.driver.core.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
@Slf4j
public abstract class SimpleAbstractCassandraDao {
private ConsistencyLevel defaultReadLevel = ConsistencyLevel.QUORUM;
private ConsistencyLevel defaultWriteLevel = ConsistencyLevel.QUORUM;
private Session session;
private Map<String, PreparedStatement> preparedStatementMap = new ConcurrentHashMap<>();
public SimpleAbstractCassandraDao(Session session) {
this.session = session;
}
protected Session getSession() {
return session;
}
protected ResultSet executeRead(Statement statement) {
return execute(statement, defaultReadLevel);
}
protected ResultSet executeWrite(Statement statement) {
return execute(statement, defaultWriteLevel);
}
protected ResultSetFuture executeAsyncRead(Statement statement) {
return executeAsync(statement, defaultReadLevel);
}
protected ResultSetFuture executeAsyncWrite(Statement statement) {
return executeAsync(statement, defaultWriteLevel);
}
protected PreparedStatement prepare(String query) {
return preparedStatementMap.computeIfAbsent(query, i -> getSession().prepare(i));
}
private ResultSet execute(Statement statement, ConsistencyLevel level) {
log.debug("Execute cassandra statement {}", statement);
if (statement.getConsistencyLevel() == null) {
statement.setConsistencyLevel(level);
}
return getSession().execute(statement);
}
private ResultSetFuture executeAsync(Statement statement, ConsistencyLevel level) {
log.debug("Execute cassandra async statement {}", statement);
if (statement.getConsistencyLevel() == null) {
statement.setConsistencyLevel(level);
}
return getSession().executeAsync(statement);
}
}

View File

@ -0,0 +1,136 @@
package org.thingsboard.rule.engine.tool;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.utils.UUIDs;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.thingsboard.rule.engine.api.MsqQueue;
import org.thingsboard.rule.engine.api.TbMsg;
import org.thingsboard.rule.engine.api.TbMsgMetaData;
import javax.annotation.Nullable;
import java.net.InetSocketAddress;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@SpringBootApplication
@EnableAutoConfiguration
@ComponentScan({"org.thingsboard.rule.engine"})
//@PropertySource("classpath:processing-pipeline.properties")
@Slf4j
public class QueueBenchmark implements CommandLineRunner {
public static void main(String[] args) {
try {
SpringApplication.run(QueueBenchmark.class, args);
} catch (Throwable th) {
th.printStackTrace();
System.exit(0);
}
}
@Autowired
private MsqQueue msqQueue;
@Override
public void run(String... strings) throws Exception {
System.out.println("It works + " + msqQueue);
long start = System.currentTimeMillis();
int msgCount = 10000000;
AtomicLong count = new AtomicLong(0);
ExecutorService service = Executors.newFixedThreadPool(100);
CountDownLatch latch = new CountDownLatch(msgCount);
for (int i = 0; i < msgCount; i++) {
service.submit(() -> {
boolean isFinished = false;
while (!isFinished) {
try {
TbMsg msg = randomMsg();
UUID nodeId = UUIDs.timeBased();
ListenableFuture<Void> put = msqQueue.put(msg, nodeId, 100L);
// ListenableFuture<Void> put = msqQueue.ack(msg, nodeId, 100L);
Futures.addCallback(put, new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void result) {
latch.countDown();
}
@Override
public void onFailure(Throwable t) {
// t.printStackTrace();
System.out.println("onFailure, because:" + t.getMessage());
latch.countDown();
}
});
isFinished = true;
} catch (Throwable th) {
// th.printStackTrace();
System.out.println("Repeat query, because:" + th.getMessage());
// latch.countDown();
}
}
});
}
long prev = 0L;
while (latch.getCount() != 0) {
TimeUnit.SECONDS.sleep(1);
long curr = latch.getCount();
long rps = prev - curr;
prev = curr;
System.out.println("rps = " + rps);
}
long end = System.currentTimeMillis();
System.out.println("final rps = " + (msgCount / (end - start) * 1000));
System.out.println("Finished");
}
private TbMsg randomMsg() {
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("key", "value");
String dataStr = "someContent";
return new TbMsg(UUIDs.timeBased(), "type", null, metaData, dataStr.getBytes());
}
@Bean
public Session session() {
Cluster thingsboard = Cluster.builder()
.addContactPointsWithPorts(new InetSocketAddress("127.0.0.1", 9042))
.withClusterName("thingsboard")
// .withSocketOptions(socketOpts.getOpts())
.withPoolingOptions(new PoolingOptions()
.setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
.setMaxRequestsPerConnection(HostDistance.REMOTE, 32768)).build();
Session session = thingsboard.connect("thingsboard");
return session;
}
@Bean
public int defaultTtl() {
return 6000;
}
}

View File

@ -13,17 +13,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.queue.cassandra;
syntax = "proto3";
package msgqueue;
import org.springframework.stereotype.Component;
import org.thingsboard.rule.engine.api.TbMsg;
option java_package = "org.thingsboard.rule.engine.queue.cassandra.repository.gen";
option java_outer_classname = "MsgQueueProtos";
import java.util.UUID;
@Component
public class AckBuilder {
message TbMsgProto {
string id = 1;
string type = 2;
string entityType = 3;
string entityId = 4;
public MsgAck build(TbMsg msg, UUID nodeId, long clusteredHash) {
return null;
message TbMsgMetaDataProto {
map<string, string> data = 1;
}
}
repeated TbMsgMetaDataProto metaData = 5;
bytes data = 6;
}

View File

@ -0,0 +1,48 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.queue.cassandra;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.thingsboard.rule.engine.queue.cassandra.repository.AckRepository;
import org.thingsboard.rule.engine.queue.cassandra.repository.MsgRepository;
public class CassandraMsqQueueTest {
private CassandraMsqQueue msqQueue;
@Mock
private MsgRepository msgRepository;
@Mock
private AckRepository ackRepository;
@Mock
private UnprocessedMsgFilter unprocessedMsgFilter;
@Mock
private QueuePartitioner queuePartitioner;
@Before
public void init() {
msqQueue = new CassandraMsqQueue(msgRepository, ackRepository, unprocessedMsgFilter, queuePartitioner);
}
@Test
public void msgCanBeSaved() {
// todo-vp: implement
}
}

View File

@ -0,0 +1,81 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.queue.cassandra;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.thingsboard.rule.engine.queue.cassandra.repository.ProcessedPartitionRepository;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class QueuePartitionerTest {
private QueuePartitioner queuePartitioner;
@Mock
private ProcessedPartitionRepository partitionRepo;
private Instant startInstant;
private Instant endInstant;
@Before
public void init() {
queuePartitioner = new QueuePartitioner("MINUTES", partitionRepo);
startInstant = Instant.now();
endInstant = startInstant.plus(2, ChronoUnit.MINUTES);
queuePartitioner.setClock(Clock.fixed(endInstant, ZoneOffset.UTC));
}
@Test
public void partitionCalculated() {
long time = 1519390191425L;
long partition = queuePartitioner.getPartition(time);
assertEquals(1519390140000L, partition);
}
@Test
public void unprocessedPartitionsReturned() {
UUID nodeId = UUID.randomUUID();
long clusteredHash = 101L;
when(partitionRepo.findLastProcessedPartition(nodeId, clusteredHash)).thenReturn(Optional.of(startInstant.toEpochMilli()));
List<Long> actual = queuePartitioner.findUnprocessedPartitions(nodeId, clusteredHash);
assertEquals(3, actual.size());
}
@Test
public void defaultShiftUsedIfNoPartitionWasProcessed() {
UUID nodeId = UUID.randomUUID();
long clusteredHash = 101L;
when(partitionRepo.findLastProcessedPartition(nodeId, clusteredHash)).thenReturn(Optional.empty());
List<Long> actual = queuePartitioner.findUnprocessedPartitions(nodeId, clusteredHash);
assertEquals(1011, actual.size());
}
}

View File

@ -0,0 +1,45 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.queue.cassandra;
import com.google.common.collect.Lists;
import org.junit.Test;
import org.thingsboard.rule.engine.api.TbMsg;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class UnprocessedMsgFilterTest {
private UnprocessedMsgFilter msgFilter = new UnprocessedMsgFilter();
@Test
public void acknowledgedMsgsAreFilteredOut() {
UUID id1 = UUID.randomUUID();
UUID id2 = UUID.randomUUID();
TbMsg msg1 = new TbMsg(id1, "T", null, null, null);
TbMsg msg2 = new TbMsg(id2, "T", null, null, null);
List<TbMsg> msgs = Lists.newArrayList(msg1, msg2);
List<MsgAck> acks = Lists.newArrayList(new MsgAck(id2, UUID.randomUUID(), 1L, 1L));
Collection<TbMsg> actual = msgFilter.filter(msgs, acks);
assertEquals(1, actual.size());
assertEquals(msg1, actual.iterator().next());
}
}

View File

@ -0,0 +1,81 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
import com.datastax.driver.core.utils.UUIDs;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.rule.engine.queue.cassandra.MsgAck;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class CassandraAckRepositoryTest extends SimpleAbstractCassandraDaoTest {
private CassandraAckRepository ackRepository;
@Before
public void init() {
ackRepository = new CassandraAckRepository(cassandraUnit.session, 1);
}
@Test
public void acksInPartitionCouldBeFound() {
UUID nodeId = UUID.fromString("055eee50-1883-11e8-b380-65b5d5335ba9");
List<MsgAck> extectedAcks = Lists.newArrayList(
new MsgAck(UUID.fromString("bebaeb60-1888-11e8-bf21-65b5d5335ba9"), nodeId, 101L, 300L),
new MsgAck(UUID.fromString("12baeb60-1888-11e8-bf21-65b5d5335ba9"), nodeId, 101L, 300L)
);
List<MsgAck> actualAcks = ackRepository.findAcks(nodeId, 101L, 300L);
assertEquals(extectedAcks, actualAcks);
}
@Test
public void ackCanBeSavedAndRead() throws ExecutionException, InterruptedException {
UUID msgId = UUIDs.timeBased();
UUID nodeId = UUIDs.timeBased();
MsgAck ack = new MsgAck(msgId, nodeId, 10L, 20L);
ListenableFuture<Void> future = ackRepository.ack(ack);
future.get();
List<MsgAck> actualAcks = ackRepository.findAcks(nodeId, 10L, 20L);
assertEquals(1, actualAcks.size());
assertEquals(ack, actualAcks.get(0));
}
@Test
public void expiredAcksAreNotReturned() throws ExecutionException, InterruptedException {
UUID msgId = UUIDs.timeBased();
UUID nodeId = UUIDs.timeBased();
MsgAck ack = new MsgAck(msgId, nodeId, 30L, 40L);
ListenableFuture<Void> future = ackRepository.ack(ack);
future.get();
List<MsgAck> actualAcks = ackRepository.findAcks(nodeId, 30L, 40L);
assertEquals(1, actualAcks.size());
TimeUnit.SECONDS.sleep(2);
assertTrue(ackRepository.findAcks(nodeId, 30L, 40L).isEmpty());
}
}

View File

@ -0,0 +1,82 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
//import static org.junit.jupiter.api.Assertions.*;
import com.datastax.driver.core.utils.UUIDs;
import com.google.common.util.concurrent.ListenableFuture;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.rule.engine.api.TbMsg;
import org.thingsboard.rule.engine.api.TbMsgMetaData;
import org.thingsboard.server.common.data.id.DeviceId;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class CassandraMsgRepositoryTest extends SimpleAbstractCassandraDaoTest {
private CassandraMsgRepository msgRepository;
@Before
public void init() {
msgRepository = new CassandraMsgRepository(cassandraUnit.session, 1);
}
@Test
public void msgCanBeSavedAndRead() throws ExecutionException, InterruptedException {
TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, new byte[4]);
UUID nodeId = UUIDs.timeBased();
ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 1L, 1L, 1L);
future.get();
List<TbMsg> msgs = msgRepository.findMsgs(nodeId, 1L, 1L);
assertEquals(1, msgs.size());
}
@Test
public void expiredMsgsAreNotReturned() throws ExecutionException, InterruptedException {
TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, new byte[4]);
UUID nodeId = UUIDs.timeBased();
ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 2L, 2L, 2L);
future.get();
List<TbMsg> msgs = msgRepository.findMsgs(nodeId, 2L, 2L);
assertEquals(1, msgs.size());
TimeUnit.SECONDS.sleep(2);
assertTrue(msgRepository.findMsgs(nodeId, 2L, 2L).isEmpty());
}
@Test
public void protoBufConverterWorkAsExpected() throws ExecutionException, InterruptedException {
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("key", "value");
String dataStr = "someContent";
TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), metaData, dataStr.getBytes());
UUID nodeId = UUIDs.timeBased();
ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 1L, 1L, 1L);
future.get();
List<TbMsg> msgs = msgRepository.findMsgs(nodeId, 1L, 1L);
assertEquals(1, msgs.size());
assertEquals(msg, msgs.get(0));
}
}

View File

@ -0,0 +1,80 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
import com.datastax.driver.core.utils.UUIDs;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
public class CassandraProcessedPartitionRepositoryTest extends SimpleAbstractCassandraDaoTest {
private CassandraProcessedPartitionRepository partitionRepository;
@Before
public void init() {
partitionRepository = new CassandraProcessedPartitionRepository(cassandraUnit.session, 1);
}
@Test
public void lastProcessedPartitionCouldBeFound() {
UUID nodeId = UUID.fromString("055eee50-1883-11e8-b380-65b5d5335ba9");
Optional<Long> lastProcessedPartition = partitionRepository.findLastProcessedPartition(nodeId, 101L);
assertTrue(lastProcessedPartition.isPresent());
assertEquals((Long) 777L, lastProcessedPartition.get());
}
@Test
public void highestProcessedPartitionReturned() throws ExecutionException, InterruptedException {
UUID nodeId = UUIDs.timeBased();
ListenableFuture<Void> future1 = partitionRepository.partitionProcessed(nodeId, 303L, 100L);
ListenableFuture<Void> future2 = partitionRepository.partitionProcessed(nodeId, 303L, 200L);
ListenableFuture<Void> future3 = partitionRepository.partitionProcessed(nodeId, 303L, 10L);
ListenableFuture<List<Void>> allFutures = Futures.allAsList(future1, future2, future3);
allFutures.get();
Optional<Long> actual = partitionRepository.findLastProcessedPartition(nodeId, 303L);
assertTrue(actual.isPresent());
assertEquals((Long) 200L, actual.get());
}
@Test
public void expiredPartitionsAreNotReturned() throws ExecutionException, InterruptedException {
UUID nodeId = UUIDs.timeBased();
ListenableFuture<Void> future = partitionRepository.partitionProcessed(nodeId, 404L, 10L);
future.get();
Optional<Long> actual = partitionRepository.findLastProcessedPartition(nodeId, 404L);
assertEquals((Long) 10L, actual.get());
TimeUnit.SECONDS.sleep(2);
assertFalse(partitionRepository.findLastProcessedPartition(nodeId, 404L).isPresent());
}
@Test
public void ifNoPartitionsWereProcessedEmptyResultReturned() {
UUID nodeId = UUIDs.timeBased();
Optional<Long> actual = partitionRepository.findLastProcessedPartition(nodeId, 505L);
assertFalse(actual.isPresent());
}
}

View File

@ -0,0 +1,30 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
import org.cassandraunit.CassandraCQLUnit;
import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
import org.junit.ClassRule;
public abstract class SimpleAbstractCassandraDaoTest {
@ClassRule
public static CassandraCQLUnit cassandraUnit = new CassandraCQLUnit(
new ClassPathCQLDataSet("cassandra/system-test.cql", "thingsboard"));
}

View File

@ -0,0 +1,75 @@
CREATE TABLE IF NOT EXISTS thingsboard.msg_queue (
node_id timeuuid,
clustered_hash bigint,
partition bigint,
ts bigint,
msg blob,
PRIMARY KEY ((node_id, clustered_hash, partition), ts))
WITH CLUSTERING ORDER BY (ts DESC)
AND compaction = {
'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
'min_threshold': '5',
'base_time_seconds': '43200',
'max_window_size_seconds': '43200',
'tombstone_threshold': '0.9',
'unchecked_tombstone_compaction': 'true'
};
CREATE TABLE IF NOT EXISTS thingsboard.msg_ack_queue (
node_id timeuuid,
clustered_hash bigint,
partition bigint,
msg_id timeuuid,
PRIMARY KEY ((node_id, clustered_hash, partition), msg_id))
WITH CLUSTERING ORDER BY (msg_id DESC)
AND compaction = {
'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
'min_threshold': '5',
'base_time_seconds': '43200',
'max_window_size_seconds': '43200',
'tombstone_threshold': '0.9',
'unchecked_tombstone_compaction': 'true'
};
CREATE TABLE IF NOT EXISTS thingsboard.processed_msg_partitions (
node_id timeuuid,
clustered_hash bigint,
partition bigint,
PRIMARY KEY ((node_id, clustered_hash), partition))
WITH CLUSTERING ORDER BY (partition DESC)
AND compaction = {
'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
'min_threshold': '5',
'base_time_seconds': '43200',
'max_window_size_seconds': '43200',
'tombstone_threshold': '0.9',
'unchecked_tombstone_compaction': 'true'
};
-- msg_queue dataset
INSERT INTO thingsboard.msg_queue (node_id, clustered_hash, partition, ts, msg)
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 200, 201, null);
INSERT INTO thingsboard.msg_queue (node_id, clustered_hash, partition, ts, msg)
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 200, 202, null);
INSERT INTO thingsboard.msg_queue (node_id, clustered_hash, partition, ts, msg)
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 300, 301, null);
-- ack_queue dataset
INSERT INTO msg_ack_queue (node_id, clustered_hash, partition, msg_id)
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 300, bebaeb60-1888-11e8-bf21-65b5d5335ba9);
INSERT INTO msg_ack_queue (node_id, clustered_hash, partition, msg_id)
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 300, 12baeb60-1888-11e8-bf21-65b5d5335ba9);
INSERT INTO msg_ack_queue (node_id, clustered_hash, partition, msg_id)
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 200, 32baeb60-1888-11e8-bf21-65b5d5335ba9);
-- processed partition dataset
INSERT INTO processed_msg_partitions (node_id, clustered_hash, partition)
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 100);
INSERT INTO processed_msg_partitions (node_id, clustered_hash, partition)
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 777);
INSERT INTO processed_msg_partitions (node_id, clustered_hash, partition)
VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 202, 200);