InMemoryStorage refactored from the static singleton to the Spring Bean

This commit is contained in:
Sergey Matvienko 2022-04-14 13:08:01 +03:00
parent 126e8b9639
commit 9a2bc5ab9d
15 changed files with 43 additions and 116 deletions

View File

@ -23,20 +23,4 @@ import org.thingsboard.server.queue.memory.InMemoryStorage;
@Slf4j
public abstract class AbstractInMemoryStorageTest {
@Before
public void setUpInMemoryStorage() {
log.info("set up InMemoryStorage");
cleanupInMemStorage();
}
@After
public void tearDownInMemoryStorage() {
log.info("tear down InMemoryStorage");
cleanupInMemStorage();
}
public static void cleanupInMemStorage() {
InMemoryStorage.getInstance().cleanup();
}
}

View File

@ -30,9 +30,4 @@ import org.thingsboard.server.queue.memory.InMemoryStorage;
})
public class ControllerSqlTestSuite {
@BeforeClass
public static void cleanupInMemStorage() {
InMemoryStorage.getInstance().cleanup();
}
}

View File

@ -26,8 +26,4 @@ import org.thingsboard.server.queue.memory.InMemoryStorage;
})
public class EdgeSqlTestSuite {
@BeforeClass
public static void cleanupInMemStorage() {
InMemoryStorage.getInstance().cleanup();
}
}

View File

@ -27,9 +27,4 @@ import org.thingsboard.server.queue.memory.InMemoryStorage;
})
public class RuleEngineSqlTestSuite {
@BeforeClass
public static void cleanupInMemStorage() {
InMemoryStorage.getInstance().cleanup();
}
}

View File

@ -75,6 +75,9 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
@Autowired
protected EventService eventService;
@Autowired
protected InMemoryStorage storage;
@Before
public void beforeTest() throws Exception {
@ -159,7 +162,7 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey", "serverAttributeValue"), System.currentTimeMillis())));
await("total inMemory queue lag is empty").atMost(30, TimeUnit.SECONDS)
.until(() -> InMemoryStorage.getInstance().getLagTotal() == 0);
.until(() -> storage.getLagTotal() == 0);
Thread.sleep(1000);
TbMsgCallback tbMsgCallback = Mockito.mock(TbMsgCallback.class);

View File

@ -27,9 +27,4 @@ import org.thingsboard.server.queue.memory.InMemoryStorage;
})
public class ServiceSqlTestSuite {
@BeforeClass
public static void cleanupInMemStorage() {
InMemoryStorage.getInstance().cleanup();
}
}

View File

@ -29,9 +29,4 @@ import org.thingsboard.server.queue.memory.InMemoryStorage;
})
public class SystemSqlTestSuite {
@BeforeClass
public static void cleanupInMemStorage() {
InMemoryStorage.getInstance().cleanup();
}
}

View File

@ -40,9 +40,4 @@ public class TransportNoSqlTestSuite {
),
"cassandra-test.yaml", 30000l);
@BeforeClass
public static void cleanupInMemStorage() {
InMemoryStorage.getInstance().cleanup();
}
}

View File

