From 3f97bb682a04c25a64a221b07fc6f4a048a99ede Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 24 Aug 2021 19:06:22 +0300 Subject: [PATCH] executors: names added, shutdownNow for some executors to prevent memory leaks during lifecycle (mostly affects the test runner JVM) --- .../DefaultTbApiUsageStateService.java | 3 +- .../edge/DefaultEdgeNotificationService.java | 3 +- .../transport/lwm2m/client/FwLwM2MDevice.java | 3 +- .../transport/lwm2m/client/SwLwM2MDevice.java | 3 +- .../util/EventDeduplicationExecutorTest.java | 30 ++++++++++++---- .../server/actors/ActorSystemTest.java | 36 +++++++++++++------ .../coapserver/DefaultCoapServerService.java | 3 +- .../snmp/service/SnmpTransportService.java | 13 ++++++- .../msa/connectivity/MqttClientTest.java | 3 +- .../connectivity/MqttGatewayClientTest.java | 3 +- 10 files changed, 76 insertions(+), 24 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java index 47f698baff..bb768a15d7 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java @@ -23,6 +23,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.server.common.data.ApiFeature; @@ -486,7 +487,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener tenantIterator = new PageDataIterable<>(tenantService::findTenants, 1024); List> futures = new ArrayList<>(); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java index cb24a57fb2..d25e63183c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.JsonNode; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; @@ -79,7 +80,7 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { @PostConstruct public void initExecutor() { - tsCallBackExecutor = Executors.newSingleThreadExecutor(); + tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edge-notifications")); } @PreDestroy diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/FwLwM2MDevice.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/FwLwM2MDevice.java index 2ae1c7d572..a6c2f43c57 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/FwLwM2MDevice.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/FwLwM2MDevice.java @@ -23,6 +23,7 @@ import org.eclipse.leshan.core.node.LwM2mResource; import org.eclipse.leshan.core.response.ExecuteResponse; import org.eclipse.leshan.core.response.ReadResponse; import org.eclipse.leshan.core.response.WriteResponse; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import javax.security.auth.Destroyable; import java.util.Arrays; @@ -37,7 +38,7 @@ public class FwLwM2MDevice extends BaseInstanceEnabler implements Destroyable { private static final List supportedResources = Arrays.asList(0, 1, 2, 3, 5, 6, 7, 9); - private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName())); private final AtomicInteger state = new AtomicInteger(0); diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/SwLwM2MDevice.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/SwLwM2MDevice.java index 8d8f07d542..4e97adf005 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/SwLwM2MDevice.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/SwLwM2MDevice.java @@ -24,6 +24,7 @@ import org.eclipse.leshan.core.node.LwM2mResource; import org.eclipse.leshan.core.response.ExecuteResponse; import org.eclipse.leshan.core.response.ReadResponse; import org.eclipse.leshan.core.response.WriteResponse; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import javax.security.auth.Destroyable; import java.util.Arrays; @@ -38,7 +39,7 @@ public class SwLwM2MDevice extends BaseInstanceEnabler implements Destroyable { private static final List supportedResources = Arrays.asList(0, 1, 2, 3, 4, 6, 7, 9); - private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName())); private final AtomicInteger state = new AtomicInteger(0); diff --git a/application/src/test/java/org/thingsboard/server/util/EventDeduplicationExecutorTest.java b/application/src/test/java/org/thingsboard/server/util/EventDeduplicationExecutorTest.java index ac0b2342ec..bb5541089e 100644 --- a/application/src/test/java/org/thingsboard/server/util/EventDeduplicationExecutorTest.java +++ b/application/src/test/java/org/thingsboard/server/util/EventDeduplicationExecutorTest.java @@ -17,10 +17,12 @@ package org.thingsboard.server.util; import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; +import org.junit.After; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.utils.EventDeduplicationExecutor; import java.util.concurrent.ExecutorService; @@ -31,6 +33,16 @@ import java.util.function.Consumer; @RunWith(MockitoJUnitRunner.class) public class EventDeduplicationExecutorTest { + ThingsBoardThreadFactory threadFactory = ThingsBoardThreadFactory.forName(getClass().getSimpleName()); + ExecutorService executor; + + @After + public void tearDown() throws Exception { + if (executor != null) { + executor.shutdownNow(); + } + } + @Test public void testSimpleFlowSameThread() throws InterruptedException { simpleFlow(MoreExecutors.newDirectExecutorService()); @@ -48,32 +60,38 @@ public class EventDeduplicationExecutorTest { @Test public void testSimpleFlowSingleThread() throws InterruptedException { - simpleFlow(Executors.newSingleThreadExecutor()); + executor = Executors.newSingleThreadExecutor(threadFactory); + simpleFlow(executor); } @Test public void testPeriodicFlowSingleThread() throws InterruptedException { - periodicFlow(Executors.newSingleThreadExecutor()); + executor = Executors.newSingleThreadExecutor(threadFactory); + periodicFlow(executor); } @Test public void testExceptionFlowSingleThread() throws InterruptedException { - exceptionFlow(Executors.newSingleThreadExecutor()); + executor = Executors.newSingleThreadExecutor(threadFactory); + exceptionFlow(executor); } @Test public void testSimpleFlowMultiThread() throws InterruptedException { - simpleFlow(Executors.newFixedThreadPool(3)); + executor = Executors.newFixedThreadPool(3, threadFactory); + simpleFlow(executor); } @Test public void testPeriodicFlowMultiThread() throws InterruptedException { - periodicFlow(Executors.newFixedThreadPool(3)); + executor = Executors.newFixedThreadPool(3, threadFactory); + periodicFlow(executor); } @Test public void testExceptionFlowMultiThread() throws InterruptedException { - exceptionFlow(Executors.newFixedThreadPool(3)); + executor = Executors.newFixedThreadPool(3, threadFactory); + exceptionFlow(executor); } private void simpleFlow(ExecutorService executorService) throws InterruptedException { diff --git a/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java b/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java index 9c004c6d13..f11f94dfb1 100644 --- a/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java +++ b/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java @@ -22,6 +22,8 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; +import org.thingsboard.common.util.ThingsBoardExecutors; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.id.DeviceId; import java.util.ArrayList; @@ -45,6 +47,7 @@ public class ActorSystemTest { private volatile TbActorSystem actorSystem; private volatile ExecutorService submitPool; + private ExecutorService executor; private int parallelism; @Before @@ -60,47 +63,57 @@ public class ActorSystemTest { public void shutdownActorSystem() { actorSystem.stop(); submitPool.shutdownNow(); + if (executor != null) { + executor.shutdownNow(); + } } @Test public void test1actorsAnd100KMessages() throws InterruptedException { - actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass()); + actorSystem.createDispatcher(ROOT_DISPATCHER, executor); testActorsAndMessages(1, _100K, 1); } @Test public void test10actorsAnd100KMessages() throws InterruptedException { - actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass()); + actorSystem.createDispatcher(ROOT_DISPATCHER, executor); testActorsAndMessages(10, _100K, 1); } @Test public void test100KActorsAnd1Messages5timesSingleThread() throws InterruptedException { - actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newSingleThreadExecutor()); + executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName())); + actorSystem.createDispatcher(ROOT_DISPATCHER, executor); testActorsAndMessages(_100K, 1, 5); } @Test public void test100KActorsAnd1Messages5times() throws InterruptedException { - actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass()); + actorSystem.createDispatcher(ROOT_DISPATCHER, executor); testActorsAndMessages(_100K, 1, 5); } @Test public void test100KActorsAnd10Messages() throws InterruptedException { - actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass()); + actorSystem.createDispatcher(ROOT_DISPATCHER, executor); testActorsAndMessages(_100K, 10, 1); } @Test public void test1KActorsAnd1KMessages() throws InterruptedException { - actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass()); + actorSystem.createDispatcher(ROOT_DISPATCHER, executor); testActorsAndMessages(1000, 1000, 10); } @Test public void testNoMessagesAfterDestroy() throws InterruptedException { - actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass()); + actorSystem.createDispatcher(ROOT_DISPATCHER, executor); ActorTestCtx testCtx1 = getActorTestCtx(1); ActorTestCtx testCtx2 = getActorTestCtx(1); @@ -119,7 +132,8 @@ public class ActorSystemTest { @Test public void testOneActorCreated() throws InterruptedException { - actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass()); + actorSystem.createDispatcher(ROOT_DISPATCHER, executor); ActorTestCtx testCtx1 = getActorTestCtx(1); ActorTestCtx testCtx2 = getActorTestCtx(1); TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID())); @@ -145,7 +159,8 @@ public class ActorSystemTest { @Test public void testActorCreatorCalledOnce() throws InterruptedException { - actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass()); + actorSystem.createDispatcher(ROOT_DISPATCHER, executor); ActorTestCtx testCtx = getActorTestCtx(1); TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID())); final int actorsCount = 1000; @@ -169,7 +184,8 @@ public class ActorSystemTest { @Test public void testFailedInit() throws InterruptedException { - actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass()); + actorSystem.createDispatcher(ROOT_DISPATCHER, executor); ActorTestCtx testCtx1 = getActorTestCtx(1); ActorTestCtx testCtx2 = getActorTestCtx(1); diff --git a/common/coap-server/src/main/java/org/thingsboard/server/coapserver/DefaultCoapServerService.java b/common/coap-server/src/main/java/org/thingsboard/server/coapserver/DefaultCoapServerService.java index 7c869b5039..c803dee67e 100644 --- a/common/coap-server/src/main/java/org/thingsboard/server/coapserver/DefaultCoapServerService.java +++ b/common/coap-server/src/main/java/org/thingsboard/server/coapserver/DefaultCoapServerService.java @@ -24,6 +24,7 @@ import org.eclipse.californium.scandium.DTLSConnector; import org.eclipse.californium.scandium.config.DtlsConnectorConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -118,7 +119,7 @@ public class DefaultCoapServerService implements CoapServerService { CoapEndpoint dtlsCoapEndpoint = dtlsCoapEndpointBuilder.build(); server.addEndpoint(dtlsCoapEndpoint); tbDtlsCertificateVerifier = (TbCoapDtlsCertificateVerifier) dtlsConnectorConfig.getAdvancedCertificateVerifier(); - dtlsSessionsExecutor = Executors.newSingleThreadScheduledExecutor(); + dtlsSessionsExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName())); dtlsSessionsExecutor.scheduleAtFixedRate(this::evictTimeoutSessions, new Random().nextInt((int) getDtlsSessionReportTimeout()), getDtlsSessionReportTimeout(), TimeUnit.MILLISECONDS); } Resource root = server.getRoot(); diff --git a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java index d0cee4a329..2cce0a5272 100644 --- a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java +++ b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java @@ -34,6 +34,7 @@ import org.snmp4j.transport.DefaultTcpTransportMapping; import org.snmp4j.transport.DefaultUdpTransportMapping; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.TbTransportService; @@ -90,7 +91,7 @@ public class SnmpTransportService implements TbTransportService { @PostConstruct private void init() throws IOException { queryingExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("snmp-querying")); - responseProcessingExecutor = Executors.newWorkStealingPool(responseProcessingParallelismLevel); + responseProcessingExecutor = ThingsBoardExecutors.newWorkStealingPool(responseProcessingParallelismLevel, "snmp-response-processing"); initializeSnmp(); configureResponseDataMappers(); @@ -99,6 +100,16 @@ public class SnmpTransportService implements TbTransportService { log.info("SNMP transport service initialized"); } + @PreDestroy + public void stop() { + if (queryingExecutor != null) { + queryingExecutor.shutdownNow(); + } + if (responseProcessingExecutor != null) { + responseProcessingExecutor.shutdownNow(); + } + } + private void initializeSnmp() throws IOException { TransportMapping transportMapping; switch (snmpUnderlyingProtocol) { diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index 7ae8f863af..9df3d9a7f9 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -263,7 +263,7 @@ public class MqttClientTest extends AbstractContainerTest { JsonObject serverRpcPayload = new JsonObject(); serverRpcPayload.addProperty("method", "getValue"); serverRpcPayload.addProperty("params", true); - ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName()))); ListenableFuture future = service.submit(() -> { try { return restClient.getRestTemplate() @@ -287,6 +287,7 @@ public class MqttClientTest extends AbstractContainerTest { mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes())).get(); ResponseEntity serverResponse = future.get(5, TimeUnit.SECONDS); + service.shutdownNow(); Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful()); Assert.assertEquals(clientResponse.toString(), serverResponse.getBody()); diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java index def365dcb1..8d419a1a97 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java @@ -259,7 +259,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { JsonObject serverRpcPayload = new JsonObject(); serverRpcPayload.addProperty("method", "getValue"); serverRpcPayload.addProperty("params", true); - ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName()))); ListenableFuture future = service.submit(() -> { try { return restClient.getRestTemplate() @@ -273,6 +273,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { // Wait for RPC call from the server and send the response MqttEvent requestFromServer = listener.getEvents().poll(10, TimeUnit.SECONDS); + service.shutdownNow(); Assert.assertNotNull(requestFromServer); Assert.assertNotNull(requestFromServer.getMessage());