diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index 46286362fb..2cd1755698 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -86,13 +86,26 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor ruleNodeList) { + for (RuleNode ruleNode : ruleNodeList) { + for (TbMsg tbMsg : queue.findUnprocessed(ruleNode.getId().getId(), 0L)) { + pushMsgToNode(nodeActors.get(ruleNode.getId()), tbMsg); + } + } + if (firstNode != null) { + for (TbMsg tbMsg : queue.findUnprocessed(entityId.getId(), 0L)) { + pushMsgToNode(firstNode, tbMsg); + } + } + } + @Override public void onUpdate(ActorContext context) throws Exception { RuleChain ruleChain = service.findRuleChainById(entityId); @@ -117,6 +130,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor pushMsgToNode(firstNode, msg)); } else { pushMsgToNode(firstNode, envelope.getMsg()); diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index d9e9012670..744cc5b9d1 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -302,7 +302,9 @@ spring: rule: queue: - msg_partitioning: "${QUEUE_MSG_PARTITIONING:HOURS}" + type: "memory" + max_size: 10000 + # PostgreSQL DAO Configuration #spring: diff --git a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java index 18583fdc7e..0ea6ff4aec 100644 --- a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java @@ -159,4 +159,72 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac Assert.assertEquals("serverAttributeValue", getMetadata(outEvent).get("ss_serverAttributeKey").asText()); } + @Test + public void testRuleChainWithOneRuleAndMsgFromQueue() throws Exception { + // Creating Rule Chain + RuleChain ruleChain = new RuleChain(); + ruleChain.setName("Simple Rule Chain"); + ruleChain.setTenantId(savedTenant.getId()); + ruleChain.setRoot(true); + ruleChain.setDebugMode(true); + ruleChain = saveRuleChain(ruleChain); + Assert.assertNull(ruleChain.getFirstRuleNodeId()); + + // Saving the device + Device device = new Device(); + device.setName("My device"); + device.setType("default"); + device = doPost("/api/device", device, Device.class); + + attributesService.save(device.getId(), DataConstants.SERVER_SCOPE, + Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey", "serverAttributeValue"), System.currentTimeMillis()))); + + // Pushing Message to the system + TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), + "CUSTOM", + device.getId(), + new TbMsgMetaData(), + "{}", + ruleChain.getId(), null, 0L); + msgQueue.put(tbMsg, ruleChain.getId().getId(), 0L); + + Thread.sleep(1000); + + RuleChainMetaData metaData = new RuleChainMetaData(); + metaData.setRuleChainId(ruleChain.getId()); + + RuleNode ruleNode = new RuleNode(); + ruleNode.setName("Simple Rule Node"); + ruleNode.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName()); + ruleNode.setDebugMode(true); + TbGetAttributesNodeConfiguration configuration = new TbGetAttributesNodeConfiguration(); + configuration.setServerAttributeNames(Collections.singletonList("serverAttributeKey")); + ruleNode.setConfiguration(mapper.valueToTree(configuration)); + + metaData.setNodes(Collections.singletonList(ruleNode)); + metaData.setFirstNodeIndex(0); + + metaData = saveRuleChainMetaData(metaData); + Assert.assertNotNull(metaData); + + ruleChain = getRuleChain(ruleChain.getId()); + Assert.assertNotNull(ruleChain.getFirstRuleNodeId()); + + Thread.sleep(3000); + + TimePageData events = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000); + + Assert.assertEquals(2, events.getData().size()); + + Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get(); + Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId()); + Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText()); + + Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get(); + Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId()); + Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText()); + + Assert.assertEquals("serverAttributeValue", getMetadata(outEvent).get("ss_serverAttributeKey").asText()); + } + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitioner.java b/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitioner.java index faad701d4c..a60f685f29 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitioner.java +++ b/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitioner.java @@ -30,6 +30,7 @@ import java.time.ZoneOffset; import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.TimeUnit; @Component @Slf4j @@ -60,7 +61,7 @@ public class QueuePartitioner { public List findUnprocessedPartitions(UUID nodeId, long clusteredHash) { Optional lastPartitionOption = processedPartitionRepository.findLastProcessedPartition(nodeId, clusteredHash); - long lastPartition = lastPartitionOption.orElse(System.currentTimeMillis() - 7 * 24 * 60 * 60 * 100); + long lastPartition = lastPartitionOption.orElse(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7)); List unprocessedPartitions = Lists.newArrayList(); LocalDateTime current = LocalDateTime.ofInstant(Instant.ofEpochMilli(lastPartition), ZoneOffset.UTC); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/queue/InMemoryMsgQueue.java b/dao/src/main/java/org/thingsboard/server/dao/sql/queue/InMemoryMsgQueue.java index dd2316869f..ce579cd00c 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/queue/InMemoryMsgQueue.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/queue/InMemoryMsgQueue.java @@ -15,10 +15,14 @@ */ package org.thingsboard.server.dao.sql.queue; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.dao.queue.MsgQueue; @@ -40,13 +44,17 @@ import java.util.concurrent.atomic.AtomicLong; * Created by ashvayka on 27.04.18. */ @Component +//@ConditionalOnProperty(prefix = "rule.queue", value = "type", havingValue = "memory", matchIfMissing = true) @Slf4j @SqlDao public class InMemoryMsgQueue implements MsgQueue { + @Value("${rule.queue.max_size}") + @Getter + private long maxSize; + private ListeningExecutorService queueExecutor; - //TODO: - private AtomicLong pendingMsgCount; + private AtomicLong pendingMsgCount = new AtomicLong(); private Map> data = new HashMap<>(); @PostConstruct @@ -64,10 +72,15 @@ public class InMemoryMsgQueue implements MsgQueue { @Override public ListenableFuture put(TbMsg msg, UUID nodeId, long clusterPartition) { - return queueExecutor.submit(() -> { - data.computeIfAbsent(new InMemoryMsgKey(nodeId, clusterPartition), key -> new HashMap<>()).put(msg.getId(), msg); - return null; - }); + if (pendingMsgCount.get() < maxSize) { + return queueExecutor.submit(() -> { + data.computeIfAbsent(new InMemoryMsgKey(nodeId, clusterPartition), key -> new HashMap<>()).put(msg.getId(), msg); + pendingMsgCount.incrementAndGet(); + return null; + }); + } else { + return Futures.immediateFailedFuture(new RuntimeException("Message queue is full!")); + } } @Override @@ -76,14 +89,15 @@ public class InMemoryMsgQueue implements MsgQueue { InMemoryMsgKey key = new InMemoryMsgKey(nodeId, clusterPartition); Map map = data.get(key); if (map != null) { - map.remove(msg.getId()); + if (map.remove(msg.getId()) != null) { + pendingMsgCount.decrementAndGet(); + } if (map.isEmpty()) { data.remove(key); } } return null; }); - } @Override diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitionerTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitionerTest.java index a6ec464e6b..2d3b61fec1 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitionerTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitionerTest.java @@ -75,7 +75,7 @@ public class QueuePartitionerTest { long clusteredHash = 101L; when(partitionRepo.findLastProcessedPartition(nodeId, clusteredHash)).thenReturn(Optional.empty()); List actual = queuePartitioner.findUnprocessedPartitions(nodeId, clusteredHash); - assertEquals(1011, actual.size()); + assertEquals(10083, actual.size()); } } \ No newline at end of file diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java index a86238e3b1..cff4dc9d0f 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.ListenableFuture; import org.junit.Before; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.server.dao.service.AbstractServiceTest; import org.thingsboard.server.dao.service.DaoNoSqlTest; import org.thingsboard.server.dao.service.queue.cassandra.MsgAck; @@ -66,6 +67,7 @@ public class CassandraAckRepositoryTest extends AbstractServiceTest { @Test public void expiredAcksAreNotReturned() throws ExecutionException, InterruptedException { + ReflectionTestUtils.setField(ackRepository, "ackQueueTtl", 1); UUID msgId = UUIDs.timeBased(); UUID nodeId = UUIDs.timeBased(); MsgAck ack = new MsgAck(msgId, nodeId, 30L, 40L); diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java index ae252ae44a..fa286aaea2 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.ListenableFuture; import org.junit.Before; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; @@ -58,6 +59,7 @@ public class CassandraMsgRepositoryTest extends AbstractServiceTest { @Test public void expiredMsgsAreNotReturned() throws ExecutionException, InterruptedException { + ReflectionTestUtils.setField(msgRepository, "msqQueueTtl", 1); TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, "0000", new RuleChainId(UUIDs.timeBased()), new RuleNodeId(UUIDs.timeBased()), 0L); UUID nodeId = UUIDs.timeBased(); diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java index 1b487903b9..2ae810a221 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.ListenableFuture; import org.junit.Before; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.server.dao.service.AbstractServiceTest; import org.thingsboard.server.dao.service.DaoNoSqlTest; @@ -61,6 +62,7 @@ public class CassandraProcessedPartitionRepositoryTest extends AbstractServiceTe @Test public void expiredPartitionsAreNotReturned() throws ExecutionException, InterruptedException { + ReflectionTestUtils.setField(partitionRepository, "partitionsTtl", 1); UUID nodeId = UUIDs.timeBased(); ListenableFuture future = partitionRepository.partitionProcessed(nodeId, 404L, 10L); future.get(); diff --git a/dao/src/test/resources/application-test.properties b/dao/src/test/resources/application-test.properties index 42b71f84e3..dbd8b84a4b 100644 --- a/dao/src/test/resources/application-test.properties +++ b/dao/src/test/resources/application-test.properties @@ -28,3 +28,6 @@ redis.connection.host=localhost redis.connection.port=6379 redis.connection.db=0 redis.connection.password= + +rule.queue.type=memory +rule.queue.max_size=10000 \ No newline at end of file diff --git a/dao/src/test/resources/nosql-test.properties b/dao/src/test/resources/nosql-test.properties index 7c02666675..e37e228b40 100644 --- a/dao/src/test/resources/nosql-test.properties +++ b/dao/src/test/resources/nosql-test.properties @@ -1,6 +1,6 @@ database.type=cassandra cassandra.queue.partitioning=HOURS -cassandra.queue.ack.ttl=1 -cassandra.queue.msg.ttl=1 -cassandra.queue.partitions.ttl=1 \ No newline at end of file +cassandra.queue.ack.ttl=3600 +cassandra.queue.msg.ttl=3600 +cassandra.queue.partitions.ttl=3600 \ No newline at end of file