@ -34,9 +34,4 @@ import org.thingsboard.server.queue.memory.InMemoryStorage;
})
public class TransportSqlTestSuite {
@BeforeClass
public static void cleanupInMemStorage() {
InMemoryStorage.getInstance().cleanup();
}
}

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.queue.memory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.server.queue.TbQueueMsg;
import java.util.ArrayList;
@ -25,14 +26,10 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
@Component
@Slf4j
public final class InMemoryStorage {
private static InMemoryStorage instance;
private final ConcurrentHashMap<String, BlockingQueue<TbQueueMsg>> storage;
private InMemoryStorage() {
storage = new ConcurrentHashMap<>();
}
private final ConcurrentHashMap<String, BlockingQueue<TbQueueMsg>> storage = new ConcurrentHashMap<>();
public void printStats() {
storage.forEach((topic, queue) -> {
@ -46,17 +43,6 @@ public final class InMemoryStorage {
return storage.values().stream().map(BlockingQueue::size).reduce(0, Integer::sum);
}
public static InMemoryStorage getInstance() {
if (instance == null) {
synchronized (InMemoryStorage.class) {
if (instance == null) {
instance = new InMemoryStorage();
}
}
}
return instance;
}
public boolean put(String topic, TbQueueMsg msg) {
return storage.computeIfAbsent(topic, (t) -> new LinkedBlockingQueue<>()).add(msg);
}
@ -84,11 +70,4 @@ public final class InMemoryStorage {
return Collections.emptyList();
}
/**
* Used primarily for testing.
*/
public void cleanup() {
storage.clear();
}
}

View File

@ -27,12 +27,13 @@ import java.util.stream.Collectors;
@Slf4j
public class InMemoryTbQueueConsumer<T extends TbQueueMsg> implements TbQueueConsumer<T> {
private final InMemoryStorage storage = InMemoryStorage.getInstance();
private final InMemoryStorage storage;
private volatile Set<TopicPartitionInfo> partitions;
private volatile boolean stopped;
private volatile boolean subscribed;
public InMemoryTbQueueConsumer(String topic) {
public InMemoryTbQueueConsumer(InMemoryStorage storage, String topic) {
this.storage = storage;
this.topic = topic;
stopped = false;
}

View File

@ -24,11 +24,12 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
@Data
public class InMemoryTbQueueProducer<T extends TbQueueMsg> implements TbQueueProducer<T> {
private final InMemoryStorage storage = InMemoryStorage.getInstance();
private final InMemoryStorage storage;
private final String defaultTopic;
public InMemoryTbQueueProducer(String defaultTopic) {
public InMemoryTbQueueProducer(InMemoryStorage storage, String defaultTopic) {
this.storage = storage;
this.defaultTopic = defaultTopic;
}

View File

@ -55,69 +55,70 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
TbQueueRuleEngineSettings ruleEngineSettings,
TbServiceInfoProvider serviceInfoProvider,
TbQueueTransportApiSettings transportApiSettings,
TbQueueTransportNotificationSettings transportNotificationSettings) {
TbQueueTransportNotificationSettings transportNotificationSettings,
InMemoryStorage storage) {
this.partitionService = partitionService;
this.coreSettings = coreSettings;
this.serviceInfoProvider = serviceInfoProvider;
this.ruleEngineSettings = ruleEngineSettings;
this.transportApiSettings = transportApiSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.storage = InMemoryStorage.getInstance();
this.storage = storage;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new InMemoryTbQueueProducer<>(transportNotificationSettings.getNotificationsTopic());
return new InMemoryTbQueueProducer<>(storage, transportNotificationSettings.getNotificationsTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new InMemoryTbQueueProducer<>(ruleEngineSettings.getTopic());
return new InMemoryTbQueueProducer<>(storage, ruleEngineSettings.getTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new InMemoryTbQueueProducer<>(ruleEngineSettings.getTopic());
return new InMemoryTbQueueProducer<>(storage, ruleEngineSettings.getTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> createTbCoreMsgProducer() {
return new InMemoryTbQueueProducer<>(coreSettings.getTopic());
return new InMemoryTbQueueProducer<>(storage, coreSettings.getTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new InMemoryTbQueueProducer<>(coreSettings.getTopic());
return new InMemoryTbQueueProducer<>(storage, coreSettings.getTopic());
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
return new InMemoryTbQueueConsumer<>(configuration.getTopic());
return new InMemoryTbQueueConsumer<>(storage, configuration.getTopic());
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() {
return new InMemoryTbQueueConsumer<>(partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName());
return new InMemoryTbQueueConsumer<>(storage, partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName());
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> createToCoreMsgConsumer() {
return new InMemoryTbQueueConsumer<>(coreSettings.getTopic());
return new InMemoryTbQueueConsumer<>(storage, coreSettings.getTopic());
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() {
return new InMemoryTbQueueConsumer<>(partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName());
return new InMemoryTbQueueConsumer<>(storage, partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName());
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>> createTransportApiRequestConsumer() {
return new InMemoryTbQueueConsumer<>(transportApiSettings.getRequestsTopic());
return new InMemoryTbQueueConsumer<>(storage, transportApiSettings.getRequestsTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> createTransportApiResponseProducer() {
return new InMemoryTbQueueProducer<>(transportApiSettings.getResponsesTopic());
return new InMemoryTbQueueProducer<>(storage, transportApiSettings.getResponsesTopic());
}
@Override
@ -127,22 +128,22 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
return new InMemoryTbQueueConsumer<>(coreSettings.getUsageStatsTopic());
return new InMemoryTbQueueConsumer<>(storage, coreSettings.getUsageStatsTopic());
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToOtaPackageStateServiceMsg>> createToOtaPackageStateServiceMsgConsumer() {
return new InMemoryTbQueueConsumer<>(coreSettings.getOtaPackageTopic());
return new InMemoryTbQueueConsumer<>(storage, coreSettings.getOtaPackageTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToOtaPackageStateServiceMsg>> createToOtaPackageStateServiceMsgProducer() {
return new InMemoryTbQueueProducer<>(coreSettings.getOtaPackageTopic());
return new InMemoryTbQueueProducer<>(storage, coreSettings.getOtaPackageTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
return new InMemoryTbQueueProducer<>(coreSettings.getUsageStatsTopic());
return new InMemoryTbQueueProducer<>(storage, coreSettings.getUsageStatsTopic());
}
@Scheduled(fixedRateString = "${queue.in_memory.stats.print-interval-ms:60000}")

View File

@ -31,6 +31,7 @@ import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.memory.InMemoryStorage;
import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer;
import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
@ -45,24 +46,27 @@ public class InMemoryTbTransportQueueFactory implements TbTransportQueueFactory
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueCoreSettings coreSettings;
private final InMemoryStorage storage;
public InMemoryTbTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbServiceInfoProvider serviceInfoProvider,
TbQueueCoreSettings coreSettings) {
TbQueueCoreSettings coreSettings,
InMemoryStorage storage) {
this.transportApiSettings = transportApiSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.serviceInfoProvider = serviceInfoProvider;
this.coreSettings = coreSettings;
this.storage = storage;
}
@Override
public TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiRequestTemplate() {
InMemoryTbQueueProducer<TbProtoQueueMsg<TransportApiRequestMsg>> producerTemplate =
new InMemoryTbQueueProducer<>(transportApiSettings.getRequestsTopic());
new InMemoryTbQueueProducer<>(storage, transportApiSettings.getRequestsTopic());
InMemoryTbQueueConsumer<TbProtoQueueMsg<TransportApiResponseMsg>> consumerTemplate =
new InMemoryTbQueueConsumer<>(transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId());
new InMemoryTbQueueConsumer<>(storage, transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId());
DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder();
@ -85,22 +89,22 @@ public class InMemoryTbTransportQueueFactory implements TbTransportQueueFactory
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new InMemoryTbQueueProducer<>(transportApiSettings.getRequestsTopic());
return new InMemoryTbQueueProducer<>(storage, transportApiSettings.getRequestsTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {
return new InMemoryTbQueueProducer<>(coreSettings.getTopic());
return new InMemoryTbQueueProducer<>(storage, coreSettings.getTopic());
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsConsumer() {
return new InMemoryTbQueueConsumer<>(transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId());
return new InMemoryTbQueueConsumer<>(storage, transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
return new InMemoryTbQueueProducer<>(coreSettings.getUsageStatsTopic());
return new InMemoryTbQueueProducer<>(storage, coreSettings.getUsageStatsTopic());
}
}

View File

@ -25,17 +25,7 @@ import static org.mockito.Mockito.mock;
public class InMemoryStorageTest {
InMemoryStorage storage = InMemoryStorage.getInstance();
@Before
public void setUp() {
storage.cleanup();
}
@After
public void tearDown() {
storage.cleanup();
}
InMemoryStorage storage = new InMemoryStorage();
@Test
public void givenStorage_whenGetLagTotal_thenReturnInteger() throws InterruptedException {
@ -48,7 +38,5 @@ public class InMemoryStorageTest {
assertThat(storage.getLagTotal()).isEqualTo(3);
storage.get("main");
assertThat(storage.getLagTotal()).isEqualTo(1);
storage.cleanup();
assertThat(storage.getLagTotal()).isEqualTo(0);
}
}