acks = ackRepository.findAcks(nodeId, clusterPartition, tsPartition);
unprocessedMsgs.addAll(unprocessedMsgFilter.filter(msgs, acks));
}
return unprocessedMsgs;
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/MsgAck.java b/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/MsgAck.java
similarity index 70%
rename from rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/MsgAck.java
rename to dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/MsgAck.java
index c6885ad41b..fed6e856eb 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/MsgAck.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/MsgAck.java
@@ -1,5 +1,5 @@
/**
- * Copyright © 2016-2017 The Thingsboard Authors
+ * Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,13 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.thingsboard.rule.engine.queue.cassandra;
+package org.thingsboard.server.dao.service.queue.cassandra;
-import com.datastax.driver.core.utils.UUIDs;
import lombok.Data;
import lombok.EqualsAndHashCode;
-import org.thingsboard.rule.engine.api.TbMsg;
-import org.thingsboard.server.common.data.UUIDConverter;
import java.util.UUID;
@@ -29,7 +26,7 @@ public class MsgAck {
private final UUID msgId;
private final UUID nodeId;
- private final long clusteredHash;
- private final long partition;
+ private final long clusteredPartition;
+ private final long tsPartition;
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitioner.java b/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitioner.java
similarity index 84%
rename from rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitioner.java
rename to dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitioner.java
index 9a7886f54c..faad701d4c 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitioner.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitioner.java
@@ -1,26 +1,27 @@
/**
- * Copyright © 2016-2017 The Thingsboard Authors
- *
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.thingsboard.rule.engine.queue.cassandra;
+package org.thingsboard.server.dao.service.queue.cassandra;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
-import org.thingsboard.rule.engine.queue.cassandra.repository.ProcessedPartitionRepository;
+import org.thingsboard.server.dao.service.queue.cassandra.repository.ProcessedPartitionRepository;
import org.thingsboard.server.dao.timeseries.TsPartitionDate;
+import org.thingsboard.server.dao.util.NoSqlDao;
import java.time.Clock;
import java.time.Instant;
@@ -32,26 +33,27 @@ import java.util.UUID;
@Component
@Slf4j
+@NoSqlDao
public class QueuePartitioner {
- private ProcessedPartitionRepository processedPartitionRepository;
-
private final TsPartitionDate tsFormat;
+ private ProcessedPartitionRepository processedPartitionRepository;
private Clock clock = Clock.systemUTC();
- public QueuePartitioner(@Value("${rule.queue.msg_partitioning}") String partitioning,
+ public QueuePartitioner(@Value("${cassandra.queue.partitioning}") String partitioning,
ProcessedPartitionRepository processedPartitionRepository) {
this.processedPartitionRepository = processedPartitionRepository;
Optional partition = TsPartitionDate.parse(partitioning);
if (partition.isPresent()) {
tsFormat = partition.get();
} else {
- log.warn("Incorrect configuration of partitioning {}", "MINUTES");
- throw new RuntimeException("Failed to parse partitioning property: " + "MINUTES" + "!");
+ log.warn("Incorrect configuration of partitioning {}", partitioning);
+ throw new RuntimeException("Failed to parse partitioning property: " + partitioning + "!");
}
}
public long getPartition(long ts) {
+ //TODO: use TsPartitionDate.truncateTo?
LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
return tsFormat.truncatedTo(time).toInstant(ZoneOffset.UTC).toEpochMilli();
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilter.java b/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/UnprocessedMsgFilter.java
similarity index 87%
rename from rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilter.java
rename to dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/UnprocessedMsgFilter.java
index e114a8507d..4dcd351763 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilter.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/UnprocessedMsgFilter.java
@@ -1,5 +1,5 @@
/**
- * Copyright © 2016-2017 The Thingsboard Authors
+ * Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,10 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.thingsboard.rule.engine.queue.cassandra;
+package org.thingsboard.server.dao.service.queue.cassandra;
import org.springframework.stereotype.Component;
-import org.thingsboard.rule.engine.api.TbMsg;
+import org.thingsboard.server.common.msg.TbMsg;
import java.util.Collection;
import java.util.List;
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/AckRepository.java b/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/repository/AckRepository.java
similarity index 73%
rename from rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/AckRepository.java
rename to dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/repository/AckRepository.java
index 40c0416957..d7cdb0cc06 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/AckRepository.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/repository/AckRepository.java
@@ -1,5 +1,5 @@
/**
- * Copyright © 2016-2017 The Thingsboard Authors
+ * Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,10 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.thingsboard.rule.engine.queue.cassandra.repository;
+package org.thingsboard.server.dao.service.queue.cassandra.repository;
import com.google.common.util.concurrent.ListenableFuture;
-import org.thingsboard.rule.engine.queue.cassandra.MsgAck;
+import org.thingsboard.server.dao.service.queue.cassandra.MsgAck;
import java.util.List;
import java.util.UUID;
@@ -25,5 +25,5 @@ public interface AckRepository {
ListenableFuture ack(MsgAck msgAck);
- List findAcks(UUID nodeId, long clusteredHash, long partition);
+ List findAcks(UUID nodeId, long clusterPartition, long tsPartition);
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/MsgRepository.java b/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/repository/MsgRepository.java
similarity index 72%
rename from rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/MsgRepository.java
rename to dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/repository/MsgRepository.java
index 5d34d84c3b..d54f1af59f 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/MsgRepository.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/repository/MsgRepository.java
@@ -1,5 +1,5 @@
/**
- * Copyright © 2016-2017 The Thingsboard Authors
+ * Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,18 +13,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.thingsboard.rule.engine.queue.cassandra.repository;
+package org.thingsboard.server.dao.service.queue.cassandra.repository;
import com.google.common.util.concurrent.ListenableFuture;
-import org.thingsboard.rule.engine.api.TbMsg;
+import org.thingsboard.server.common.msg.TbMsg;
import java.util.List;
import java.util.UUID;
public interface MsgRepository {
- ListenableFuture save(TbMsg msg, UUID nodeId, long clusteredHash, long partition, long msgTs);
+ ListenableFuture save(TbMsg msg, UUID nodeId, long clusterPartition, long tsPartition, long msgTs);
- List findMsgs(UUID nodeId, long clusteredHash, long partition);
+ List findMsgs(UUID nodeId, long clusterPartition, long tsPartition);
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/ProcessedPartitionRepository.java b/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/repository/ProcessedPartitionRepository.java
similarity index 88%
rename from rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/ProcessedPartitionRepository.java
rename to dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/repository/ProcessedPartitionRepository.java
index 807c0017c6..a50ab6143e 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/ProcessedPartitionRepository.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/repository/ProcessedPartitionRepository.java
@@ -1,5 +1,5 @@
/**
- * Copyright © 2016-2017 The Thingsboard Authors
+ * Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.thingsboard.rule.engine.queue.cassandra.repository;
+package org.thingsboard.server.dao.service.queue.cassandra.repository;
import com.google.common.util.concurrent.ListenableFuture;
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepository.java b/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraAckRepository.java
similarity index 60%
rename from rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepository.java
rename to dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraAckRepository.java
index 57dc79c49b..1f62f2bf4e 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepository.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraAckRepository.java
@@ -1,62 +1,62 @@
/**
- * Copyright © 2016-2017 The Thingsboard Authors
- *
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
+package org.thingsboard.server.dao.service.queue.cassandra.repository.impl;
import com.datastax.driver.core.*;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
-import org.thingsboard.rule.engine.queue.cassandra.MsgAck;
-import org.thingsboard.rule.engine.queue.cassandra.repository.AckRepository;
+import org.thingsboard.server.dao.nosql.CassandraAbstractDao;
+import org.thingsboard.server.dao.service.queue.cassandra.MsgAck;
+import org.thingsboard.server.dao.service.queue.cassandra.repository.AckRepository;
+import org.thingsboard.server.dao.util.NoSqlDao;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@Component
-public class CassandraAckRepository extends SimpleAbstractCassandraDao implements AckRepository {
+@NoSqlDao
+public class CassandraAckRepository extends CassandraAbstractDao implements AckRepository {
- private final int ackQueueTtl;
-
- public CassandraAckRepository(Session session, int ackQueueTtl) {
- super(session);
- this.ackQueueTtl = ackQueueTtl;
- }
+ @Value("${cassandra.queue.ack.ttl}")
+ private int ackQueueTtl;
@Override
public ListenableFuture ack(MsgAck msgAck) {
- String insert = "INSERT INTO msg_ack_queue (node_id, clustered_hash, partition, msg_id) VALUES (?, ?, ?, ?) USING TTL ?";
+ String insert = "INSERT INTO msg_ack_queue (node_id, cluster_partition, ts_partition, msg_id) VALUES (?, ?, ?, ?) USING TTL ?";
PreparedStatement statement = prepare(insert);
- BoundStatement boundStatement = statement.bind(msgAck.getNodeId(), msgAck.getClusteredHash(),
- msgAck.getPartition(), msgAck.getMsgId(), ackQueueTtl);
+ BoundStatement boundStatement = statement.bind(msgAck.getNodeId(), msgAck.getClusteredPartition(),
+ msgAck.getTsPartition(), msgAck.getMsgId(), ackQueueTtl);
ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement);
return Futures.transform(resultSetFuture, (Function) input -> null);
}
@Override
- public List findAcks(UUID nodeId, long clusteredHash, long partition) {
+ public List findAcks(UUID nodeId, long clusterPartition, long tsPartition) {
String select = "SELECT msg_id FROM msg_ack_queue WHERE " +
- "node_id = ? AND clustered_hash = ? AND partition = ?";
+ "node_id = ? AND cluster_partition = ? AND ts_partition = ?";
PreparedStatement statement = prepare(select);
- BoundStatement boundStatement = statement.bind(nodeId, clusteredHash, partition);
+ BoundStatement boundStatement = statement.bind(nodeId, clusterPartition, tsPartition);
ResultSet rows = executeRead(boundStatement);
List msgs = new ArrayList<>();
for (Row row : rows) {
- msgs.add(new MsgAck(row.getUUID("msg_id"), nodeId, clusteredHash, partition));
+ msgs.add(new MsgAck(row.getUUID("msg_id"), nodeId, clusterPartition, tsPartition));
}
return msgs;
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepository.java b/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepository.java
new file mode 100644
index 0000000000..2a70a8909a
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepository.java
@@ -0,0 +1,63 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.service.queue.cassandra.repository.impl;
+
+import com.datastax.driver.core.*;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.dao.nosql.CassandraAbstractDao;
+import org.thingsboard.server.dao.service.queue.cassandra.repository.MsgRepository;
+import org.thingsboard.server.dao.util.NoSqlDao;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+@Component
+@NoSqlDao
+public class CassandraMsgRepository extends CassandraAbstractDao implements MsgRepository {
+
+ @Value("${cassandra.queue.msg.ttl}")
+ private int msqQueueTtl;
+
+ @Override
+ public ListenableFuture save(TbMsg msg, UUID nodeId, long clusterPartition, long tsPartition, long msgTs) {
+ String insert = "INSERT INTO msg_queue (node_id, cluster_partition, ts_partition, ts, msg) VALUES (?, ?, ?, ?, ?) USING TTL ?";
+ PreparedStatement statement = prepare(insert);
+ BoundStatement boundStatement = statement.bind(nodeId, clusterPartition, tsPartition, msgTs, TbMsg.toBytes(msg), msqQueueTtl);
+ ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement);
+ return Futures.transform(resultSetFuture, (Function) input -> null);
+ }
+
+ @Override
+ public List findMsgs(UUID nodeId, long clusterPartition, long tsPartition) {
+ String select = "SELECT node_id, cluster_partition, ts_partition, ts, msg FROM msg_queue WHERE " +
+ "node_id = ? AND cluster_partition = ? AND ts_partition = ?";
+ PreparedStatement statement = prepare(select);
+ BoundStatement boundStatement = statement.bind(nodeId, clusterPartition, tsPartition);
+ ResultSet rows = executeRead(boundStatement);
+ List msgs = new ArrayList<>();
+ for (Row row : rows) {
+ msgs.add(TbMsg.fromBytes(row.getBytes("msg")));
+ }
+ return msgs;
+ }
+
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepository.java b/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraProcessedPartitionRepository.java
similarity index 61%
rename from rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepository.java
rename to dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraProcessedPartitionRepository.java
index 7fc15d8207..b0eacfa1b3 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepository.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraProcessedPartitionRepository.java
@@ -1,53 +1,53 @@
/**
- * Copyright © 2016-2017 The Thingsboard Authors
- *
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
+package org.thingsboard.server.dao.service.queue.cassandra.repository.impl;
import com.datastax.driver.core.*;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
-import org.thingsboard.rule.engine.queue.cassandra.repository.ProcessedPartitionRepository;
+import org.thingsboard.server.dao.nosql.CassandraAbstractDao;
+import org.thingsboard.server.dao.service.queue.cassandra.repository.ProcessedPartitionRepository;
+import org.thingsboard.server.dao.util.NoSqlDao;
import java.util.Optional;
import java.util.UUID;
@Component
-public class CassandraProcessedPartitionRepository extends SimpleAbstractCassandraDao implements ProcessedPartitionRepository {
+@NoSqlDao
+public class CassandraProcessedPartitionRepository extends CassandraAbstractDao implements ProcessedPartitionRepository {
- private final int repositoryTtl;
-
- public CassandraProcessedPartitionRepository(Session session, int repositoryTtl) {
- super(session);
- this.repositoryTtl = repositoryTtl;
- }
+ @Value("${cassandra.queue.partitions.ttl}")
+ private int partitionsTtl;
@Override
- public ListenableFuture partitionProcessed(UUID nodeId, long clusteredHash, long partition) {
- String insert = "INSERT INTO processed_msg_partitions (node_id, clustered_hash, partition) VALUES (?, ?, ?) USING TTL ?";
+ public ListenableFuture partitionProcessed(UUID nodeId, long clusterPartition, long tsPartition) {
+ String insert = "INSERT INTO processed_msg_partitions (node_id, cluster_partition, ts_partition) VALUES (?, ?, ?) USING TTL ?";
PreparedStatement prepared = prepare(insert);
- BoundStatement boundStatement = prepared.bind(nodeId, clusteredHash, partition, repositoryTtl);
+ BoundStatement boundStatement = prepared.bind(nodeId, clusterPartition, tsPartition, partitionsTtl);
ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement);
return Futures.transform(resultSetFuture, (Function) input -> null);
}
@Override
public Optional findLastProcessedPartition(UUID nodeId, long clusteredHash) {
- String select = "SELECT partition FROM processed_msg_partitions WHERE " +
- "node_id = ? AND clustered_hash = ?";
+ String select = "SELECT ts_partition FROM processed_msg_partitions WHERE " +
+ "node_id = ? AND cluster_partition = ?";
PreparedStatement prepared = prepare(select);
BoundStatement boundStatement = prepared.bind(nodeId, clusteredHash);
Row row = executeRead(boundStatement).one();
@@ -55,6 +55,6 @@ public class CassandraProcessedPartitionRepository extends SimpleAbstractCassand
return Optional.empty();
}
- return Optional.of(row.getLong("partition"));
+ return Optional.of(row.getLong("ts_partition"));
}
}
diff --git a/dao/src/main/resources/cassandra/schema.cql b/dao/src/main/resources/cassandra/schema.cql
index 6c62c89395..0ccf2badf8 100644
--- a/dao/src/main/resources/cassandra/schema.cql
+++ b/dao/src/main/resources/cassandra/schema.cql
@@ -616,11 +616,11 @@ AND compaction = { 'class' : 'LeveledCompactionStrategy' };
CREATE TABLE IF NOT EXISTS thingsboard.msg_queue (
node_id timeuuid,
- clustered_hash bigint,
- partition bigint,
+ cluster_partition bigint,
+ ts_partition bigint,
ts bigint,
msg blob,
- PRIMARY KEY ((node_id, clustered_hash, partition), ts))
+ PRIMARY KEY ((node_id, cluster_partition, ts_partition), ts))
WITH CLUSTERING ORDER BY (ts DESC)
AND compaction = {
'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
@@ -633,10 +633,10 @@ AND compaction = {
CREATE TABLE IF NOT EXISTS thingsboard.msg_ack_queue (
node_id timeuuid,
- clustered_hash bigint,
- partition bigint,
+ cluster_partition bigint,
+ ts_partition bigint,
msg_id timeuuid,
- PRIMARY KEY ((node_id, clustered_hash, partition), msg_id))
+ PRIMARY KEY ((node_id, cluster_partition, ts_partition), msg_id))
WITH CLUSTERING ORDER BY (msg_id DESC)
AND compaction = {
'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
@@ -649,10 +649,10 @@ AND compaction = {
CREATE TABLE IF NOT EXISTS thingsboard.processed_msg_partitions (
node_id timeuuid,
- clustered_hash bigint,
- partition bigint,
- PRIMARY KEY ((node_id, clustered_hash), partition))
-WITH CLUSTERING ORDER BY (partition DESC)
+ cluster_partition bigint,
+ ts_partition bigint,
+ PRIMARY KEY ((node_id, cluster_partition), ts_partition))
+WITH CLUSTERING ORDER BY (ts_partition DESC)
AND compaction = {
'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
'min_threshold': '5',
diff --git a/dao/src/test/java/org/thingsboard/server/dao/NoSqlDaoServiceTestSuite.java b/dao/src/test/java/org/thingsboard/server/dao/NoSqlDaoServiceTestSuite.java
index 7e25baa45e..5e01249fb5 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/NoSqlDaoServiceTestSuite.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/NoSqlDaoServiceTestSuite.java
@@ -26,7 +26,9 @@ import java.util.Arrays;
@RunWith(ClasspathSuite.class)
@ClassnameFilters({
- "org.thingsboard.server.dao.service.*ServiceNoSqlTest"
+ "org.thingsboard.server.dao.service.*ServiceNoSqlTest",
+ "org.thingsboard.server.dao.service.queue.cassandra.*.*.*Test",
+ "org.thingsboard.server.dao.service.queue.cassandra.*Test"
})
public class NoSqlDaoServiceTestSuite {
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitionerTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitionerTest.java
similarity index 90%
rename from rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitionerTest.java
rename to dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitionerTest.java
index a71a737f74..a6ec464e6b 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitionerTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitionerTest.java
@@ -1,19 +1,19 @@
/**
- * Copyright © 2016-2017 The Thingsboard Authors
- *
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.thingsboard.rule.engine.queue.cassandra;
+package org.thingsboard.server.dao.service.queue.cassandra;
import org.junit.Before;
@@ -21,7 +21,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
-import org.thingsboard.rule.engine.queue.cassandra.repository.ProcessedPartitionRepository;
+import org.thingsboard.server.dao.service.queue.cassandra.repository.ProcessedPartitionRepository;
import java.time.Clock;
import java.time.Instant;
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilterTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/UnprocessedMsgFilterTest.java
similarity index 90%
rename from rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilterTest.java
rename to dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/UnprocessedMsgFilterTest.java
index dec8e35b2b..046b462064 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilterTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/UnprocessedMsgFilterTest.java
@@ -1,5 +1,5 @@
/**
- * Copyright © 2016-2017 The Thingsboard Authors
+ * Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,11 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.thingsboard.rule.engine.queue.cassandra;
+package org.thingsboard.server.dao.service.queue.cassandra;
import com.google.common.collect.Lists;
import org.junit.Test;
-import org.thingsboard.rule.engine.api.TbMsg;
+import org.thingsboard.server.common.msg.TbMsg;
import java.util.Collection;
import java.util.List;
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java
similarity index 84%
rename from rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java
rename to dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java
index 38b7b9de8f..a86238e3b1 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java
@@ -1,5 +1,5 @@
/**
- * Copyright © 2016-2017 The Thingsboard Authors
+ * Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,14 +13,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
+package org.thingsboard.server.dao.service.queue.cassandra.repository.impl;
import com.datastax.driver.core.utils.UUIDs;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import org.junit.Before;
import org.junit.Test;
-import org.thingsboard.rule.engine.queue.cassandra.MsgAck;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.thingsboard.server.dao.service.AbstractServiceTest;
+import org.thingsboard.server.dao.service.DaoNoSqlTest;
+import org.thingsboard.server.dao.service.queue.cassandra.MsgAck;
import java.util.List;
import java.util.UUID;
@@ -30,15 +33,12 @@ import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-public class CassandraAckRepositoryTest extends SimpleAbstractCassandraDaoTest {
+@DaoNoSqlTest
+public class CassandraAckRepositoryTest extends AbstractServiceTest {
+ @Autowired
private CassandraAckRepository ackRepository;
- @Before
- public void init() {
- ackRepository = new CassandraAckRepository(cassandraUnit.session, 1);
- }
-
@Test
public void acksInPartitionCouldBeFound() {
UUID nodeId = UUID.fromString("055eee50-1883-11e8-b380-65b5d5335ba9");
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
similarity index 81%
rename from rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
rename to dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
index a0827fc5ac..d17e1f2819 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
@@ -1,19 +1,19 @@
/**
- * Copyright © 2016-2017 The Thingsboard Authors
- *
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
+package org.thingsboard.server.dao.service.queue.cassandra.repository.impl;
//import static org.junit.jupiter.api.Assertions.*;
@@ -21,9 +21,12 @@ import com.datastax.driver.core.utils.UUIDs;
import com.google.common.util.concurrent.ListenableFuture;
import org.junit.Before;
import org.junit.Test;
-import org.thingsboard.rule.engine.api.TbMsg;
-import org.thingsboard.rule.engine.api.TbMsgMetaData;
+import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.TbMsgMetaData;
+import org.thingsboard.server.dao.service.AbstractServiceTest;
+import org.thingsboard.server.dao.service.DaoNoSqlTest;
import java.util.List;
import java.util.UUID;
@@ -33,15 +36,12 @@ import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-public class CassandraMsgRepositoryTest extends SimpleAbstractCassandraDaoTest {
+@DaoNoSqlTest
+public class CassandraMsgRepositoryTest extends AbstractServiceTest {
+ @Autowired
private CassandraMsgRepository msgRepository;
- @Before
- public void init() {
- msgRepository = new CassandraMsgRepository(cassandraUnit.session, 1);
- }
-
@Test
public void msgCanBeSavedAndRead() throws ExecutionException, InterruptedException {
TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, new byte[4]);
@@ -58,8 +58,6 @@ public class CassandraMsgRepositoryTest extends SimpleAbstractCassandraDaoTest {
UUID nodeId = UUIDs.timeBased();
ListenableFuture future = msgRepository.save(msg, nodeId, 2L, 2L, 2L);
future.get();
- List msgs = msgRepository.findMsgs(nodeId, 2L, 2L);
- assertEquals(1, msgs.size());
TimeUnit.SECONDS.sleep(2);
assertTrue(msgRepository.findMsgs(nodeId, 2L, 2L).isEmpty());
}
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java
similarity index 87%
rename from rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java
rename to dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java
index a731452f24..1b487903b9 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java
@@ -1,5 +1,5 @@
/**
- * Copyright © 2016-2017 The Thingsboard Authors
+ * Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,13 +13,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
+package org.thingsboard.server.dao.service.queue.cassandra.repository.impl;
import com.datastax.driver.core.utils.UUIDs;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.junit.Before;
import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.thingsboard.server.dao.service.AbstractServiceTest;
+import org.thingsboard.server.dao.service.DaoNoSqlTest;
import java.util.List;
import java.util.Optional;
@@ -29,15 +32,12 @@ import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
-public class CassandraProcessedPartitionRepositoryTest extends SimpleAbstractCassandraDaoTest {
+@DaoNoSqlTest
+public class CassandraProcessedPartitionRepositoryTest extends AbstractServiceTest {
+ @Autowired
private CassandraProcessedPartitionRepository partitionRepository;
- @Before
- public void init() {
- partitionRepository = new CassandraProcessedPartitionRepository(cassandraUnit.session, 1);
- }
-
@Test
public void lastProcessedPartitionCouldBeFound() {
UUID nodeId = UUID.fromString("055eee50-1883-11e8-b380-65b5d5335ba9");
diff --git a/dao/src/test/resources/cassandra/system-test.cql b/dao/src/test/resources/cassandra/system-test.cql
index da5d1f1154..3b05fd71d1 100644
--- a/dao/src/test/resources/cassandra/system-test.cql
+++ b/dao/src/test/resources/cassandra/system-test.cql
@@ -1,2 +1,27 @@
TRUNCATE thingsboard.plugin;
-TRUNCATE thingsboard.rule;
\ No newline at end of file
+TRUNCATE thingsboard.rule;
+
+-- msg_queue dataset
+
+INSERT INTO thingsboard.msg_queue (node_id, cluster_partition, ts_partition, ts, msg)
+ VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 200, 201, null);
+INSERT INTO thingsboard.msg_queue (node_id, cluster_partition, ts_partition, ts, msg)
+ VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 200, 202, null);
+INSERT INTO thingsboard.msg_queue (node_id, cluster_partition, ts_partition, ts, msg)
+ VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 300, 301, null);
+
+-- ack_queue dataset
+INSERT INTO thingsboard.msg_ack_queue (node_id, cluster_partition, ts_partition, msg_id)
+ VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 300, bebaeb60-1888-11e8-bf21-65b5d5335ba9);
+INSERT INTO thingsboard.msg_ack_queue (node_id, cluster_partition, ts_partition, msg_id)
+ VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 300, 12baeb60-1888-11e8-bf21-65b5d5335ba9);
+INSERT INTO thingsboard.msg_ack_queue (node_id, cluster_partition, ts_partition, msg_id)
+ VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 200, 32baeb60-1888-11e8-bf21-65b5d5335ba9);
+
+-- processed partition dataset
+INSERT INTO thingsboard.processed_msg_partitions (node_id, cluster_partition, ts_partition)
+ VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 100);
+INSERT INTO thingsboard.processed_msg_partitions (node_id, cluster_partition, ts_partition)
+ VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 777);
+INSERT INTO thingsboard.processed_msg_partitions (node_id, cluster_partition, ts_partition)
+ VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 202, 200);
\ No newline at end of file
diff --git a/dao/src/test/resources/nosql-test.properties b/dao/src/test/resources/nosql-test.properties
index 482c6e7ed3..7c02666675 100644
--- a/dao/src/test/resources/nosql-test.properties
+++ b/dao/src/test/resources/nosql-test.properties
@@ -1 +1,6 @@
-database.type=cassandra
\ No newline at end of file
+database.type=cassandra
+
+cassandra.queue.partitioning=HOURS
+cassandra.queue.ack.ttl=1
+cassandra.queue.msg.ttl=1
+cassandra.queue.partitions.ttl=1
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 121644a266..f331e32fb2 100755
--- a/pom.xml
+++ b/pom.xml
@@ -35,7 +35,7 @@
1.8.10.RELEASE
2.9.0
0.7.0
- 2.2.0
+ 2.2.0
4.12
1.7.7
1.2.3
@@ -79,17 +79,19 @@
2.5.3
1.2.1
9.4.1211
- org/thingsboard/server/gen/**/*, org/thingsboard/server/extensions/core/plugin/telemetry/gen/**/*
+ org/thingsboard/server/gen/**/*,
+ org/thingsboard/server/extensions/core/plugin/telemetry/gen/**/*
+
5.0.2
common
+ rule-engine
dao
extensions-api
extensions-core
extensions
- rule-engine
transport
ui
tools
@@ -371,6 +373,11 @@
data
${project.version}
+
+ org.thingsboard.rule-engine
+ rule-engine-api
+ ${project.version}
+
org.thingsboard.common
message
diff --git a/rule-engine/pom.xml b/rule-engine/pom.xml
index e23f8714e3..29b5cdc4ef 100644
--- a/rule-engine/pom.xml
+++ b/rule-engine/pom.xml
@@ -1,6 +1,6 @@