From 2638d1eaf7122927e04ec0bfd23e75a7c2563aa6 Mon Sep 17 00:00:00 2001 From: Igor Kulikov Date: Fri, 19 Oct 2018 13:39:46 +0300 Subject: [PATCH] Remove Msg queue. Fix tests. --- .../actors/shared/ComponentMsgProcessor.java | 1 - .../service/queue/DefaultMsgQueueService.java | 111 ------------- .../server/service/queue/MsgQueueService.java | 32 ---- .../src/main/resources/thingsboard.yml | 13 -- .../AbstractRuleEngineControllerTest.java | 5 - ...AbstractRuleEngineFlowIntegrationTest.java | 9 -- ...actRuleEngineLifecycleIntegrationTest.java | 69 -------- .../server/system/BaseHttpDeviceApiTest.java | 2 +- .../server/dao/queue/MsgQueue.java | 34 ---- .../server/dao/queue/QueueBenchmark.java | 152 ------------------ .../dao/queue/memory/InMemoryMsgKey.java | 29 ---- .../dao/queue/memory/InMemoryMsgQueue.java | 123 -------------- dao/src/test/resources/nosql-test.properties | 5 - 13 files changed, 1 insertion(+), 584 deletions(-) delete mode 100644 application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java delete mode 100644 application/src/main/java/org/thingsboard/server/service/queue/MsgQueueService.java delete mode 100644 dao/src/main/java/org/thingsboard/server/dao/queue/MsgQueue.java delete mode 100644 dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java delete mode 100644 dao/src/main/java/org/thingsboard/server/dao/queue/memory/InMemoryMsgKey.java delete mode 100644 dao/src/main/java/org/thingsboard/server/dao/queue/memory/InMemoryMsgQueue.java diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java index 2c40be6fba..c9dc307451 100644 --- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java @@ -26,7 +26,6 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleState; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; -import org.thingsboard.server.service.queue.MsgQueueService; import javax.annotation.Nullable; import java.util.function.Consumer; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java deleted file mode 100644 index 416a4d6b0f..0000000000 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Copyright © 2016-2018 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.service.queue; - -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Service; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.common.transport.quota.tenant.TenantQuotaService; -import org.thingsboard.server.dao.queue.MsgQueue; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -@Service -@Slf4j -public class DefaultMsgQueueService implements MsgQueueService { - - @Value("${actors.rule.queue.max_size}") - private long queueMaxSize; - - @Value("${actors.rule.queue.cleanup_period}") - private long queueCleanUpPeriod; - - @Autowired - private MsgQueue msgQueue; - - @Autowired(required = false) - private TenantQuotaService quotaService; - - private ScheduledExecutorService cleanupExecutor; - - private Map pendingCountPerTenant = new ConcurrentHashMap<>(); - - @PostConstruct - public void init() { - if (queueCleanUpPeriod > 0) { - cleanupExecutor = Executors.newSingleThreadScheduledExecutor(); - cleanupExecutor.scheduleAtFixedRate(() -> cleanup(), - queueCleanUpPeriod, queueCleanUpPeriod, TimeUnit.SECONDS); - } - } - - @PreDestroy - public void stop() { - if (cleanupExecutor != null) { - cleanupExecutor.shutdownNow(); - } - } - - @Override - public ListenableFuture put(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition) { - if(quotaService != null && quotaService.isQuotaExceeded(tenantId.getId().toString())) { - log.warn("Tenant TbMsg Quota exceeded for [{}:{}] . Reject", tenantId.getId()); - return Futures.immediateFailedFuture(new RuntimeException("Tenant TbMsg Quota exceeded")); - } - - AtomicLong pendingMsgCount = pendingCountPerTenant.computeIfAbsent(tenantId, key -> new AtomicLong()); - if (pendingMsgCount.incrementAndGet() < queueMaxSize) { - return msgQueue.put(tenantId, msg, nodeId, clusterPartition); - } else { - pendingMsgCount.decrementAndGet(); - return Futures.immediateFailedFuture(new RuntimeException("Message queue is full!")); - } - } - - @Override - public ListenableFuture ack(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition) { - ListenableFuture result = msgQueue.ack(tenantId, msg, nodeId, clusterPartition); - AtomicLong pendingMsgCount = pendingCountPerTenant.computeIfAbsent(tenantId, key -> new AtomicLong()); - pendingMsgCount.decrementAndGet(); - return result; - } - - @Override - public Iterable findUnprocessed(TenantId tenantId, UUID nodeId, long clusterPartition) { - return msgQueue.findUnprocessed(tenantId, nodeId, clusterPartition); - } - - private void cleanup() { - pendingCountPerTenant.forEach((tenantId, pendingMsgCount) -> { - pendingMsgCount.set(0); - msgQueue.cleanUp(tenantId); - }); - } - -} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/MsgQueueService.java b/application/src/main/java/org/thingsboard/server/service/queue/MsgQueueService.java deleted file mode 100644 index 2cf001ba73..0000000000 --- a/application/src/main/java/org/thingsboard/server/service/queue/MsgQueueService.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Copyright © 2016-2018 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.service.queue; - -import com.google.common.util.concurrent.ListenableFuture; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.msg.TbMsg; - -import java.util.UUID; - -public interface MsgQueueService { - - ListenableFuture put(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition); - - ListenableFuture ack(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition); - - Iterable findUnprocessed(TenantId tenantId, UUID nodeId, long clusterPartition); - -} diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 90d15003bc..984fa280d6 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -182,12 +182,6 @@ cassandra: permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}" rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:30000}" - queue: - msg.ttl: 604800 # 7 days - ack.ttl: 604800 # 7 days - partitions.ttl: 604800 # 7 days - partitioning: "HOURS" - # SQL configuration parameters sql: # Specify executor service type used to perform timeseries insert tasks: SINGLE FIXED CACHED @@ -221,13 +215,6 @@ actors: node: # Errors for particular actor are persisted once per specified amount of milliseconds error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}" - queue: - # Message queue type - type: "${ACTORS_RULE_QUEUE_TYPE:memory}" - # Message queue maximum size (per tenant) - max_size: "${ACTORS_RULE_QUEUE_MAX_SIZE:100}" - # Message queue cleanup period in seconds - cleanup_period: "${ACTORS_RULE_QUEUE_CLEANUP_PERIOD:3600}" statistics: # Enable/disable actor statistics enabled: "${ACTORS_STATISTICS_ENABLED:true}" diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java index 1b042a821c..cb31a4b50c 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java @@ -27,9 +27,7 @@ import org.thingsboard.server.common.data.page.TimePageData; import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainMetaData; -import org.thingsboard.server.dao.queue.MsgQueue; import org.thingsboard.server.dao.rule.RuleChainService; -import org.thingsboard.server.service.queue.MsgQueueService; import java.io.IOException; import java.util.function.Predicate; @@ -42,9 +40,6 @@ public class AbstractRuleEngineControllerTest extends AbstractControllerTest { @Autowired protected RuleChainService ruleChainService; - @Autowired - protected MsgQueueService msgQueueService; - protected RuleChain saveRuleChain(RuleChain ruleChain) throws Exception { return doPost("/api/ruleChain", ruleChain, RuleChain.class); } diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java index c86d49609e..f050f3b505 100644 --- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java @@ -191,9 +191,6 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule Assert.assertEquals("serverAttributeValue1", getMetadata(outEvent).get("ss_serverAttributeKey1").asText()); Assert.assertEquals("serverAttributeValue2", getMetadata(outEvent).get("ss_serverAttributeKey2").asText()); - - List unAckMsgList = Lists.newArrayList(msgQueueService.findUnprocessed(savedTenant.getId(), ruleChain.getId().getId(), 0L)); - Assert.assertEquals(0, unAckMsgList.size()); } @Test @@ -311,12 +308,6 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule Assert.assertEquals("serverAttributeValue1", getMetadata(outEvent).get("ss_serverAttributeKey1").asText()); Assert.assertEquals("serverAttributeValue2", getMetadata(outEvent).get("ss_serverAttributeKey2").asText()); - - List unAckMsgList = Lists.newArrayList(msgQueueService.findUnprocessed(savedTenant.getId(), rootRuleChain.getId().getId(), 0L)); - Assert.assertEquals(0, unAckMsgList.size()); - - unAckMsgList = Lists.newArrayList(msgQueueService.findUnprocessed(savedTenant.getId(), secondaryRuleChain.getId().getId(), 0L)); - Assert.assertEquals(0, unAckMsgList.size()); } } diff --git a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java index 7ac0789383..24db4570bd 100644 --- a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java @@ -162,73 +162,4 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac Assert.assertEquals("serverAttributeValue", getMetadata(outEvent).get("ss_serverAttributeKey").asText()); } - @Test - public void testRuleChainWithOneRuleAndMsgFromQueue() throws Exception { - // Creating Rule Chain - RuleChain ruleChain = new RuleChain(); - ruleChain.setName("Simple Rule Chain"); - ruleChain.setTenantId(savedTenant.getId()); - ruleChain.setRoot(true); - ruleChain.setDebugMode(true); - ruleChain = saveRuleChain(ruleChain); - Assert.assertNull(ruleChain.getFirstRuleNodeId()); - - // Saving the device - Device device = new Device(); - device.setName("My device"); - device.setType("default"); - device = doPost("/api/device", device, Device.class); - - attributesService.save(device.getId(), DataConstants.SERVER_SCOPE, - Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey", "serverAttributeValue"), System.currentTimeMillis()))); - - // Pushing Message to the system - TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), - "CUSTOM", - device.getId(), - new TbMsgMetaData(), - "{}", - ruleChain.getId(), null, 0L); - msgQueueService.put(device.getTenantId(), tbMsg, ruleChain.getId().getId(), 0L); - - Thread.sleep(1000); - - RuleChainMetaData metaData = new RuleChainMetaData(); - metaData.setRuleChainId(ruleChain.getId()); - - RuleNode ruleNode = new RuleNode(); - ruleNode.setName("Simple Rule Node"); - ruleNode.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName()); - ruleNode.setDebugMode(true); - TbGetAttributesNodeConfiguration configuration = new TbGetAttributesNodeConfiguration(); - configuration.setServerAttributeNames(Collections.singletonList("serverAttributeKey")); - ruleNode.setConfiguration(mapper.valueToTree(configuration)); - - metaData.setNodes(Collections.singletonList(ruleNode)); - metaData.setFirstNodeIndex(0); - - metaData = saveRuleChainMetaData(metaData); - Assert.assertNotNull(metaData); - - ruleChain = getRuleChain(ruleChain.getId()); - Assert.assertNotNull(ruleChain.getFirstRuleNodeId()); - - Thread.sleep(3000); - - TimePageData eventsPage = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000); - List events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList()); - - Assert.assertEquals(2, events.size()); - - Event inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get(); - Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId()); - Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText()); - - Event outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get(); - Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId()); - Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText()); - - Assert.assertEquals("serverAttributeValue", getMetadata(outEvent).get("ss_serverAttributeKey").asText()); - } - } diff --git a/application/src/test/java/org/thingsboard/server/system/BaseHttpDeviceApiTest.java b/application/src/test/java/org/thingsboard/server/system/BaseHttpDeviceApiTest.java index 5e6896b2b7..1ebccadfea 100644 --- a/application/src/test/java/org/thingsboard/server/system/BaseHttpDeviceApiTest.java +++ b/application/src/test/java/org/thingsboard/server/system/BaseHttpDeviceApiTest.java @@ -57,7 +57,7 @@ public abstract class BaseHttpDeviceApiTest extends AbstractControllerTest { @Test public void testGetAttributes() throws Exception { doGetAsync("/api/v1/" + "WRONG_TOKEN" + "/attributes?clientKeys=keyA,keyB,keyC").andExpect(status().isUnauthorized()); - doGetAsync("/api/v1/" + deviceCredentials.getCredentialsId() + "/attributes?clientKeys=keyA,keyB,keyC").andExpect(status().isNotFound()); + doGetAsync("/api/v1/" + deviceCredentials.getCredentialsId() + "/attributes?clientKeys=keyA,keyB,keyC").andExpect(status().isOk()); Map attrMap = new HashMap<>(); attrMap.put("keyA", "valueA"); diff --git a/dao/src/main/java/org/thingsboard/server/dao/queue/MsgQueue.java b/dao/src/main/java/org/thingsboard/server/dao/queue/MsgQueue.java deleted file mode 100644 index 19021eb95c..0000000000 --- a/dao/src/main/java/org/thingsboard/server/dao/queue/MsgQueue.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Copyright © 2016-2018 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.dao.queue; - -import com.google.common.util.concurrent.ListenableFuture; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.msg.TbMsg; - -import java.util.UUID; - -public interface MsgQueue { - - ListenableFuture put(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition); - - ListenableFuture ack(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition); - - Iterable findUnprocessed(TenantId tenantId, UUID nodeId, long clusterPartition); - - ListenableFuture cleanUp(TenantId tenantId); - -} diff --git a/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java b/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java deleted file mode 100644 index ca61a63a4c..0000000000 --- a/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java +++ /dev/null @@ -1,152 +0,0 @@ -/** - * Copyright © 2016-2018 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.dao.queue; - -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.HostDistance; -import com.datastax.driver.core.PoolingOptions; -import com.datastax.driver.core.Session; -import com.datastax.driver.core.utils.UUIDs; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.CommandLineRunner; -import org.springframework.boot.SpringApplication; -import org.springframework.context.annotation.Bean; -import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.RuleChainId; -import org.thingsboard.server.common.data.id.RuleNodeId; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.common.msg.TbMsgDataType; -import org.thingsboard.server.common.msg.TbMsgMetaData; - -import javax.annotation.Nullable; -import java.net.InetSocketAddress; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -//@SpringBootApplication -//@EnableAutoConfiguration -//@ComponentScan({"org.thingsboard.rule.engine"}) -//@PropertySource("classpath:processing-pipeline.properties") -@Slf4j -public class QueueBenchmark implements CommandLineRunner { - - public static void main(String[] args) { - try { - SpringApplication.run(QueueBenchmark.class, args); - } catch (Throwable th) { - th.printStackTrace(); - System.exit(0); - } - } - - @Autowired - private MsgQueue msgQueue; - - @Override - public void run(String... strings) throws Exception { - System.out.println("It works + " + msgQueue); - - - long start = System.currentTimeMillis(); - int msgCount = 10000000; - AtomicLong count = new AtomicLong(0); - ExecutorService service = Executors.newFixedThreadPool(100); - - CountDownLatch latch = new CountDownLatch(msgCount); - for (int i = 0; i < msgCount; i++) { - service.submit(() -> { - boolean isFinished = false; - while (!isFinished) { - try { - TbMsg msg = randomMsg(); - UUID nodeId = UUIDs.timeBased(); - ListenableFuture put = msgQueue.put(new TenantId(EntityId.NULL_UUID), msg, nodeId, 100L); -// ListenableFuture put = msgQueue.ack(msg, nodeId, 100L); - Futures.addCallback(put, new FutureCallback() { - @Override - public void onSuccess(@Nullable Void result) { - latch.countDown(); - } - - @Override - public void onFailure(Throwable t) { -// t.printStackTrace(); - System.out.println("onFailure, because:" + t.getMessage()); - latch.countDown(); - } - }); - isFinished = true; - } catch (Throwable th) { -// th.printStackTrace(); - System.out.println("Repeat query, because:" + th.getMessage()); -// latch.countDown(); - } - } - }); - } - - long prev = 0L; - while (latch.getCount() != 0) { - TimeUnit.SECONDS.sleep(1); - long curr = latch.getCount(); - long rps = prev - curr; - prev = curr; - System.out.println("rps = " + rps); - } - - long end = System.currentTimeMillis(); - System.out.println("final rps = " + (msgCount / (end - start) * 1000)); - - System.out.println("Finished"); - - } - - private TbMsg randomMsg() { - TbMsgMetaData metaData = new TbMsgMetaData(); - metaData.putValue("key", "value"); - String dataStr = "someContent"; - return new TbMsg(UUIDs.timeBased(), "type", null, metaData, TbMsgDataType.JSON, dataStr, new RuleChainId(UUIDs.timeBased()), new RuleNodeId(UUIDs.timeBased()), 0L); - } - - @Bean - public Session session() { - Cluster thingsboard = Cluster.builder() - .addContactPointsWithPorts(new InetSocketAddress("127.0.0.1", 9042)) - .withClusterName("thingsboard") -// .withSocketOptions(socketOpts.getOpts()) - .withPoolingOptions(new PoolingOptions() - .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768) - .setMaxRequestsPerConnection(HostDistance.REMOTE, 32768)).build(); - - Session session = thingsboard.connect("thingsboard"); - return session; - } - - @Bean - public int defaultTtl() { - return 6000; - } - -} diff --git a/dao/src/main/java/org/thingsboard/server/dao/queue/memory/InMemoryMsgKey.java b/dao/src/main/java/org/thingsboard/server/dao/queue/memory/InMemoryMsgKey.java deleted file mode 100644 index bb44a2b192..0000000000 --- a/dao/src/main/java/org/thingsboard/server/dao/queue/memory/InMemoryMsgKey.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Copyright © 2016-2018 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.dao.queue.memory; - -import lombok.Data; - -import java.util.UUID; - -/** - * Created by ashvayka on 30.04.18. - */ -@Data -public final class InMemoryMsgKey { - final UUID nodeId; - final long clusterPartition; -} diff --git a/dao/src/main/java/org/thingsboard/server/dao/queue/memory/InMemoryMsgQueue.java b/dao/src/main/java/org/thingsboard/server/dao/queue/memory/InMemoryMsgQueue.java deleted file mode 100644 index 93057784f6..0000000000 --- a/dao/src/main/java/org/thingsboard/server/dao/queue/memory/InMemoryMsgQueue.java +++ /dev/null @@ -1,123 +0,0 @@ -/** - * Copyright © 2016-2018 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.dao.queue.memory; - -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.dao.queue.MsgQueue; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; - -/** - * Created by ashvayka on 27.04.18. - */ -@Component -@ConditionalOnProperty(prefix = "actors.rule.queue", value = "type", havingValue = "memory", matchIfMissing = true) -@Slf4j -public class InMemoryMsgQueue implements MsgQueue { - - private ListeningExecutorService queueExecutor; - private Map>> data = new HashMap<>(); - - @PostConstruct - public void init() { - // Should be always single threaded due to absence of locks. - queueExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); - } - - @PreDestroy - public void stop() { - if (queueExecutor != null) { - queueExecutor.shutdownNow(); - } - } - - @Override - public ListenableFuture put(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition) { - return queueExecutor.submit(() -> { - data.computeIfAbsent(tenantId, key -> new HashMap<>()). - computeIfAbsent(new InMemoryMsgKey(nodeId, clusterPartition), key -> new HashMap<>()).put(msg.getId(), msg); - return null; - }); - } - - @Override - public ListenableFuture ack(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition) { - return queueExecutor.submit(() -> { - Map> tenantMap = data.get(tenantId); - if (tenantMap != null) { - InMemoryMsgKey key = new InMemoryMsgKey(nodeId, clusterPartition); - Map map = tenantMap.get(key); - if (map != null) { - map.remove(msg.getId()); - if (map.isEmpty()) { - tenantMap.remove(key); - } - } - if (tenantMap.isEmpty()) { - data.remove(tenantId); - } - } - return null; - }); - } - - @Override - public Iterable findUnprocessed(TenantId tenantId, UUID nodeId, long clusterPartition) { - ListenableFuture> list = queueExecutor.submit(() -> { - Map> tenantMap = data.get(tenantId); - if (tenantMap != null) { - InMemoryMsgKey key = new InMemoryMsgKey(nodeId, clusterPartition); - Map map = tenantMap.get(key); - if (map != null) { - return new ArrayList<>(map.values()); - } else { - return Collections.emptyList(); - } - } else { - return Collections.emptyList(); - } - }); - try { - return list.get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - } - - @Override - public ListenableFuture cleanUp(TenantId tenantId) { - return queueExecutor.submit(() -> { - data.remove(tenantId); - return null; - }); - } -} diff --git a/dao/src/test/resources/nosql-test.properties b/dao/src/test/resources/nosql-test.properties index 06a92faa01..7c3ec51e8a 100644 --- a/dao/src/test/resources/nosql-test.properties +++ b/dao/src/test/resources/nosql-test.properties @@ -1,7 +1,2 @@ database.entities.type=cassandra database.ts.type=cassandra - -cassandra.queue.partitioning=HOURS -cassandra.queue.ack.ttl=3600 -cassandra.queue.msg.ttl=3600 -cassandra.queue.partitions.ttl=3600 \ No newline at end of file