diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractInMemoryStorageTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractInMemoryStorageTest.java index 59f7714233..efa9d9f08a 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractInMemoryStorageTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractInMemoryStorageTest.java @@ -23,20 +23,4 @@ import org.thingsboard.server.queue.memory.InMemoryStorage; @Slf4j public abstract class AbstractInMemoryStorageTest { - @Before - public void setUpInMemoryStorage() { - log.info("set up InMemoryStorage"); - cleanupInMemStorage(); - } - - @After - public void tearDownInMemoryStorage() { - log.info("tear down InMemoryStorage"); - cleanupInMemStorage(); - } - - public static void cleanupInMemStorage() { - InMemoryStorage.getInstance().cleanup(); - } - } diff --git a/application/src/test/java/org/thingsboard/server/controller/ControllerSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/controller/ControllerSqlTestSuite.java index ddc0c09157..9dcba19044 100644 --- a/application/src/test/java/org/thingsboard/server/controller/ControllerSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/controller/ControllerSqlTestSuite.java @@ -30,9 +30,4 @@ import org.thingsboard.server.queue.memory.InMemoryStorage; }) public class ControllerSqlTestSuite { - @BeforeClass - public static void cleanupInMemStorage() { - InMemoryStorage.getInstance().cleanup(); - } - } diff --git a/application/src/test/java/org/thingsboard/server/edge/EdgeSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/edge/EdgeSqlTestSuite.java index 70894cdbbe..e7fff461f3 100644 --- a/application/src/test/java/org/thingsboard/server/edge/EdgeSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/edge/EdgeSqlTestSuite.java @@ -26,8 +26,4 @@ import org.thingsboard.server.queue.memory.InMemoryStorage; }) public class EdgeSqlTestSuite { - @BeforeClass - public static void cleanupInMemStorage() { - InMemoryStorage.getInstance().cleanup(); - } } diff --git a/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java index 36e08f5859..cb99c21165 100644 --- a/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java @@ -27,9 +27,4 @@ import org.thingsboard.server.queue.memory.InMemoryStorage; }) public class RuleEngineSqlTestSuite { - @BeforeClass - public static void cleanupInMemStorage() { - InMemoryStorage.getInstance().cleanup(); - } - } diff --git a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java index 8011cbe952..9f82b6ca2b 100644 --- a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java @@ -75,6 +75,9 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac @Autowired protected EventService eventService; + @Autowired + protected InMemoryStorage storage; + @Before public void beforeTest() throws Exception { @@ -159,7 +162,7 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey", "serverAttributeValue"), System.currentTimeMillis()))); await("total inMemory queue lag is empty").atMost(30, TimeUnit.SECONDS) - .until(() -> InMemoryStorage.getInstance().getLagTotal() == 0); + .until(() -> storage.getLagTotal() == 0); Thread.sleep(1000); TbMsgCallback tbMsgCallback = Mockito.mock(TbMsgCallback.class); diff --git a/application/src/test/java/org/thingsboard/server/service/ServiceSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/service/ServiceSqlTestSuite.java index bc3348c9f1..11f06875fe 100644 --- a/application/src/test/java/org/thingsboard/server/service/ServiceSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/service/ServiceSqlTestSuite.java @@ -27,9 +27,4 @@ import org.thingsboard.server.queue.memory.InMemoryStorage; }) public class ServiceSqlTestSuite { - @BeforeClass - public static void cleanupInMemStorage() { - InMemoryStorage.getInstance().cleanup(); - } - } diff --git a/application/src/test/java/org/thingsboard/server/system/SystemSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/system/SystemSqlTestSuite.java index 695e90fd33..714ad67519 100644 --- a/application/src/test/java/org/thingsboard/server/system/SystemSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/system/SystemSqlTestSuite.java @@ -29,9 +29,4 @@ import org.thingsboard.server.queue.memory.InMemoryStorage; }) public class SystemSqlTestSuite { - @BeforeClass - public static void cleanupInMemStorage() { - InMemoryStorage.getInstance().cleanup(); - } - } diff --git a/application/src/test/java/org/thingsboard/server/transport/TransportNoSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/transport/TransportNoSqlTestSuite.java index dc74fc7655..255057cc2f 100644 --- a/application/src/test/java/org/thingsboard/server/transport/TransportNoSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/transport/TransportNoSqlTestSuite.java @@ -40,9 +40,4 @@ public class TransportNoSqlTestSuite { ), "cassandra-test.yaml", 30000l); - @BeforeClass - public static void cleanupInMemStorage() { - InMemoryStorage.getInstance().cleanup(); - } - } diff --git a/application/src/test/java/org/thingsboard/server/transport/TransportSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/transport/TransportSqlTestSuite.java index 26fc5acdaa..be416b3a69 100644 --- a/application/src/test/java/org/thingsboard/server/transport/TransportSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/transport/TransportSqlTestSuite.java @@ -34,9 +34,4 @@ import org.thingsboard.server.queue.memory.InMemoryStorage; }) public class TransportSqlTestSuite { - @BeforeClass - public static void cleanupInMemStorage() { - InMemoryStorage.getInstance().cleanup(); - } - } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryStorage.java b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryStorage.java index 8bf72d4de6..11b97100ee 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryStorage.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryStorage.java @@ -16,6 +16,7 @@ package org.thingsboard.server.queue.memory; import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; import org.thingsboard.server.queue.TbQueueMsg; import java.util.ArrayList; @@ -25,14 +26,10 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +@Component @Slf4j public final class InMemoryStorage { - private static InMemoryStorage instance; - private final ConcurrentHashMap> storage; - - private InMemoryStorage() { - storage = new ConcurrentHashMap<>(); - } + private final ConcurrentHashMap> storage = new ConcurrentHashMap<>(); public void printStats() { storage.forEach((topic, queue) -> { @@ -46,17 +43,6 @@ public final class InMemoryStorage { return storage.values().stream().map(BlockingQueue::size).reduce(0, Integer::sum); } - public static InMemoryStorage getInstance() { - if (instance == null) { - synchronized (InMemoryStorage.class) { - if (instance == null) { - instance = new InMemoryStorage(); - } - } - } - return instance; - } - public boolean put(String topic, TbQueueMsg msg) { return storage.computeIfAbsent(topic, (t) -> new LinkedBlockingQueue<>()).add(msg); } @@ -84,11 +70,4 @@ public final class InMemoryStorage { return Collections.emptyList(); } - /** - * Used primarily for testing. - */ - public void cleanup() { - storage.clear(); - } - } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java index de96729e0b..76ce8f6572 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java @@ -27,12 +27,13 @@ import java.util.stream.Collectors; @Slf4j public class InMemoryTbQueueConsumer implements TbQueueConsumer { - private final InMemoryStorage storage = InMemoryStorage.getInstance(); + private final InMemoryStorage storage; private volatile Set partitions; private volatile boolean stopped; private volatile boolean subscribed; - public InMemoryTbQueueConsumer(String topic) { + public InMemoryTbQueueConsumer(InMemoryStorage storage, String topic) { + this.storage = storage; this.topic = topic; stopped = false; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueProducer.java b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueProducer.java index 5d3562c00d..ca305ed43d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueProducer.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueProducer.java @@ -24,11 +24,12 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; @Data public class InMemoryTbQueueProducer implements TbQueueProducer { - private final InMemoryStorage storage = InMemoryStorage.getInstance(); + private final InMemoryStorage storage; private final String defaultTopic; - public InMemoryTbQueueProducer(String defaultTopic) { + public InMemoryTbQueueProducer(InMemoryStorage storage, String defaultTopic) { + this.storage = storage; this.defaultTopic = defaultTopic; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java index 0548603cb7..661688b9e4 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java @@ -55,69 +55,70 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE TbQueueRuleEngineSettings ruleEngineSettings, TbServiceInfoProvider serviceInfoProvider, TbQueueTransportApiSettings transportApiSettings, - TbQueueTransportNotificationSettings transportNotificationSettings) { + TbQueueTransportNotificationSettings transportNotificationSettings, + InMemoryStorage storage) { this.partitionService = partitionService; this.coreSettings = coreSettings; this.serviceInfoProvider = serviceInfoProvider; this.ruleEngineSettings = ruleEngineSettings; this.transportApiSettings = transportApiSettings; this.transportNotificationSettings = transportNotificationSettings; - this.storage = InMemoryStorage.getInstance(); + this.storage = storage; } @Override public TbQueueProducer> createTransportNotificationsMsgProducer() { - return new InMemoryTbQueueProducer<>(transportNotificationSettings.getNotificationsTopic()); + return new InMemoryTbQueueProducer<>(storage, transportNotificationSettings.getNotificationsTopic()); } @Override public TbQueueProducer> createRuleEngineMsgProducer() { - return new InMemoryTbQueueProducer<>(ruleEngineSettings.getTopic()); + return new InMemoryTbQueueProducer<>(storage, ruleEngineSettings.getTopic()); } @Override public TbQueueProducer> createRuleEngineNotificationsMsgProducer() { - return new InMemoryTbQueueProducer<>(ruleEngineSettings.getTopic()); + return new InMemoryTbQueueProducer<>(storage, ruleEngineSettings.getTopic()); } @Override public TbQueueProducer> createTbCoreMsgProducer() { - return new InMemoryTbQueueProducer<>(coreSettings.getTopic()); + return new InMemoryTbQueueProducer<>(storage, coreSettings.getTopic()); } @Override public TbQueueProducer> createTbCoreNotificationsMsgProducer() { - return new InMemoryTbQueueProducer<>(coreSettings.getTopic()); + return new InMemoryTbQueueProducer<>(storage, coreSettings.getTopic()); } @Override public TbQueueConsumer> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { - return new InMemoryTbQueueConsumer<>(configuration.getTopic()); + return new InMemoryTbQueueConsumer<>(storage, configuration.getTopic()); } @Override public TbQueueConsumer> createToRuleEngineNotificationsMsgConsumer() { - return new InMemoryTbQueueConsumer<>(partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName()); + return new InMemoryTbQueueConsumer<>(storage, partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName()); } @Override public TbQueueConsumer> createToCoreMsgConsumer() { - return new InMemoryTbQueueConsumer<>(coreSettings.getTopic()); + return new InMemoryTbQueueConsumer<>(storage, coreSettings.getTopic()); } @Override public TbQueueConsumer> createToCoreNotificationsMsgConsumer() { - return new InMemoryTbQueueConsumer<>(partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName()); + return new InMemoryTbQueueConsumer<>(storage, partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName()); } @Override public TbQueueConsumer> createTransportApiRequestConsumer() { - return new InMemoryTbQueueConsumer<>(transportApiSettings.getRequestsTopic()); + return new InMemoryTbQueueConsumer<>(storage, transportApiSettings.getRequestsTopic()); } @Override public TbQueueProducer> createTransportApiResponseProducer() { - return new InMemoryTbQueueProducer<>(transportApiSettings.getResponsesTopic()); + return new InMemoryTbQueueProducer<>(storage, transportApiSettings.getResponsesTopic()); } @Override @@ -127,22 +128,22 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE @Override public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { - return new InMemoryTbQueueConsumer<>(coreSettings.getUsageStatsTopic()); + return new InMemoryTbQueueConsumer<>(storage, coreSettings.getUsageStatsTopic()); } @Override public TbQueueConsumer> createToOtaPackageStateServiceMsgConsumer() { - return new InMemoryTbQueueConsumer<>(coreSettings.getOtaPackageTopic()); + return new InMemoryTbQueueConsumer<>(storage, coreSettings.getOtaPackageTopic()); } @Override public TbQueueProducer> createToOtaPackageStateServiceMsgProducer() { - return new InMemoryTbQueueProducer<>(coreSettings.getOtaPackageTopic()); + return new InMemoryTbQueueProducer<>(storage, coreSettings.getOtaPackageTopic()); } @Override public TbQueueProducer> createToUsageStatsServiceMsgProducer() { - return new InMemoryTbQueueProducer<>(coreSettings.getUsageStatsTopic()); + return new InMemoryTbQueueProducer<>(storage, coreSettings.getUsageStatsTopic()); } @Scheduled(fixedRateString = "${queue.in_memory.stats.print-interval-ms:60000}") diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java index cb60272ace..26bea214b8 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java @@ -31,6 +31,7 @@ import org.thingsboard.server.queue.TbQueueRequestTemplate; import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.memory.InMemoryStorage; import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer; import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; @@ -45,24 +46,27 @@ public class InMemoryTbTransportQueueFactory implements TbTransportQueueFactory private final TbQueueTransportNotificationSettings transportNotificationSettings; private final TbServiceInfoProvider serviceInfoProvider; private final TbQueueCoreSettings coreSettings; + private final InMemoryStorage storage; public InMemoryTbTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings, TbQueueTransportNotificationSettings transportNotificationSettings, TbServiceInfoProvider serviceInfoProvider, - TbQueueCoreSettings coreSettings) { + TbQueueCoreSettings coreSettings, + InMemoryStorage storage) { this.transportApiSettings = transportApiSettings; this.transportNotificationSettings = transportNotificationSettings; this.serviceInfoProvider = serviceInfoProvider; this.coreSettings = coreSettings; + this.storage = storage; } @Override public TbQueueRequestTemplate, TbProtoQueueMsg> createTransportApiRequestTemplate() { InMemoryTbQueueProducer> producerTemplate = - new InMemoryTbQueueProducer<>(transportApiSettings.getRequestsTopic()); + new InMemoryTbQueueProducer<>(storage, transportApiSettings.getRequestsTopic()); InMemoryTbQueueConsumer> consumerTemplate = - new InMemoryTbQueueConsumer<>(transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId()); + new InMemoryTbQueueConsumer<>(storage, transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId()); DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder , TbProtoQueueMsg> templateBuilder = DefaultTbQueueRequestTemplate.builder(); @@ -85,22 +89,22 @@ public class InMemoryTbTransportQueueFactory implements TbTransportQueueFactory @Override public TbQueueProducer> createRuleEngineMsgProducer() { - return new InMemoryTbQueueProducer<>(transportApiSettings.getRequestsTopic()); + return new InMemoryTbQueueProducer<>(storage, transportApiSettings.getRequestsTopic()); } @Override public TbQueueProducer> createTbCoreMsgProducer() { - return new InMemoryTbQueueProducer<>(coreSettings.getTopic()); + return new InMemoryTbQueueProducer<>(storage, coreSettings.getTopic()); } @Override public TbQueueConsumer> createTransportNotificationsConsumer() { - return new InMemoryTbQueueConsumer<>(transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId()); + return new InMemoryTbQueueConsumer<>(storage, transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId()); } @Override public TbQueueProducer> createToUsageStatsServiceMsgProducer() { - return new InMemoryTbQueueProducer<>(coreSettings.getUsageStatsTopic()); + return new InMemoryTbQueueProducer<>(storage, coreSettings.getUsageStatsTopic()); } } diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/memory/InMemoryStorageTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/memory/InMemoryStorageTest.java index 84858786b4..d8911c3fd9 100644 --- a/common/queue/src/test/java/org/thingsboard/server/queue/memory/InMemoryStorageTest.java +++ b/common/queue/src/test/java/org/thingsboard/server/queue/memory/InMemoryStorageTest.java @@ -25,17 +25,7 @@ import static org.mockito.Mockito.mock; public class InMemoryStorageTest { - InMemoryStorage storage = InMemoryStorage.getInstance(); - - @Before - public void setUp() { - storage.cleanup(); - } - - @After - public void tearDown() { - storage.cleanup(); - } + InMemoryStorage storage = new InMemoryStorage(); @Test public void givenStorage_whenGetLagTotal_thenReturnInteger() throws InterruptedException { @@ -48,7 +38,5 @@ public class InMemoryStorageTest { assertThat(storage.getLagTotal()).isEqualTo(3); storage.get("main"); assertThat(storage.getLagTotal()).isEqualTo(1); - storage.cleanup(); - assertThat(storage.getLagTotal()).isEqualTo(0); } } \ No newline at end of file