executors: names added, shutdownNow for some executors to prevent memory leaks during lifecycle (mostly affects the test runner JVM)

This commit is contained in:
Sergey Matvienko 2021-08-24 19:06:22 +03:00
parent 193c0ce64e
commit 3f97bb682a
10 changed files with 76 additions and 24 deletions

View File

@ -23,6 +23,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.server.common.data.ApiFeature;
@ -486,7 +487,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
log.info("Initializing tenant states.");
updateLock.lock();
try {
ExecutorService tmpInitExecutor = Executors.newWorkStealingPool(20);
ExecutorService tmpInitExecutor = ThingsBoardExecutors.newWorkStealingPool(20, "init-tenant-states-from-db");
try {
PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, 1024);
List<Future<?>> futures = new ArrayList<>();

View File

@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
@ -79,7 +80,7 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
@PostConstruct
public void initExecutor() {
tsCallBackExecutor = Executors.newSingleThreadExecutor();
tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edge-notifications"));
}
@PreDestroy

View File

@ -23,6 +23,7 @@ import org.eclipse.leshan.core.node.LwM2mResource;
import org.eclipse.leshan.core.response.ExecuteResponse;
import org.eclipse.leshan.core.response.ReadResponse;
import org.eclipse.leshan.core.response.WriteResponse;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import javax.security.auth.Destroyable;
import java.util.Arrays;
@ -37,7 +38,7 @@ public class FwLwM2MDevice extends BaseInstanceEnabler implements Destroyable {
private static final List<Integer> supportedResources = Arrays.asList(0, 1, 2, 3, 5, 6, 7, 9);
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName()));
private final AtomicInteger state = new AtomicInteger(0);

View File

@ -24,6 +24,7 @@ import org.eclipse.leshan.core.node.LwM2mResource;
import org.eclipse.leshan.core.response.ExecuteResponse;
import org.eclipse.leshan.core.response.ReadResponse;
import org.eclipse.leshan.core.response.WriteResponse;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import javax.security.auth.Destroyable;
import java.util.Arrays;
@ -38,7 +39,7 @@ public class SwLwM2MDevice extends BaseInstanceEnabler implements Destroyable {
private static final List<Integer> supportedResources = Arrays.asList(0, 1, 2, 3, 4, 6, 7, 9);
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName()));
private final AtomicInteger state = new AtomicInteger(0);

View File

@ -17,10 +17,12 @@ package org.thingsboard.server.util;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.utils.EventDeduplicationExecutor;
import java.util.concurrent.ExecutorService;
@ -31,6 +33,16 @@ import java.util.function.Consumer;
@RunWith(MockitoJUnitRunner.class)
public class EventDeduplicationExecutorTest {
ThingsBoardThreadFactory threadFactory = ThingsBoardThreadFactory.forName(getClass().getSimpleName());
ExecutorService executor;
@After
public void tearDown() throws Exception {
if (executor != null) {
executor.shutdownNow();
}
}
@Test
public void testSimpleFlowSameThread() throws InterruptedException {
simpleFlow(MoreExecutors.newDirectExecutorService());
@ -48,32 +60,38 @@ public class EventDeduplicationExecutorTest {
@Test
public void testSimpleFlowSingleThread() throws InterruptedException {
simpleFlow(Executors.newSingleThreadExecutor());
executor = Executors.newSingleThreadExecutor(threadFactory);
simpleFlow(executor);
}
@Test
public void testPeriodicFlowSingleThread() throws InterruptedException {
periodicFlow(Executors.newSingleThreadExecutor());
executor = Executors.newSingleThreadExecutor(threadFactory);
periodicFlow(executor);
}
@Test
public void testExceptionFlowSingleThread() throws InterruptedException {
exceptionFlow(Executors.newSingleThreadExecutor());
executor = Executors.newSingleThreadExecutor(threadFactory);
exceptionFlow(executor);
}
@Test
public void testSimpleFlowMultiThread() throws InterruptedException {
simpleFlow(Executors.newFixedThreadPool(3));
executor = Executors.newFixedThreadPool(3, threadFactory);
simpleFlow(executor);
}
@Test
public void testPeriodicFlowMultiThread() throws InterruptedException {
periodicFlow(Executors.newFixedThreadPool(3));
executor = Executors.newFixedThreadPool(3, threadFactory);
periodicFlow(executor);
}
@Test
public void testExceptionFlowMultiThread() throws InterruptedException {
exceptionFlow(Executors.newFixedThreadPool(3));
executor = Executors.newFixedThreadPool(3, threadFactory);
exceptionFlow(executor);
}
private void simpleFlow(ExecutorService executorService) throws InterruptedException {

View File

@ -22,6 +22,8 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.id.DeviceId;
import java.util.ArrayList;
@ -45,6 +47,7 @@ public class ActorSystemTest {
private volatile TbActorSystem actorSystem;
private volatile ExecutorService submitPool;
private ExecutorService executor;
private int parallelism;
@Before
@ -60,47 +63,57 @@ public class ActorSystemTest {
public void shutdownActorSystem() {
actorSystem.stop();
submitPool.shutdownNow();
if (executor != null) {
executor.shutdownNow();
}
}
@Test
public void test1actorsAnd100KMessages() throws InterruptedException {
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass());
actorSystem.createDispatcher(ROOT_DISPATCHER, executor);
testActorsAndMessages(1, _100K, 1);
}
@Test
public void test10actorsAnd100KMessages() throws InterruptedException {
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass());
actorSystem.createDispatcher(ROOT_DISPATCHER, executor);
testActorsAndMessages(10, _100K, 1);
}
@Test
public void test100KActorsAnd1Messages5timesSingleThread() throws InterruptedException {
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newSingleThreadExecutor());
executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName()));
actorSystem.createDispatcher(ROOT_DISPATCHER, executor);
testActorsAndMessages(_100K, 1, 5);
}
@Test
public void test100KActorsAnd1Messages5times() throws InterruptedException {
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass());
actorSystem.createDispatcher(ROOT_DISPATCHER, executor);
testActorsAndMessages(_100K, 1, 5);
}
@Test
public void test100KActorsAnd10Messages() throws InterruptedException {
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass());
actorSystem.createDispatcher(ROOT_DISPATCHER, executor);
testActorsAndMessages(_100K, 10, 1);
}
@Test
public void test1KActorsAnd1KMessages() throws InterruptedException {
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass());
actorSystem.createDispatcher(ROOT_DISPATCHER, executor);
testActorsAndMessages(1000, 1000, 10);
}
@Test
public void testNoMessagesAfterDestroy() throws InterruptedException {
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass());
actorSystem.createDispatcher(ROOT_DISPATCHER, executor);
ActorTestCtx testCtx1 = getActorTestCtx(1);
ActorTestCtx testCtx2 = getActorTestCtx(1);
@ -119,7 +132,8 @@ public class ActorSystemTest {
@Test
public void testOneActorCreated() throws InterruptedException {
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass());
actorSystem.createDispatcher(ROOT_DISPATCHER, executor);
ActorTestCtx testCtx1 = getActorTestCtx(1);
ActorTestCtx testCtx2 = getActorTestCtx(1);
TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID()));
@ -145,7 +159,8 @@ public class ActorSystemTest {
@Test
public void testActorCreatorCalledOnce() throws InterruptedException {
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass());
actorSystem.createDispatcher(ROOT_DISPATCHER, executor);
ActorTestCtx testCtx = getActorTestCtx(1);
TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID()));
final int actorsCount = 1000;
@ -169,7 +184,8 @@ public class ActorSystemTest {
@Test
public void testFailedInit() throws InterruptedException {
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass());
actorSystem.createDispatcher(ROOT_DISPATCHER, executor);
ActorTestCtx testCtx1 = getActorTestCtx(1);
ActorTestCtx testCtx2 = getActorTestCtx(1);

View File

@ -24,6 +24,7 @@ import org.eclipse.californium.scandium.DTLSConnector;
import org.eclipse.californium.scandium.config.DtlsConnectorConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@ -118,7 +119,7 @@ public class DefaultCoapServerService implements CoapServerService {
CoapEndpoint dtlsCoapEndpoint = dtlsCoapEndpointBuilder.build();
server.addEndpoint(dtlsCoapEndpoint);
tbDtlsCertificateVerifier = (TbCoapDtlsCertificateVerifier) dtlsConnectorConfig.getAdvancedCertificateVerifier();
dtlsSessionsExecutor = Executors.newSingleThreadScheduledExecutor();
dtlsSessionsExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName()));
dtlsSessionsExecutor.scheduleAtFixedRate(this::evictTimeoutSessions, new Random().nextInt((int) getDtlsSessionReportTimeout()), getDtlsSessionReportTimeout(), TimeUnit.MILLISECONDS);
}
Resource root = server.getRoot();

