From d18533a88f2ee9ecfd92a94fc3f9734b4b649678 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Fri, 15 Apr 2022 14:19:56 +0300 Subject: [PATCH] InMemoryStorage extracted --- .../queue/memory/DefaultInMemoryStorage.java | 94 +++++++++++++++++++ .../server/queue/memory/InMemoryStorage.java | 51 +--------- ...t.java => DefaultInMemoryStorageTest.java} | 6 +- 3 files changed, 101 insertions(+), 50 deletions(-) create mode 100644 common/queue/src/main/java/org/thingsboard/server/queue/memory/DefaultInMemoryStorage.java rename common/queue/src/test/java/org/thingsboard/server/queue/memory/{InMemoryStorageTest.java => DefaultInMemoryStorageTest.java} (91%) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/memory/DefaultInMemoryStorage.java b/common/queue/src/main/java/org/thingsboard/server/queue/memory/DefaultInMemoryStorage.java new file mode 100644 index 0000000000..f852974dd5 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/memory/DefaultInMemoryStorage.java @@ -0,0 +1,94 @@ +/** + * ThingsBoard, Inc. ("COMPANY") CONFIDENTIAL + * + * Copyright © 2016-2022 ThingsBoard, Inc. All Rights Reserved. + * + * NOTICE: All information contained herein is, and remains + * the property of ThingsBoard, Inc. and its suppliers, + * if any. The intellectual and technical concepts contained + * herein are proprietary to ThingsBoard, Inc. + * and its suppliers and may be covered by U.S. and Foreign Patents, + * patents in process, and are protected by trade secret or copyright law. + * + * Dissemination of this information or reproduction of this material is strictly forbidden + * unless prior written permission is obtained from COMPANY. + * + * Access to the source code contained herein is hereby forbidden to anyone except current COMPANY employees, + * managers or contractors who have executed Confidentiality and Non-disclosure agreements + * explicitly covering such access. + * + * The copyright notice above does not evidence any actual or intended publication + * or disclosure of this source code, which includes + * information that is confidential and/or proprietary, and is a trade secret, of COMPANY. + * ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE, + * OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE CODE WITHOUT + * THE EXPRESS WRITTEN CONSENT OF COMPANY IS STRICTLY PROHIBITED, + * AND IN VIOLATION OF APPLICABLE LAWS AND INTERNATIONAL TREATIES. + * THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION + * DOES NOT CONVEY OR IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS, + * OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT MAY DESCRIBE, IN WHOLE OR IN PART. + */ +package org.thingsboard.server.queue.memory; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.thingsboard.server.queue.TbQueueMsg; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; + +@Component +@Slf4j +public final class DefaultInMemoryStorage implements InMemoryStorage { + private final ConcurrentHashMap> storage = new ConcurrentHashMap<>(); + + @Override + public void printStats() { + if (log.isDebugEnabled()) { + storage.forEach((topic, queue) -> { + if (queue.size() > 0) { + log.debug("[{}] Queue Size [{}]", topic, queue.size()); + } + }); + } + } + + @Override + public int getLagTotal() { + return storage.values().stream().map(BlockingQueue::size).reduce(0, Integer::sum); + } + + @Override + public boolean put(String topic, TbQueueMsg msg) { + return storage.computeIfAbsent(topic, (t) -> new LinkedBlockingQueue<>()).add(msg); + } + + @Override + public List get(String topic) throws InterruptedException { + if (storage.containsKey(topic)) { + List entities; + @SuppressWarnings("unchecked") + T first = (T) storage.get(topic).poll(); + if (first != null) { + entities = new ArrayList<>(); + entities.add(first); + List otherList = new ArrayList<>(); + storage.get(topic).drainTo(otherList, 999); + for (TbQueueMsg other : otherList) { + @SuppressWarnings("unchecked") + T entity = (T) other; + entities.add(entity); + } + } else { + entities = Collections.emptyList(); + } + return entities; + } + return Collections.emptyList(); + } + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryStorage.java b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryStorage.java index 11b97100ee..0fdfebbfff 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryStorage.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryStorage.java @@ -15,59 +15,18 @@ */ package org.thingsboard.server.queue.memory; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; import org.thingsboard.server.queue.TbQueueMsg; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; -@Component -@Slf4j -public final class InMemoryStorage { - private final ConcurrentHashMap> storage = new ConcurrentHashMap<>(); +public interface InMemoryStorage { - public void printStats() { - storage.forEach((topic, queue) -> { - if (queue.size() > 0) { - log.debug("[{}] Queue Size [{}]", topic, queue.size()); - } - }); - } + void printStats(); - public int getLagTotal() { - return storage.values().stream().map(BlockingQueue::size).reduce(0, Integer::sum); - } + int getLagTotal(); - public boolean put(String topic, TbQueueMsg msg) { - return storage.computeIfAbsent(topic, (t) -> new LinkedBlockingQueue<>()).add(msg); - } + boolean put(String topic, TbQueueMsg msg); - public List get(String topic) throws InterruptedException { - if (storage.containsKey(topic)) { - List entities; - @SuppressWarnings("unchecked") - T first = (T) storage.get(topic).poll(); - if (first != null) { - entities = new ArrayList<>(); - entities.add(first); - List otherList = new ArrayList<>(); - storage.get(topic).drainTo(otherList, 999); - for (TbQueueMsg other : otherList) { - @SuppressWarnings("unchecked") - T entity = (T) other; - entities.add(entity); - } - } else { - entities = Collections.emptyList(); - } - return entities; - } - return Collections.emptyList(); - } + List get(String topic) throws InterruptedException; } diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/memory/InMemoryStorageTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/memory/DefaultInMemoryStorageTest.java similarity index 91% rename from common/queue/src/test/java/org/thingsboard/server/queue/memory/InMemoryStorageTest.java rename to common/queue/src/test/java/org/thingsboard/server/queue/memory/DefaultInMemoryStorageTest.java index d8911c3fd9..bd4f238447 100644 --- a/common/queue/src/test/java/org/thingsboard/server/queue/memory/InMemoryStorageTest.java +++ b/common/queue/src/test/java/org/thingsboard/server/queue/memory/DefaultInMemoryStorageTest.java @@ -15,17 +15,15 @@ */ package org.thingsboard.server.queue.memory; -import org.junit.After; -import org.junit.Before; import org.junit.Test; import org.thingsboard.server.queue.TbQueueMsg; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; -public class InMemoryStorageTest { +public class DefaultInMemoryStorageTest { - InMemoryStorage storage = new InMemoryStorage(); + InMemoryStorage storage = new DefaultInMemoryStorage(); @Test public void givenStorage_whenGetLagTotal_thenReturnInteger() throws InterruptedException {