Merge with master
This commit is contained in:
commit
75fd37428a
@ -221,12 +221,12 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
private class SessionMetaData implements SendHandler {
|
class SessionMetaData implements SendHandler {
|
||||||
private final WebSocketSession session;
|
private final WebSocketSession session;
|
||||||
private final RemoteEndpoint.Async asyncRemote;
|
private final RemoteEndpoint.Async asyncRemote;
|
||||||
private final WebSocketSessionRef sessionRef;
|
private final WebSocketSessionRef sessionRef;
|
||||||
|
|
||||||
private final AtomicBoolean isSending = new AtomicBoolean(false);
|
final AtomicBoolean isSending = new AtomicBoolean(false);
|
||||||
private final Queue<TbWebSocketMsg<?>> msgQueue;
|
private final Queue<TbWebSocketMsg<?>> msgQueue;
|
||||||
|
|
||||||
private volatile long lastActivityTime;
|
private volatile long lastActivityTime;
|
||||||
@ -241,7 +241,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
this.lastActivityTime = System.currentTimeMillis();
|
this.lastActivityTime = System.currentTimeMillis();
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void sendPing(long currentTime) {
|
void sendPing(long currentTime) {
|
||||||
try {
|
try {
|
||||||
long timeSinceLastActivity = currentTime - lastActivityTime;
|
long timeSinceLastActivity = currentTime - lastActivityTime;
|
||||||
if (timeSinceLastActivity >= pingTimeout) {
|
if (timeSinceLastActivity >= pingTimeout) {
|
||||||
@ -256,26 +256,25 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void closeSession(CloseStatus reason) {
|
void closeSession(CloseStatus reason) {
|
||||||
try {
|
try {
|
||||||
close(this.sessionRef, reason);
|
close(this.sessionRef, reason);
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
log.trace("[{}] Session transport error", session.getId(), ioe);
|
log.trace("[{}] Session transport error", session.getId(), ioe);
|
||||||
|
} finally {
|
||||||
|
msgQueue.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void processPongMessage(long currentTime) {
|
void processPongMessage(long currentTime) {
|
||||||
lastActivityTime = currentTime;
|
lastActivityTime = currentTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void sendMsg(String msg) {
|
void sendMsg(String msg) {
|
||||||
sendMsg(new TbWebSocketTextMsg(msg));
|
sendMsg(new TbWebSocketTextMsg(msg));
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void sendMsg(TbWebSocketMsg<?> msg) {
|
void sendMsg(TbWebSocketMsg<?> msg) {
|
||||||
if (isSending.compareAndSet(false, true)) {
|
|
||||||
sendMsgInternal(msg);
|
|
||||||
} else {
|
|
||||||
try {
|
try {
|
||||||
msgQueue.add(msg);
|
msgQueue.add(msg);
|
||||||
} catch (RuntimeException e) {
|
} catch (RuntimeException e) {
|
||||||
@ -285,8 +284,10 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
log.info("[{}][{}] Session closed due to queue error", sessionRef.getSecurityCtx().getTenantId(), session.getId());
|
log.info("[{}][{}] Session closed due to queue error", sessionRef.getSecurityCtx().getTenantId(), session.getId());
|
||||||
}
|
}
|
||||||
closeSession(CloseStatus.POLICY_VIOLATION.withReason("Max pending updates limit reached!"));
|
closeSession(CloseStatus.POLICY_VIOLATION.withReason("Max pending updates limit reached!"));
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
processNextMsg();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendMsgInternal(TbWebSocketMsg<?> msg) {
|
private void sendMsgInternal(TbWebSocketMsg<?> msg) {
|
||||||
@ -294,9 +295,11 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
if (TbWebSocketMsgType.TEXT.equals(msg.getType())) {
|
if (TbWebSocketMsgType.TEXT.equals(msg.getType())) {
|
||||||
TbWebSocketTextMsg textMsg = (TbWebSocketTextMsg) msg;
|
TbWebSocketTextMsg textMsg = (TbWebSocketTextMsg) msg;
|
||||||
this.asyncRemote.sendText(textMsg.getMsg(), this);
|
this.asyncRemote.sendText(textMsg.getMsg(), this);
|
||||||
|
// isSending status will be reset in the onResult method by call back
|
||||||
} else {
|
} else {
|
||||||
TbWebSocketPingMsg pingMsg = (TbWebSocketPingMsg) msg;
|
TbWebSocketPingMsg pingMsg = (TbWebSocketPingMsg) msg;
|
||||||
this.asyncRemote.sendPing(pingMsg.getMsg());
|
this.asyncRemote.sendPing(pingMsg.getMsg()); // blocking call
|
||||||
|
isSending.set(false);
|
||||||
processNextMsg();
|
processNextMsg();
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
@ -310,12 +313,17 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
if (!result.isOK()) {
|
if (!result.isOK()) {
|
||||||
log.trace("[{}] Failed to send msg", session.getId(), result.getException());
|
log.trace("[{}] Failed to send msg", session.getId(), result.getException());
|
||||||
closeSession(CloseStatus.SESSION_NOT_RELIABLE);
|
closeSession(CloseStatus.SESSION_NOT_RELIABLE);
|
||||||
} else {
|
return;
|
||||||
processNextMsg();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
isSending.set(false);
|
||||||
|
processNextMsg();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processNextMsg() {
|
private void processNextMsg() {
|
||||||
|
if (msgQueue.isEmpty() || !isSending.compareAndSet(false, true)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
TbWebSocketMsg<?> msg = msgQueue.poll();
|
TbWebSocketMsg<?> msg = msgQueue.poll();
|
||||||
if (msg != null) {
|
if (msg != null) {
|
||||||
sendMsgInternal(msg);
|
sendMsgInternal(msg);
|
||||||
@ -392,57 +400,66 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
if (tenantProfileConfiguration == null) {
|
if (tenantProfileConfiguration == null) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
boolean limitAllowed;
|
||||||
String sessionId = session.getId();
|
String sessionId = session.getId();
|
||||||
if (tenantProfileConfiguration.getMaxWsSessionsPerTenant() > 0) {
|
if (tenantProfileConfiguration.getMaxWsSessionsPerTenant() > 0) {
|
||||||
Set<String> tenantSessions = tenantSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet());
|
Set<String> tenantSessions = tenantSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet());
|
||||||
synchronized (tenantSessions) {
|
synchronized (tenantSessions) {
|
||||||
if (tenantSessions.size() < tenantProfileConfiguration.getMaxWsSessionsPerTenant()) {
|
limitAllowed = tenantSessions.size() < tenantProfileConfiguration.getMaxWsSessionsPerTenant();
|
||||||
|
if (limitAllowed) {
|
||||||
tenantSessions.add(sessionId);
|
tenantSessions.add(sessionId);
|
||||||
} else {
|
}
|
||||||
|
}
|
||||||
|
if (!limitAllowed) {
|
||||||
log.info("[{}][{}][{}] Failed to start session. Max tenant sessions limit reached"
|
log.info("[{}][{}][{}] Failed to start session. Max tenant sessions limit reached"
|
||||||
, sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId);
|
, sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId);
|
||||||
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max tenant sessions limit reached!"));
|
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max tenant sessions limit reached!"));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if (sessionRef.getSecurityCtx().isCustomerUser()) {
|
if (sessionRef.getSecurityCtx().isCustomerUser()) {
|
||||||
if (tenantProfileConfiguration.getMaxWsSessionsPerCustomer() > 0) {
|
if (tenantProfileConfiguration.getMaxWsSessionsPerCustomer() > 0) {
|
||||||
Set<String> customerSessions = customerSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getCustomerId(), id -> ConcurrentHashMap.newKeySet());
|
Set<String> customerSessions = customerSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getCustomerId(), id -> ConcurrentHashMap.newKeySet());
|
||||||
synchronized (customerSessions) {
|
synchronized (customerSessions) {
|
||||||
if (customerSessions.size() < tenantProfileConfiguration.getMaxWsSessionsPerCustomer()) {
|
limitAllowed = customerSessions.size() < tenantProfileConfiguration.getMaxWsSessionsPerCustomer();
|
||||||
|
if (limitAllowed) {
|
||||||
customerSessions.add(sessionId);
|
customerSessions.add(sessionId);
|
||||||
} else {
|
}
|
||||||
|
}
|
||||||
|
if (!limitAllowed) {
|
||||||
log.info("[{}][{}][{}] Failed to start session. Max customer sessions limit reached"
|
log.info("[{}][{}][{}] Failed to start session. Max customer sessions limit reached"
|
||||||
, sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId);
|
, sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId);
|
||||||
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max customer sessions limit reached"));
|
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max customer sessions limit reached"));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
if (tenantProfileConfiguration.getMaxWsSessionsPerRegularUser() > 0
|
if (tenantProfileConfiguration.getMaxWsSessionsPerRegularUser() > 0
|
||||||
&& UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
|
&& UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
|
||||||
Set<String> regularUserSessions = regularUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
|
Set<String> regularUserSessions = regularUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
|
||||||
synchronized (regularUserSessions) {
|
synchronized (regularUserSessions) {
|
||||||
if (regularUserSessions.size() < tenantProfileConfiguration.getMaxWsSessionsPerRegularUser()) {
|
limitAllowed = regularUserSessions.size() < tenantProfileConfiguration.getMaxWsSessionsPerRegularUser();
|
||||||
|
if (limitAllowed) {
|
||||||
regularUserSessions.add(sessionId);
|
regularUserSessions.add(sessionId);
|
||||||
} else {
|
}
|
||||||
|
}
|
||||||
|
if (!limitAllowed) {
|
||||||
log.info("[{}][{}][{}] Failed to start session. Max regular user sessions limit reached"
|
log.info("[{}][{}][{}] Failed to start session. Max regular user sessions limit reached"
|
||||||
, sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId);
|
, sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId);
|
||||||
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max regular user sessions limit reached"));
|
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max regular user sessions limit reached"));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
if (tenantProfileConfiguration.getMaxWsSessionsPerPublicUser() > 0
|
if (tenantProfileConfiguration.getMaxWsSessionsPerPublicUser() > 0
|
||||||
&& UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
|
&& UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
|
||||||
Set<String> publicUserSessions = publicUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
|
Set<String> publicUserSessions = publicUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
|
||||||
synchronized (publicUserSessions) {
|
synchronized (publicUserSessions) {
|
||||||
if (publicUserSessions.size() < tenantProfileConfiguration.getMaxWsSessionsPerPublicUser()) {
|
limitAllowed = publicUserSessions.size() < tenantProfileConfiguration.getMaxWsSessionsPerPublicUser();
|
||||||
|
if (limitAllowed) {
|
||||||
publicUserSessions.add(sessionId);
|
publicUserSessions.add(sessionId);
|
||||||
} else {
|
}
|
||||||
|
}
|
||||||
|
if (!limitAllowed) {
|
||||||
log.info("[{}][{}][{}] Failed to start session. Max public user sessions limit reached"
|
log.info("[{}][{}][{}] Failed to start session. Max public user sessions limit reached"
|
||||||
, sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId);
|
, sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId);
|
||||||
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max public user sessions limit reached"));
|
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max public user sessions limit reached"));
|
||||||
@ -450,7 +467,6 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -250,7 +250,8 @@ cassandra:
|
|||||||
concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}"
|
concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}"
|
||||||
permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}"
|
permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}"
|
||||||
dispatcher_threads: "${CASSANDRA_QUERY_DISPATCHER_THREADS:2}"
|
dispatcher_threads: "${CASSANDRA_QUERY_DISPATCHER_THREADS:2}"
|
||||||
callback_threads: "${CASSANDRA_QUERY_CALLBACK_THREADS:4}"
|
callback_threads: "${CASSANDRA_QUERY_CALLBACK_THREADS:4}" # Buffered rate executor (read, write), for managing I/O rate. See "nosql-*-callback" threads in JMX
|
||||||
|
result_processing_threads: "${CASSANDRA_QUERY_RESULT_PROCESSING_THREADS:50}" # Result set transformer and processing. See "cassandra-callback" threads in JMX
|
||||||
poll_ms: "${CASSANDRA_QUERY_POLL_MS:50}"
|
poll_ms: "${CASSANDRA_QUERY_POLL_MS:50}"
|
||||||
rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:10000}"
|
rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:10000}"
|
||||||
# set all data types values except target to null for the same ts on save
|
# set all data types values except target to null for the same ts on save
|
||||||
|
|||||||
@ -0,0 +1,160 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2023 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.server.controller.plugin;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.awaitility.Awaitility;
|
||||||
|
import org.junit.jupiter.api.AfterEach;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.springframework.web.socket.CloseStatus;
|
||||||
|
import org.springframework.web.socket.adapter.NativeWebSocketSession;
|
||||||
|
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||||
|
import org.thingsboard.server.service.ws.WebSocketSessionRef;
|
||||||
|
|
||||||
|
import javax.websocket.RemoteEndpoint;
|
||||||
|
import javax.websocket.SendHandler;
|
||||||
|
import javax.websocket.SendResult;
|
||||||
|
import javax.websocket.Session;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyString;
|
||||||
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
|
import static org.mockito.BDDMockito.willAnswer;
|
||||||
|
import static org.mockito.BDDMockito.willDoNothing;
|
||||||
|
import static org.mockito.BDDMockito.willReturn;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.never;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
class TbWebSocketHandlerTest {
|
||||||
|
|
||||||
|
TbWebSocketHandler wsHandler;
|
||||||
|
NativeWebSocketSession session;
|
||||||
|
Session nativeSession;
|
||||||
|
RemoteEndpoint.Async asyncRemote;
|
||||||
|
WebSocketSessionRef sessionRef;
|
||||||
|
int maxMsgQueuePerSession;
|
||||||
|
TbWebSocketHandler.SessionMetaData sendHandler;
|
||||||
|
ExecutorService executor;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void setUp() throws IOException {
|
||||||
|
maxMsgQueuePerSession = 100;
|
||||||
|
executor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName(getClass().getSimpleName()));
|
||||||
|
wsHandler = spy(new TbWebSocketHandler());
|
||||||
|
willDoNothing().given(wsHandler).close(any(), any());
|
||||||
|
session = mock(NativeWebSocketSession.class);
|
||||||
|
nativeSession = mock(Session.class);
|
||||||
|
willReturn(nativeSession).given(session).getNativeSession(Session.class);
|
||||||
|
asyncRemote = mock(RemoteEndpoint.Async.class);
|
||||||
|
willReturn(asyncRemote).given(nativeSession).getAsyncRemote();
|
||||||
|
sessionRef = mock(WebSocketSessionRef.class, Mockito.RETURNS_DEEP_STUBS); //prevent NPE on logs
|
||||||
|
sendHandler = spy(wsHandler.new SessionMetaData(session, sessionRef, maxMsgQueuePerSession));
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterEach
|
||||||
|
void tearDown() {
|
||||||
|
if (executor != null) {
|
||||||
|
executor.shutdownNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void sendHandler_sendMsg_parallel_no_race() throws InterruptedException {
|
||||||
|
CountDownLatch finishLatch = new CountDownLatch(maxMsgQueuePerSession * 2);
|
||||||
|
AtomicInteger sendersCount = new AtomicInteger();
|
||||||
|
willAnswer(invocation -> {
|
||||||
|
assertThat(sendersCount.incrementAndGet()).as("no race").isEqualTo(1);
|
||||||
|
String text = invocation.getArgument(0);
|
||||||
|
SendHandler onResultHandler = invocation.getArgument(1);
|
||||||
|
SendResult sendResult = new SendResult();
|
||||||
|
executor.submit(() -> {
|
||||||
|
sendersCount.decrementAndGet();
|
||||||
|
onResultHandler.onResult(sendResult);
|
||||||
|
finishLatch.countDown();
|
||||||
|
});
|
||||||
|
return null;
|
||||||
|
}).given(asyncRemote).sendText(anyString(), any());
|
||||||
|
|
||||||
|
assertThat(sendHandler.isSending.get()).as("sendHandler not is in sending state").isFalse();
|
||||||
|
//first batch
|
||||||
|
IntStream.range(0, maxMsgQueuePerSession).parallel().forEach(i -> sendHandler.sendMsg("hello " + i));
|
||||||
|
Awaitility.await("first batch processed").atMost(30, TimeUnit.SECONDS).until(() -> finishLatch.getCount() == maxMsgQueuePerSession);
|
||||||
|
assertThat(sendHandler.isSending.get()).as("sendHandler not is in sending state").isFalse();
|
||||||
|
//second batch - to test pause between big msg batches
|
||||||
|
IntStream.range(100, 100 + maxMsgQueuePerSession).parallel().forEach(i -> sendHandler.sendMsg("hello " + i));
|
||||||
|
assertThat(finishLatch.await(30, TimeUnit.SECONDS)).as("all callbacks fired").isTrue();
|
||||||
|
|
||||||
|
verify(sendHandler, never()).closeSession(any());
|
||||||
|
verify(sendHandler, times(maxMsgQueuePerSession * 2)).onResult(any());
|
||||||
|
assertThat(sendHandler.isSending.get()).as("sendHandler not is in sending state").isFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void sendHandler_sendMsg_message_order() throws InterruptedException {
|
||||||
|
CountDownLatch finishLatch = new CountDownLatch(maxMsgQueuePerSession);
|
||||||
|
Collection<String> outputs = new ConcurrentLinkedQueue<>();
|
||||||
|
willAnswer(invocation -> {
|
||||||
|
String text = invocation.getArgument(0);
|
||||||
|
outputs.add(text);
|
||||||
|
SendHandler onResultHandler = invocation.getArgument(1);
|
||||||
|
SendResult sendResult = new SendResult();
|
||||||
|
executor.submit(() -> {
|
||||||
|
onResultHandler.onResult(sendResult);
|
||||||
|
finishLatch.countDown();
|
||||||
|
});
|
||||||
|
return null;
|
||||||
|
}).given(asyncRemote).sendText(anyString(), any());
|
||||||
|
|
||||||
|
List<String> inputs = IntStream.range(0, maxMsgQueuePerSession).mapToObj(i -> "msg " + i).collect(Collectors.toList());
|
||||||
|
inputs.forEach(s -> sendHandler.sendMsg(s));
|
||||||
|
|
||||||
|
assertThat(finishLatch.await(30, TimeUnit.SECONDS)).as("all callbacks fired").isTrue();
|
||||||
|
assertThat(outputs).as("inputs exactly the same as outputs").containsExactlyElementsOf(inputs);
|
||||||
|
|
||||||
|
verify(sendHandler, never()).closeSession(any());
|
||||||
|
verify(sendHandler, times(maxMsgQueuePerSession)).onResult(any());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void sendHandler_sendMsg_queue_size_exceed() {
|
||||||
|
willDoNothing().given(asyncRemote).sendText(anyString(), any()); // send text will never call back, so queue will grow each sendMsg
|
||||||
|
sendHandler.sendMsg("first message to stay in-flight all the time during this test");
|
||||||
|
IntStream.range(0, maxMsgQueuePerSession).parallel().forEach(i -> sendHandler.sendMsg("hello " + i));
|
||||||
|
verify(sendHandler, never()).closeSession(any());
|
||||||
|
sendHandler.sendMsg("excessive message");
|
||||||
|
verify(sendHandler, times(1)).closeSession(eq(new CloseStatus(1008, "Max pending updates limit reached!")));
|
||||||
|
verify(asyncRemote, times(1)).sendText(anyString(), any());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@ -52,7 +52,7 @@ import java.util.stream.Collectors;
|
|||||||
public class LwM2MModelConfigServiceImpl implements LwM2MModelConfigService {
|
public class LwM2MModelConfigServiceImpl implements LwM2MModelConfigService {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private TbLwM2MModelConfigStore modelStore;
|
TbLwM2MModelConfigStore modelStore;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
@Lazy
|
@Lazy
|
||||||
@ -67,14 +67,14 @@ public class LwM2MModelConfigServiceImpl implements LwM2MModelConfigService {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private LwM2MTelemetryLogService logService;
|
private LwM2MTelemetryLogService logService;
|
||||||
|
|
||||||
private ConcurrentMap<String, LwM2MModelConfig> currentModelConfigs;
|
ConcurrentMap<String, LwM2MModelConfig> currentModelConfigs;
|
||||||
|
|
||||||
@AfterStartUp(order = AfterStartUp.BEFORE_TRANSPORT_SERVICE)
|
@AfterStartUp(order = AfterStartUp.BEFORE_TRANSPORT_SERVICE)
|
||||||
private void init() {
|
public void init() {
|
||||||
List<LwM2MModelConfig> models = modelStore.getAll();
|
List<LwM2MModelConfig> models = modelStore.getAll();
|
||||||
log.debug("Fetched model configs: {}", models);
|
log.debug("Fetched model configs: {}", models);
|
||||||
currentModelConfigs = models.stream()
|
currentModelConfigs = models.stream()
|
||||||
.collect(Collectors.toConcurrentMap(LwM2MModelConfig::getEndpoint, m -> m));
|
.collect(Collectors.toConcurrentMap(LwM2MModelConfig::getEndpoint, m -> m, (existing, replacement) -> existing));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -0,0 +1,73 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2023 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.server.transport.lwm2m.server.model;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MModelConfigStore;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.mockito.BDDMockito.willReturn;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
|
class LwM2MModelConfigServiceImplTest {
|
||||||
|
|
||||||
|
LwM2MModelConfigServiceImpl service;
|
||||||
|
TbLwM2MModelConfigStore modelStore;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void setUp() {
|
||||||
|
service = new LwM2MModelConfigServiceImpl();
|
||||||
|
modelStore = mock(TbLwM2MModelConfigStore.class);
|
||||||
|
service.modelStore = modelStore;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testInitWithDuplicatedModels() {
|
||||||
|
LwM2MModelConfig config = new LwM2MModelConfig("urn:imei:951358811362976");
|
||||||
|
List<LwM2MModelConfig> models = List.of(config, config);
|
||||||
|
willReturn(models).given(modelStore).getAll();
|
||||||
|
service.init();
|
||||||
|
assertThat(service.currentModelConfigs).containsExactlyEntriesOf(Map.of(config.getEndpoint(), config));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testInitWithNonUniqueEndpoints() {
|
||||||
|
LwM2MModelConfig configAlfa = new LwM2MModelConfig("urn:imei:951358811362976");
|
||||||
|
LwM2MModelConfig configBravo = new LwM2MModelConfig("urn:imei:151358811362976");
|
||||||
|
LwM2MModelConfig configDelta = new LwM2MModelConfig("urn:imei:151358811362976");
|
||||||
|
assertThat(configBravo.getEndpoint()).as("non-unique endpoints provided").isEqualTo(configDelta.getEndpoint());
|
||||||
|
List<LwM2MModelConfig> models = List.of(configAlfa, configBravo, configDelta);
|
||||||
|
willReturn(models).given(modelStore).getAll();
|
||||||
|
service.init();
|
||||||
|
assertThat(service.currentModelConfigs).containsExactlyInAnyOrderEntriesOf(Map.of(
|
||||||
|
configAlfa.getEndpoint(), configAlfa,
|
||||||
|
configBravo.getEndpoint(), configBravo
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testInitWithEmptyModels() {
|
||||||
|
willReturn(Collections.emptyList()).given(modelStore).getAll();
|
||||||
|
service.init();
|
||||||
|
assertThat(service.currentModelConfigs).isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@ -19,13 +19,13 @@ import com.google.common.base.Function;
|
|||||||
import com.google.common.util.concurrent.AsyncFunction;
|
import com.google.common.util.concurrent.AsyncFunction;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.thingsboard.common.util.ThingsBoardExecutors;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
import javax.annotation.PreDestroy;
|
import javax.annotation.PreDestroy;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by ashvayka on 21.02.17.
|
* Created by ashvayka on 21.02.17.
|
||||||
@ -34,9 +34,12 @@ public abstract class CassandraAbstractAsyncDao extends CassandraAbstractDao {
|
|||||||
|
|
||||||
protected ExecutorService readResultsProcessingExecutor;
|
protected ExecutorService readResultsProcessingExecutor;
|
||||||
|
|
||||||
|
@Value("${cassandra.query.result_processing_threads:50}")
|
||||||
|
private int threadPoolSize;
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void startExecutor() {
|
public void startExecutor() {
|
||||||
readResultsProcessingExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("cassandra-callback"));
|
readResultsProcessingExecutor = ThingsBoardExecutors.newWorkStealingPool(threadPoolSize, "cassandra-callback");
|
||||||
}
|
}
|
||||||
|
|
||||||
@PreDestroy
|
@PreDestroy
|
||||||
|
|||||||
@ -121,7 +121,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
|
|||||||
perTenantLimitReached = true;
|
perTenantLimitReached = true;
|
||||||
}
|
}
|
||||||
} else if (tenantId == null) {
|
} else if (tenantId == null) {
|
||||||
log.info("Invalid task received: {}", task);
|
log.info("[{}] Invalid task received: {}", getBufferName(), task);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!perTenantLimitReached) {
|
if (!perTenantLimitReached) {
|
||||||
@ -157,7 +157,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
|
|||||||
public abstract String getBufferName();
|
public abstract String getBufferName();
|
||||||
|
|
||||||
private void dispatch() {
|
private void dispatch() {
|
||||||
log.info("Buffered rate executor thread started");
|
log.info("[{}] Buffered rate executor thread started", getBufferName());
|
||||||
while (!Thread.interrupted()) {
|
while (!Thread.interrupted()) {
|
||||||
int curLvl = concurrencyLevel.get();
|
int curLvl = concurrencyLevel.get();
|
||||||
AsyncTaskContext<T, V> taskCtx = null;
|
AsyncTaskContext<T, V> taskCtx = null;
|
||||||
@ -169,7 +169,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
|
|||||||
if (printQueriesIdx.incrementAndGet() >= printQueriesFreq) {
|
if (printQueriesIdx.incrementAndGet() >= printQueriesFreq) {
|
||||||
printQueriesIdx.set(0);
|
printQueriesIdx.set(0);
|
||||||
String query = queryToString(finalTaskCtx);
|
String query = queryToString(finalTaskCtx);
|
||||||
log.info("[{}] Cassandra query: {}", taskCtx.getId(), query);
|
log.info("[{}][{}] Cassandra query: {}", getBufferName(), taskCtx.getId(), query);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
logTask("Processing", finalTaskCtx);
|
logTask("Processing", finalTaskCtx);
|
||||||
@ -222,7 +222,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.info("Buffered rate executor thread stopped");
|
log.info("[{}] Buffered rate executor thread stopped", getBufferName());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void logTask(String action, AsyncTaskContext<T, V> taskCtx) {
|
private void logTask(String action, AsyncTaskContext<T, V> taskCtx) {
|
||||||
@ -298,7 +298,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
|
|||||||
statsBuilder.append(CONCURRENCY_LEVEL).append(" = [").append(concurrencyLevel.get()).append("] ");
|
statsBuilder.append(CONCURRENCY_LEVEL).append(" = [").append(concurrencyLevel.get()).append("] ");
|
||||||
|
|
||||||
stats.getStatsCounters().forEach(StatsCounter::clear);
|
stats.getStatsCounters().forEach(StatsCounter::clear);
|
||||||
log.info("Permits {}", statsBuilder);
|
log.info("[{}] Permits {}", getBufferName(), statsBuilder);
|
||||||
}
|
}
|
||||||
|
|
||||||
stats.getRateLimitedTenants().entrySet().stream()
|
stats.getRateLimitedTenants().entrySet().stream()
|
||||||
@ -314,13 +314,13 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
|
|||||||
try {
|
try {
|
||||||
return entityService.fetchEntityName(TenantId.SYS_TENANT_ID, tenantId).orElse(defaultName);
|
return entityService.fetchEntityName(TenantId.SYS_TENANT_ID, tenantId).orElse(defaultName);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[{}] Failed to get tenant name", tenantId, e);
|
log.error("[{}][{}] Failed to get tenant name", getBufferName(), tenantId, e);
|
||||||
return defaultName;
|
return defaultName;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
log.info("[{}][{}] Rate limited requests: {}", tenantId, name, rateLimitedRequests);
|
log.info("[{}][{}][{}] Rate limited requests: {}", getBufferName(), tenantId, name, rateLimitedRequests);
|
||||||
} else {
|
} else {
|
||||||
log.info("[{}] Rate limited requests: {}", tenantId, rateLimitedRequests);
|
log.info("[{}][{}] Rate limited requests: {}", getBufferName(), tenantId, rateLimitedRequests);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user