View File

@ -34,6 +34,7 @@ import org.snmp4j.transport.DefaultTcpTransportMapping;
import org.snmp4j.transport.DefaultUdpTransportMapping;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.TbTransportService;
@ -90,7 +91,7 @@ public class SnmpTransportService implements TbTransportService {
@PostConstruct
private void init() throws IOException {
queryingExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("snmp-querying"));
responseProcessingExecutor = Executors.newWorkStealingPool(responseProcessingParallelismLevel);
responseProcessingExecutor = ThingsBoardExecutors.newWorkStealingPool(responseProcessingParallelismLevel, "snmp-response-processing");
initializeSnmp();
configureResponseDataMappers();
@ -99,6 +100,16 @@ public class SnmpTransportService implements TbTransportService {
log.info("SNMP transport service initialized");
}
@PreDestroy
public void stop() {
if (queryingExecutor != null) {
queryingExecutor.shutdownNow();
}
if (responseProcessingExecutor != null) {
responseProcessingExecutor.shutdownNow();
}
}
private void initializeSnmp() throws IOException {
TransportMapping<?> transportMapping;
switch (snmpUnderlyingProtocol) {

View File

@ -263,7 +263,7 @@ public class MqttClientTest extends AbstractContainerTest {
JsonObject serverRpcPayload = new JsonObject();
serverRpcPayload.addProperty("method", "getValue");
serverRpcPayload.addProperty("params", true);
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName())));
ListenableFuture<ResponseEntity> future = service.submit(() -> {
try {
return restClient.getRestTemplate()
@ -287,6 +287,7 @@ public class MqttClientTest extends AbstractContainerTest {
mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes())).get();
ResponseEntity serverResponse = future.get(5, TimeUnit.SECONDS);
service.shutdownNow();
Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful());
Assert.assertEquals(clientResponse.toString(), serverResponse.getBody());

View File

@ -259,7 +259,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
JsonObject serverRpcPayload = new JsonObject();
serverRpcPayload.addProperty("method", "getValue");
serverRpcPayload.addProperty("params", true);
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName())));
ListenableFuture<ResponseEntity> future = service.submit(() -> {
try {
return restClient.getRestTemplate()
@ -273,6 +273,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
// Wait for RPC call from the server and send the response
MqttEvent requestFromServer = listener.getEvents().poll(10, TimeUnit.SECONDS);
service.shutdownNow();
Assert.assertNotNull(requestFromServer);
Assert.assertNotNull(requestFromServer.getMessage());