InMemoryStorage extracted
This commit is contained in:
parent
77b962e1bb
commit
d18533a88f
@ -0,0 +1,94 @@
|
|||||||
|
/**
|
||||||
|
* ThingsBoard, Inc. ("COMPANY") CONFIDENTIAL
|
||||||
|
*
|
||||||
|
* Copyright © 2016-2022 ThingsBoard, Inc. All Rights Reserved.
|
||||||
|
*
|
||||||
|
* NOTICE: All information contained herein is, and remains
|
||||||
|
* the property of ThingsBoard, Inc. and its suppliers,
|
||||||
|
* if any. The intellectual and technical concepts contained
|
||||||
|
* herein are proprietary to ThingsBoard, Inc.
|
||||||
|
* and its suppliers and may be covered by U.S. and Foreign Patents,
|
||||||
|
* patents in process, and are protected by trade secret or copyright law.
|
||||||
|
*
|
||||||
|
* Dissemination of this information or reproduction of this material is strictly forbidden
|
||||||
|
* unless prior written permission is obtained from COMPANY.
|
||||||
|
*
|
||||||
|
* Access to the source code contained herein is hereby forbidden to anyone except current COMPANY employees,
|
||||||
|
* managers or contractors who have executed Confidentiality and Non-disclosure agreements
|
||||||
|
* explicitly covering such access.
|
||||||
|
*
|
||||||
|
* The copyright notice above does not evidence any actual or intended publication
|
||||||
|
* or disclosure of this source code, which includes
|
||||||
|
* information that is confidential and/or proprietary, and is a trade secret, of COMPANY.
|
||||||
|
* ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE,
|
||||||
|
* OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE CODE WITHOUT
|
||||||
|
* THE EXPRESS WRITTEN CONSENT OF COMPANY IS STRICTLY PROHIBITED,
|
||||||
|
* AND IN VIOLATION OF APPLICABLE LAWS AND INTERNATIONAL TREATIES.
|
||||||
|
* THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION
|
||||||
|
* DOES NOT CONVEY OR IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS,
|
||||||
|
* OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT MAY DESCRIBE, IN WHOLE OR IN PART.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.server.queue.memory;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.thingsboard.server.queue.TbQueueMsg;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@Slf4j
|
||||||
|
public final class DefaultInMemoryStorage implements InMemoryStorage {
|
||||||
|
private final ConcurrentHashMap<String, BlockingQueue<TbQueueMsg>> storage = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void printStats() {
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
storage.forEach((topic, queue) -> {
|
||||||
|
if (queue.size() > 0) {
|
||||||
|
log.debug("[{}] Queue Size [{}]", topic, queue.size());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getLagTotal() {
|
||||||
|
return storage.values().stream().map(BlockingQueue::size).reduce(0, Integer::sum);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean put(String topic, TbQueueMsg msg) {
|
||||||
|
return storage.computeIfAbsent(topic, (t) -> new LinkedBlockingQueue<>()).add(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T extends TbQueueMsg> List<T> get(String topic) throws InterruptedException {
|
||||||
|
if (storage.containsKey(topic)) {
|
||||||
|
List<T> entities;
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
T first = (T) storage.get(topic).poll();
|
||||||
|
if (first != null) {
|
||||||
|
entities = new ArrayList<>();
|
||||||
|
entities.add(first);
|
||||||
|
List<TbQueueMsg> otherList = new ArrayList<>();
|
||||||
|
storage.get(topic).drainTo(otherList, 999);
|
||||||
|
for (TbQueueMsg other : otherList) {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
T entity = (T) other;
|
||||||
|
entities.add(entity);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
entities = Collections.emptyList();
|
||||||
|
}
|
||||||
|
return entities;
|
||||||
|
}
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@ -15,59 +15,18 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.queue.memory;
|
package org.thingsboard.server.queue.memory;
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
import org.thingsboard.server.queue.TbQueueMsg;
|
import org.thingsboard.server.queue.TbQueueMsg;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.BlockingQueue;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
|
|
||||||
@Component
|
public interface InMemoryStorage {
|
||||||
@Slf4j
|
|
||||||
public final class InMemoryStorage {
|
|
||||||
private final ConcurrentHashMap<String, BlockingQueue<TbQueueMsg>> storage = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
public void printStats() {
|
void printStats();
|
||||||
storage.forEach((topic, queue) -> {
|
|
||||||
if (queue.size() > 0) {
|
|
||||||
log.debug("[{}] Queue Size [{}]", topic, queue.size());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getLagTotal() {
|
int getLagTotal();
|
||||||
return storage.values().stream().map(BlockingQueue::size).reduce(0, Integer::sum);
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean put(String topic, TbQueueMsg msg) {
|
boolean put(String topic, TbQueueMsg msg);
|
||||||
return storage.computeIfAbsent(topic, (t) -> new LinkedBlockingQueue<>()).add(msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
public <T extends TbQueueMsg> List<T> get(String topic) throws InterruptedException {
|
<T extends TbQueueMsg> List<T> get(String topic) throws InterruptedException;
|
||||||
if (storage.containsKey(topic)) {
|
|
||||||
List<T> entities;
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
T first = (T) storage.get(topic).poll();
|
|
||||||
if (first != null) {
|
|
||||||
entities = new ArrayList<>();
|
|
||||||
entities.add(first);
|
|
||||||
List<TbQueueMsg> otherList = new ArrayList<>();
|
|
||||||
storage.get(topic).drainTo(otherList, 999);
|
|
||||||
for (TbQueueMsg other : otherList) {
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
T entity = (T) other;
|
|
||||||
entities.add(entity);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
entities = Collections.emptyList();
|
|
||||||
}
|
|
||||||
return entities;
|
|
||||||
}
|
|
||||||
return Collections.emptyList();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,17 +15,15 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.queue.memory;
|
package org.thingsboard.server.queue.memory;
|
||||||
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.thingsboard.server.queue.TbQueueMsg;
|
import org.thingsboard.server.queue.TbQueueMsg;
|
||||||
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
public class InMemoryStorageTest {
|
public class DefaultInMemoryStorageTest {
|
||||||
|
|
||||||
InMemoryStorage storage = new InMemoryStorage();
|
InMemoryStorage storage = new DefaultInMemoryStorage();
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void givenStorage_whenGetLagTotal_thenReturnInteger() throws InterruptedException {
|
public void givenStorage_whenGetLagTotal_thenReturnInteger() throws InterruptedException {
|
||||||
Loading…
x
Reference in New Issue
Block a user