From 3f97bb682a04c25a64a221b07fc6f4a048a99ede Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 24 Aug 2021 19:06:22 +0300 Subject: [PATCH 1/7] executors: names added, shutdownNow for some executors to prevent memory leaks during lifecycle (mostly affects the test runner JVM) --- .../DefaultTbApiUsageStateService.java | 3 +- .../edge/DefaultEdgeNotificationService.java | 3 +- .../transport/lwm2m/client/FwLwM2MDevice.java | 3 +- .../transport/lwm2m/client/SwLwM2MDevice.java | 3 +- .../util/EventDeduplicationExecutorTest.java | 30 ++++++++++++---- .../server/actors/ActorSystemTest.java | 36 +++++++++++++------ .../coapserver/DefaultCoapServerService.java | 3 +- .../snmp/service/SnmpTransportService.java | 13 ++++++- .../msa/connectivity/MqttClientTest.java | 3 +- .../connectivity/MqttGatewayClientTest.java | 3 +- 10 files changed, 76 insertions(+), 24 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java index 47f698baff..bb768a15d7 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java @@ -23,6 +23,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.server.common.data.ApiFeature; @@ -486,7 +487,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener tenantIterator = new PageDataIterable<>(tenantService::findTenants, 1024); List> futures = new ArrayList<>(); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java index cb24a57fb2..d25e63183c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.JsonNode; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; @@ -79,7 +80,7 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { @PostConstruct public void initExecutor() { - tsCallBackExecutor = Executors.newSingleThreadExecutor(); + tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edge-notifications")); } @PreDestroy diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/FwLwM2MDevice.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/FwLwM2MDevice.java index 2ae1c7d572..a6c2f43c57 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/FwLwM2MDevice.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/FwLwM2MDevice.java @@ -23,6 +23,7 @@ import org.eclipse.leshan.core.node.LwM2mResource; import org.eclipse.leshan.core.response.ExecuteResponse; import org.eclipse.leshan.core.response.ReadResponse; import org.eclipse.leshan.core.response.WriteResponse; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import javax.security.auth.Destroyable; import java.util.Arrays; @@ -37,7 +38,7 @@ public class FwLwM2MDevice extends BaseInstanceEnabler implements Destroyable { private static final List supportedResources = Arrays.asList(0, 1, 2, 3, 5, 6, 7, 9); - private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName())); private final AtomicInteger state = new AtomicInteger(0); diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/SwLwM2MDevice.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/SwLwM2MDevice.java index 8d8f07d542..4e97adf005 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/SwLwM2MDevice.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/SwLwM2MDevice.java @@ -24,6 +24,7 @@ import org.eclipse.leshan.core.node.LwM2mResource; import org.eclipse.leshan.core.response.ExecuteResponse; import org.eclipse.leshan.core.response.ReadResponse; import org.eclipse.leshan.core.response.WriteResponse; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import javax.security.auth.Destroyable; import java.util.Arrays; @@ -38,7 +39,7 @@ public class SwLwM2MDevice extends BaseInstanceEnabler implements Destroyable { private static final List supportedResources = Arrays.asList(0, 1, 2, 3, 4, 6, 7, 9); - private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName())); private final AtomicInteger state = new AtomicInteger(0); diff --git a/application/src/test/java/org/thingsboard/server/util/EventDeduplicationExecutorTest.java b/application/src/test/java/org/thingsboard/server/util/EventDeduplicationExecutorTest.java index ac0b2342ec..bb5541089e 100644 --- a/application/src/test/java/org/thingsboard/server/util/EventDeduplicationExecutorTest.java +++ b/application/src/test/java/org/thingsboard/server/util/EventDeduplicationExecutorTest.java @@ -17,10 +17,12 @@ package org.thingsboard.server.util; import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; +import org.junit.After; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.utils.EventDeduplicationExecutor; import java.util.concurrent.ExecutorService; @@ -31,6 +33,16 @@ import java.util.function.Consumer; @RunWith(MockitoJUnitRunner.class) public class EventDeduplicationExecutorTest { + ThingsBoardThreadFactory threadFactory = ThingsBoardThreadFactory.forName(getClass().getSimpleName()); + ExecutorService executor; + + @After + public void tearDown() throws Exception { + if (executor != null) { + executor.shutdownNow(); + } + } + @Test public void testSimpleFlowSameThread() throws InterruptedException { simpleFlow(MoreExecutors.newDirectExecutorService()); @@ -48,32 +60,38 @@ public class EventDeduplicationExecutorTest { @Test public void testSimpleFlowSingleThread() throws InterruptedException { - simpleFlow(Executors.newSingleThreadExecutor()); + executor = Executors.newSingleThreadExecutor(threadFactory); + simpleFlow(executor); } @Test public void testPeriodicFlowSingleThread() throws InterruptedException { - periodicFlow(Executors.newSingleThreadExecutor()); + executor = Executors.newSingleThreadExecutor(threadFactory); + periodicFlow(executor); } @Test public void testExceptionFlowSingleThread() throws InterruptedException { - exceptionFlow(Executors.newSingleThreadExecutor()); + executor = Executors.newSingleThreadExecutor(threadFactory); + exceptionFlow(executor); } @Test public void testSimpleFlowMultiThread() throws InterruptedException { - simpleFlow(Executors.newFixedThreadPool(3)); + executor = Executors.newFixedThreadPool(3, threadFactory); + simpleFlow(executor); } @Test public void testPeriodicFlowMultiThread() throws InterruptedException { - periodicFlow(Executors.newFixedThreadPool(3)); + executor = Executors.newFixedThreadPool(3, threadFactory); + periodicFlow(executor); } @Test public void testExceptionFlowMultiThread() throws InterruptedException { - exceptionFlow(Executors.newFixedThreadPool(3)); + executor = Executors.newFixedThreadPool(3, threadFactory); + exceptionFlow(executor); } private void simpleFlow(ExecutorService executorService) throws InterruptedException { diff --git a/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java b/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java index 9c004c6d13..f11f94dfb1 100644 --- a/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java +++ b/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java @@ -22,6 +22,8 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; +import org.thingsboard.common.util.ThingsBoardExecutors; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.id.DeviceId; import java.util.ArrayList; @@ -45,6 +47,7 @@ public class ActorSystemTest { private volatile TbActorSystem actorSystem; private volatile ExecutorService submitPool; + private ExecutorService executor; private int parallelism; @Before @@ -60,47 +63,57 @@ public class ActorSystemTest { public void shutdownActorSystem() { actorSystem.stop(); submitPool.shutdownNow(); + if (executor != null) { + executor.shutdownNow(); + } } @Test public void test1actorsAnd100KMessages() throws InterruptedException { - actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass()); + actorSystem.createDispatcher(ROOT_DISPATCHER, executor); testActorsAndMessages(1, _100K, 1); } @Test public void test10actorsAnd100KMessages() throws InterruptedException { - actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass()); + actorSystem.createDispatcher(ROOT_DISPATCHER, executor); testActorsAndMessages(10, _100K, 1); } @Test public void test100KActorsAnd1Messages5timesSingleThread() throws InterruptedException { - actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newSingleThreadExecutor()); + executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName())); + actorSystem.createDispatcher(ROOT_DISPATCHER, executor); testActorsAndMessages(_100K, 1, 5); } @Test public void test100KActorsAnd1Messages5times() throws InterruptedException { - actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass()); + actorSystem.createDispatcher(ROOT_DISPATCHER, executor); testActorsAndMessages(_100K, 1, 5); } @Test public void test100KActorsAnd10Messages() throws InterruptedException { - actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass()); + actorSystem.createDispatcher(ROOT_DISPATCHER, executor); testActorsAndMessages(_100K, 10, 1); } @Test public void test1KActorsAnd1KMessages() throws InterruptedException { - actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass()); + actorSystem.createDispatcher(ROOT_DISPATCHER, executor); testActorsAndMessages(1000, 1000, 10); } @Test public void testNoMessagesAfterDestroy() throws InterruptedException { - actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass()); + actorSystem.createDispatcher(ROOT_DISPATCHER, executor); ActorTestCtx testCtx1 = getActorTestCtx(1); ActorTestCtx testCtx2 = getActorTestCtx(1); @@ -119,7 +132,8 @@ public class ActorSystemTest { @Test public void testOneActorCreated() throws InterruptedException { - actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass()); + actorSystem.createDispatcher(ROOT_DISPATCHER, executor); ActorTestCtx testCtx1 = getActorTestCtx(1); ActorTestCtx testCtx2 = getActorTestCtx(1); TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID())); @@ -145,7 +159,8 @@ public class ActorSystemTest { @Test public void testActorCreatorCalledOnce() throws InterruptedException { - actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass()); + actorSystem.createDispatcher(ROOT_DISPATCHER, executor); ActorTestCtx testCtx = getActorTestCtx(1); TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID())); final int actorsCount = 1000; @@ -169,7 +184,8 @@ public class ActorSystemTest { @Test public void testFailedInit() throws InterruptedException { - actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass()); + actorSystem.createDispatcher(ROOT_DISPATCHER, executor); ActorTestCtx testCtx1 = getActorTestCtx(1); ActorTestCtx testCtx2 = getActorTestCtx(1); diff --git a/common/coap-server/src/main/java/org/thingsboard/server/coapserver/DefaultCoapServerService.java b/common/coap-server/src/main/java/org/thingsboard/server/coapserver/DefaultCoapServerService.java index 7c869b5039..c803dee67e 100644 --- a/common/coap-server/src/main/java/org/thingsboard/server/coapserver/DefaultCoapServerService.java +++ b/common/coap-server/src/main/java/org/thingsboard/server/coapserver/DefaultCoapServerService.java @@ -24,6 +24,7 @@ import org.eclipse.californium.scandium.DTLSConnector; import org.eclipse.californium.scandium.config.DtlsConnectorConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -118,7 +119,7 @@ public class DefaultCoapServerService implements CoapServerService { CoapEndpoint dtlsCoapEndpoint = dtlsCoapEndpointBuilder.build(); server.addEndpoint(dtlsCoapEndpoint); tbDtlsCertificateVerifier = (TbCoapDtlsCertificateVerifier) dtlsConnectorConfig.getAdvancedCertificateVerifier(); - dtlsSessionsExecutor = Executors.newSingleThreadScheduledExecutor(); + dtlsSessionsExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName())); dtlsSessionsExecutor.scheduleAtFixedRate(this::evictTimeoutSessions, new Random().nextInt((int) getDtlsSessionReportTimeout()), getDtlsSessionReportTimeout(), TimeUnit.MILLISECONDS); } Resource root = server.getRoot(); diff --git a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java index d0cee4a329..2cce0a5272 100644 --- a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java +++ b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java @@ -34,6 +34,7 @@ import org.snmp4j.transport.DefaultTcpTransportMapping; import org.snmp4j.transport.DefaultUdpTransportMapping; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.TbTransportService; @@ -90,7 +91,7 @@ public class SnmpTransportService implements TbTransportService { @PostConstruct private void init() throws IOException { queryingExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("snmp-querying")); - responseProcessingExecutor = Executors.newWorkStealingPool(responseProcessingParallelismLevel); + responseProcessingExecutor = ThingsBoardExecutors.newWorkStealingPool(responseProcessingParallelismLevel, "snmp-response-processing"); initializeSnmp(); configureResponseDataMappers(); @@ -99,6 +100,16 @@ public class SnmpTransportService implements TbTransportService { log.info("SNMP transport service initialized"); } + @PreDestroy + public void stop() { + if (queryingExecutor != null) { + queryingExecutor.shutdownNow(); + } + if (responseProcessingExecutor != null) { + responseProcessingExecutor.shutdownNow(); + } + } + private void initializeSnmp() throws IOException { TransportMapping transportMapping; switch (snmpUnderlyingProtocol) { diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index 7ae8f863af..9df3d9a7f9 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -263,7 +263,7 @@ public class MqttClientTest extends AbstractContainerTest { JsonObject serverRpcPayload = new JsonObject(); serverRpcPayload.addProperty("method", "getValue"); serverRpcPayload.addProperty("params", true); - ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName()))); ListenableFuture future = service.submit(() -> { try { return restClient.getRestTemplate() @@ -287,6 +287,7 @@ public class MqttClientTest extends AbstractContainerTest { mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes())).get(); ResponseEntity serverResponse = future.get(5, TimeUnit.SECONDS); + service.shutdownNow(); Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful()); Assert.assertEquals(clientResponse.toString(), serverResponse.getBody()); diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java index def365dcb1..8d419a1a97 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java @@ -259,7 +259,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { JsonObject serverRpcPayload = new JsonObject(); serverRpcPayload.addProperty("method", "getValue"); serverRpcPayload.addProperty("params", true); - ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName()))); ListenableFuture future = service.submit(() -> { try { return restClient.getRestTemplate() @@ -273,6 +273,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { // Wait for RPC call from the server and send the response MqttEvent requestFromServer = listener.getEvents().poll(10, TimeUnit.SECONDS); + service.shutdownNow(); Assert.assertNotNull(requestFromServer); Assert.assertNotNull(requestFromServer.getMessage()); From 6a706b32da934eac55d288aacba1e3a231f458b2 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 24 Aug 2021 19:11:37 +0300 Subject: [PATCH 2/7] executors: shutdownNow added for tb-rule-engine-consumer-repartition executor on @PreDestroy --- .../service/queue/DefaultTbRuleEngineConsumerService.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index 4b1392e14a..953d45d83d 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -103,8 +103,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< private final ConcurrentMap consumerConfigurations = new ConcurrentHashMap<>(); private final ConcurrentMap consumerStats = new ConcurrentHashMap<>(); private final ConcurrentMap topicsConsumerPerPartition = new ConcurrentHashMap<>(); - final ExecutorService submitExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-service-submit-executor")); - final ScheduledExecutorService repartitionExecutor = Executors.newScheduledThreadPool(1, ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-repartition-executor")); + final ExecutorService submitExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-submit")); + final ScheduledExecutorService repartitionExecutor = Executors.newScheduledThreadPool(1, ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-repartition")); public DefaultTbRuleEngineConsumerService(TbRuleEngineProcessingStrategyFactory processingStrategyFactory, TbRuleEngineSubmitStrategyFactory submitStrategyFactory, @@ -146,6 +146,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< public void stop() { super.destroy(); submitExecutor.shutdownNow(); + repartitionExecutor.shutdownNow(); ruleEngineSettings.getQueues().forEach(config -> consumerConfigurations.put(config.getName(), config)); } From 6f66fdb347aab1f26b264176c0b0b866f77d13fc Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 24 Aug 2021 21:20:12 +0300 Subject: [PATCH 3/7] black-box-test: fixed import ThingsBoardThreadFactory --- .../org/thingsboard/server/msa/connectivity/MqttClientTest.java | 1 + .../server/msa/connectivity/MqttGatewayClientTest.java | 1 + 2 files changed, 2 insertions(+) diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index 9df3d9a7f9..9f3279843c 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -34,6 +34,7 @@ import org.junit.runner.Description; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpMethod; import org.springframework.http.ResponseEntity; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClientConfig; import org.thingsboard.mqtt.MqttHandler; diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java index 8d419a1a97..4e7b7cd05b 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java @@ -33,6 +33,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.springframework.http.ResponseEntity; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClientConfig; import org.thingsboard.mqtt.MqttHandler; From ac2844b51f7239b9ad2f67d6b4da94756475a4f1 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 24 Aug 2021 21:54:50 +0300 Subject: [PATCH 4/7] tests: call destroy for lwm2m devices to shut down executors inside fwLwM2MDevice and swLwM2MDevice in the test scope --- .../transport/lwm2m/client/FwLwM2MDevice.java | 2 +- .../transport/lwm2m/client/LwM2MTestClient.java | 14 ++++++++++++-- .../transport/lwm2m/client/SwLwM2MDevice.java | 2 +- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/FwLwM2MDevice.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/FwLwM2MDevice.java index a6c2f43c57..cc097c869d 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/FwLwM2MDevice.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/FwLwM2MDevice.java @@ -38,7 +38,7 @@ public class FwLwM2MDevice extends BaseInstanceEnabler implements Destroyable { private static final List supportedResources = Arrays.asList(0, 1, 2, 3, 5, 6, 7, 9); - private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName())); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName() + "-test-scope")); private final AtomicInteger state = new AtomicInteger(0); diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/LwM2MTestClient.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/LwM2MTestClient.java index c4f22ff5bb..178b9f473e 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/LwM2MTestClient.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/LwM2MTestClient.java @@ -45,6 +45,7 @@ import org.eclipse.leshan.core.request.BootstrapRequest; import org.eclipse.leshan.core.request.DeregisterRequest; import org.eclipse.leshan.core.request.RegisterRequest; import org.eclipse.leshan.core.request.UpdateRequest; +import org.junit.Assert; import java.io.IOException; import java.net.InetSocketAddress; @@ -65,8 +66,11 @@ public class LwM2MTestClient { private final ScheduledExecutorService executor; private final String endpoint; private LeshanClient client; + private FwLwM2MDevice fwLwM2MDevice; + private SwLwM2MDevice swLwM2MDevice; public void init(Security security, NetworkConfig coapConfig) throws InvalidDDFFileException, IOException { + Assert.assertNull("client already initialized", client); String[] resources = new String[]{"0.xml", "1.xml", "2.xml", "3.xml", "5.xml", "9.xml"}; List models = new ArrayList<>(); for (String resourceName : resources) { @@ -77,8 +81,8 @@ public class LwM2MTestClient { initializer.setInstancesForObject(SECURITY, security); initializer.setInstancesForObject(SERVER, new Server(123, 300)); initializer.setInstancesForObject(DEVICE, new SimpleLwM2MDevice()); - initializer.setInstancesForObject(FIRMWARE, new FwLwM2MDevice()); - initializer.setInstancesForObject(SOFTWARE_MANAGEMENT, new SwLwM2MDevice()); + initializer.setInstancesForObject(FIRMWARE, fwLwM2MDevice = new FwLwM2MDevice()); + initializer.setInstancesForObject(SOFTWARE_MANAGEMENT, swLwM2MDevice = new SwLwM2MDevice()); initializer.setClassForObject(LwM2mId.ACCESS_CONTROL, DummyInstanceEnabler.class); DtlsConnectorConfig.Builder dtlsConfig = new DtlsConnectorConfig.Builder(); @@ -229,6 +233,12 @@ public class LwM2MTestClient { public void destroy() { client.destroy(true); + if (fwLwM2MDevice != null) { + fwLwM2MDevice.destroy(); + } + if (swLwM2MDevice != null) { + swLwM2MDevice.destroy(); + } } } diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/SwLwM2MDevice.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/SwLwM2MDevice.java index 4e97adf005..b06a3ebbd3 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/SwLwM2MDevice.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/SwLwM2MDevice.java @@ -39,7 +39,7 @@ public class SwLwM2MDevice extends BaseInstanceEnabler implements Destroyable { private static final List supportedResources = Arrays.asList(0, 1, 2, 3, 4, 6, 7, 9); - private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName())); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName() + "-test-scope")); private final AtomicInteger state = new AtomicInteger(0); From 9ee0d87687eb539e366a49eb58505236c7295ea7 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 24 Aug 2021 22:47:05 +0300 Subject: [PATCH 5/7] tests: Coap client destroy on processAfterTest at AbstractCoapIntegrationTest --- .../transport/coap/AbstractCoapIntegrationTest.java | 10 ++++++++++ .../AbstractCoapAttributesRequestIntegrationTest.java | 4 ++-- ...tractCoapAttributesRequestProtoIntegrationTest.java | 3 +-- .../AbstractCoapAttributesUpdatesIntegrationTest.java | 2 +- .../coap/claim/AbstractCoapClaimDeviceTest.java | 2 +- .../coap/claim/AbstractCoapClaimProtoDeviceTest.java | 2 +- .../provision/AbstractCoapProvisionJsonDeviceTest.java | 2 +- .../AbstractCoapProvisionProtoDeviceTest.java | 2 +- .../rpc/AbstractCoapServerSideRpcIntegrationTest.java | 4 ++-- .../AbstractCoapAttributesIntegrationTest.java | 2 +- .../AbstractCoapTimeseriesIntegrationTest.java | 4 ++-- 11 files changed, 23 insertions(+), 14 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/AbstractCoapIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/coap/AbstractCoapIntegrationTest.java index 83023e21b9..8271fc6f95 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/AbstractCoapIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/AbstractCoapIntegrationTest.java @@ -52,6 +52,16 @@ import static org.junit.Assert.assertNotNull; @Slf4j public abstract class AbstractCoapIntegrationTest extends AbstractTransportIntegrationTest { + protected CoapClient client; + + @Override + protected void processAfterTest() throws Exception { + if (client != null) { + client.shutdown(); + } + super.processAfterTest(); + } + protected void processBeforeTest(String deviceName, CoapDeviceType coapDeviceType, TransportPayloadType payloadType) throws Exception { this.processBeforeTest(deviceName, coapDeviceType, payloadType, null, null, null, null, null, null, DeviceProfileProvisionType.DISABLED); } diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/attributes/request/AbstractCoapAttributesRequestIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/coap/attributes/request/AbstractCoapAttributesRequestIntegrationTest.java index 999344b5f3..bced75775e 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/attributes/request/AbstractCoapAttributesRequestIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/attributes/request/AbstractCoapAttributesRequestIntegrationTest.java @@ -75,7 +75,7 @@ public abstract class AbstractCoapAttributesRequestIntegrationTest extends Abstr String keys = "attribute1,attribute2,attribute3,attribute4,attribute5"; String featureTokenUrl = getFeatureTokenUrl(accessToken, FeatureType.ATTRIBUTES) + "?clientKeys=" + keys + "&sharedKeys=" + keys; - CoapClient client = getCoapClient(featureTokenUrl); + client = getCoapClient(featureTokenUrl); CoapResponse getAttributesResponse = client.setTimeout(CLIENT_REQUEST_TIMEOUT).get(); validateResponse(getAttributesResponse); @@ -83,7 +83,7 @@ public abstract class AbstractCoapAttributesRequestIntegrationTest extends Abstr protected void postAttributes() throws Exception { doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); - CoapClient client = getCoapClient(FeatureType.ATTRIBUTES); + client = getCoapClient(FeatureType.ATTRIBUTES); CoapResponse coapResponse = client.setTimeout(CLIENT_REQUEST_TIMEOUT).post(POST_ATTRIBUTES_PAYLOAD.getBytes(), MediaTypeRegistry.APPLICATION_JSON); assertEquals(CoAP.ResponseCode.CREATED, coapResponse.getCode()); } diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/attributes/request/AbstractCoapAttributesRequestProtoIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/coap/attributes/request/AbstractCoapAttributesRequestProtoIntegrationTest.java index 5bef44434e..b5534cba80 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/attributes/request/AbstractCoapAttributesRequestProtoIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/attributes/request/AbstractCoapAttributesRequestProtoIntegrationTest.java @@ -21,7 +21,6 @@ import com.google.protobuf.DynamicMessage; import com.google.protobuf.InvalidProtocolBufferException; import com.squareup.wire.schema.internal.parser.ProtoFileElement; import lombok.extern.slf4j.Slf4j; -import org.eclipse.californium.core.CoapClient; import org.eclipse.californium.core.CoapResponse; import org.eclipse.californium.core.coap.CoAP; import org.eclipse.californium.core.coap.MediaTypeRegistry; @@ -124,7 +123,7 @@ public abstract class AbstractCoapAttributesRequestProtoIntegrationTest extends .setField(postAttributesMsgDescriptor.findFieldByName("attribute5"), jsonObject) .build(); byte[] payload = postAttributesMsg.toByteArray(); - CoapClient client = getCoapClient(FeatureType.ATTRIBUTES); + client = getCoapClient(FeatureType.ATTRIBUTES); CoapResponse coapResponse = client.setTimeout(CLIENT_REQUEST_TIMEOUT).post(payload, MediaTypeRegistry.APPLICATION_JSON); assertEquals(CoAP.ResponseCode.CREATED, coapResponse.getCode()); } diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/attributes/updates/AbstractCoapAttributesUpdatesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/coap/attributes/updates/AbstractCoapAttributesUpdatesIntegrationTest.java index b1a4dc3c10..025819e104 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/attributes/updates/AbstractCoapAttributesUpdatesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/attributes/updates/AbstractCoapAttributesUpdatesIntegrationTest.java @@ -71,7 +71,7 @@ public abstract class AbstractCoapAttributesUpdatesIntegrationTest extends Abstr if (!emptyCurrentStateNotification) { doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD_ON_CURRENT_STATE_NOTIFICATION, String.class, status().isOk()); } - CoapClient client = getCoapClient(FeatureType.ATTRIBUTES); + client = getCoapClient(FeatureType.ATTRIBUTES); CountDownLatch latch = new CountDownLatch(1); TestCoapCallback callback = new TestCoapCallback(latch); diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/claim/AbstractCoapClaimDeviceTest.java b/application/src/test/java/org/thingsboard/server/transport/coap/claim/AbstractCoapClaimDeviceTest.java index 5cdcd2559e..30c30070b9 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/claim/AbstractCoapClaimDeviceTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/claim/AbstractCoapClaimDeviceTest.java @@ -90,7 +90,7 @@ public abstract class AbstractCoapClaimDeviceTest extends AbstractCoapIntegratio protected void processTestClaimingDevice(boolean emptyPayload) throws Exception { log.warn("[testClaimingDevice] Device: {}, Transport type: {}", savedDevice.getName(), savedDevice.getType()); - CoapClient client = getCoapClient(FeatureType.CLAIM); + client = getCoapClient(FeatureType.CLAIM); byte[] payloadBytes; byte[] failurePayloadBytes; if (emptyPayload) { diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/claim/AbstractCoapClaimProtoDeviceTest.java b/application/src/test/java/org/thingsboard/server/transport/coap/claim/AbstractCoapClaimProtoDeviceTest.java index a9691e6693..cd64d85bf3 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/claim/AbstractCoapClaimProtoDeviceTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/claim/AbstractCoapClaimProtoDeviceTest.java @@ -49,7 +49,7 @@ public abstract class AbstractCoapClaimProtoDeviceTest extends AbstractCoapClaim @Override protected void processTestClaimingDevice(boolean emptyPayload) throws Exception { - CoapClient client = getCoapClient(FeatureType.CLAIM); + client = getCoapClient(FeatureType.CLAIM); byte[] payloadBytes; if (emptyPayload) { TransportApiProtos.ClaimDevice claimDevice = getClaimDevice(0, emptyPayload); diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/provision/AbstractCoapProvisionJsonDeviceTest.java b/application/src/test/java/org/thingsboard/server/transport/coap/provision/AbstractCoapProvisionJsonDeviceTest.java index caecaea8ec..eccba64ddc 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/provision/AbstractCoapProvisionJsonDeviceTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/provision/AbstractCoapProvisionJsonDeviceTest.java @@ -180,7 +180,7 @@ public abstract class AbstractCoapProvisionJsonDeviceTest extends AbstractCoapIn private CoapResponse createCoapClientAndPublish(String deviceCredentials) throws Exception { String provisionRequestMsg = createTestProvisionMessage(deviceCredentials); - CoapClient client = getCoapClient(FeatureType.PROVISION); + client = getCoapClient(FeatureType.PROVISION); return postProvision(client, provisionRequestMsg.getBytes()); } diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/provision/AbstractCoapProvisionProtoDeviceTest.java b/application/src/test/java/org/thingsboard/server/transport/coap/provision/AbstractCoapProvisionProtoDeviceTest.java index 682d9c7e7b..7db300e721 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/provision/AbstractCoapProvisionProtoDeviceTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/provision/AbstractCoapProvisionProtoDeviceTest.java @@ -176,7 +176,7 @@ public abstract class AbstractCoapProvisionProtoDeviceTest extends AbstractCoapI } private CoapResponse createCoapClientAndPublish(byte[] provisionRequestMsg) throws Exception { - CoapClient client = getCoapClient(FeatureType.PROVISION); + client = getCoapClient(FeatureType.PROVISION); return postProvision(client, provisionRequestMsg); } diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcIntegrationTest.java index 758dc02004..3fbc8cd214 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcIntegrationTest.java @@ -54,7 +54,7 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC } protected void processOneWayRpcTest() throws Exception { - CoapClient client = getCoapClient(FeatureType.RPC); + client = getCoapClient(FeatureType.RPC); client.useCONs(); CountDownLatch latch = new CountDownLatch(1); @@ -82,7 +82,7 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC } protected void processTwoWayRpcTest(String expectedResponseResult) throws Exception { - CoapClient client = getCoapClient(FeatureType.RPC); + client = getCoapClient(FeatureType.RPC); client.useCONs(); CountDownLatch latch = new CountDownLatch(1); diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/telemetry/attributes/AbstractCoapAttributesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/coap/telemetry/attributes/AbstractCoapAttributesIntegrationTest.java index aac8933af0..b71d448e7e 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/telemetry/attributes/AbstractCoapAttributesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/telemetry/attributes/AbstractCoapAttributesIntegrationTest.java @@ -69,7 +69,7 @@ public abstract class AbstractCoapAttributesIntegrationTest extends AbstractCoap } protected void processAttributesTest(List expectedKeys, byte[] payload, boolean presenceFieldsTest) throws Exception { - CoapClient client = getCoapClient(FeatureType.ATTRIBUTES); + client = getCoapClient(FeatureType.ATTRIBUTES); postAttributes(client, payload); diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/telemetry/timeseries/AbstractCoapTimeseriesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/coap/telemetry/timeseries/AbstractCoapTimeseriesIntegrationTest.java index 5438a7f02b..6993d73320 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/telemetry/timeseries/AbstractCoapTimeseriesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/telemetry/timeseries/AbstractCoapTimeseriesIntegrationTest.java @@ -71,8 +71,8 @@ public abstract class AbstractCoapTimeseriesIntegrationTest extends AbstractCoap } protected void processTestPostTelemetry(byte[] payloadBytes, List expectedKeys, boolean withTs, boolean presenceFieldsTest) throws Exception { - CoapClient coapClient = getCoapClient(FeatureType.TELEMETRY); - postTelemetry(coapClient, payloadBytes); + client = getCoapClient(FeatureType.TELEMETRY); + postTelemetry(client, payloadBytes); String deviceId = savedDevice.getId().getId().toString(); From 8601121e7ee4393532dd3900eaec85131520baee Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 24 Aug 2021 23:05:54 +0300 Subject: [PATCH 6/7] executors: named fixed thread pools; tests - added executor shutdown on tearDown --- .../queue/TbMsgPackProcessingContextTest.java | 3 ++- .../server/actors/ActorSystemTest.java | 2 +- .../AbstractParallelTbQueueConsumerTemplate.java | 3 ++- .../server/transport/coap/client/NoSecClient.java | 3 ++- .../transport/coap/client/NoSecObserveClient.java | 3 ++- .../transport/coap/client/SecureClientNoAuth.java | 3 ++- .../transport/coap/client/SecureClientX509.java | 3 ++- .../dao/service/BaseDeviceProfileServiceTest.java | 3 ++- .../server/dao/sql/device/JpaDeviceDaoTest.java | 15 +++++++++++++-- 9 files changed, 28 insertions(+), 10 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContextTest.java b/application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContextTest.java index 1659431540..4438dfc3f2 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContextTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContextTest.java @@ -21,6 +21,7 @@ import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy; @@ -59,7 +60,7 @@ public class TbMsgPackProcessingContextTest { //log.warn("preparing the test..."); int msgCount = 1000; int parallelCount = 5; - executorService = Executors.newFixedThreadPool(parallelCount); + executorService = Executors.newFixedThreadPool(parallelCount, ThingsBoardThreadFactory.forName(getClass().getSimpleName() + "-test-scope")); ConcurrentMap> messages = new ConcurrentHashMap<>(msgCount); for (int i = 0; i < msgCount; i++) { diff --git a/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java b/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java index f11f94dfb1..d2792dc1db 100644 --- a/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java +++ b/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java @@ -56,7 +56,7 @@ public class ActorSystemTest { parallelism = Math.max(2, cores / 2); TbActorSystemSettings settings = new TbActorSystemSettings(5, parallelism, 42); actorSystem = new DefaultTbActorSystem(settings); - submitPool = Executors.newFixedThreadPool(parallelism); //order guaranteed + submitPool = Executors.newFixedThreadPool(parallelism, ThingsBoardThreadFactory.forName(getClass().getSimpleName() + "-submit-test-scope")); //order guaranteed } @After diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractParallelTbQueueConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractParallelTbQueueConsumerTemplate.java index c1a8a82af4..deba482a28 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractParallelTbQueueConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractParallelTbQueueConsumerTemplate.java @@ -18,6 +18,7 @@ package org.thingsboard.server.queue.common; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.queue.TbQueueMsg; import java.util.concurrent.Executors; @@ -41,7 +42,7 @@ public abstract class AbstractParallelTbQueueConsumerTemplate> futures = new ArrayList<>(); for (int i = 0; i < 50; i++) { diff --git a/dao/src/test/java/org/thingsboard/server/dao/sql/device/JpaDeviceDaoTest.java b/dao/src/test/java/org/thingsboard/server/dao/sql/device/JpaDeviceDaoTest.java index ca2a0db493..9d9d128966 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/sql/device/JpaDeviceDaoTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/sql/device/JpaDeviceDaoTest.java @@ -19,8 +19,10 @@ import com.datastax.oss.driver.api.core.uuid.Uuids; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import org.junit.After; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; @@ -47,6 +49,15 @@ public class JpaDeviceDaoTest extends AbstractJpaDaoTest { @Autowired private DeviceDao deviceDao; + ListeningExecutorService executor; + + @After + public void tearDown() throws Exception { + if (executor != null) { + executor.shutdownNow(); + } + } + @Test public void testFindDevicesByTenantId() { UUID tenantId1 = Uuids.timeBased(); @@ -77,8 +88,8 @@ public class JpaDeviceDaoTest extends AbstractJpaDaoTest { assertNotNull(entity); assertEquals(uuid, entity.getId().getId()); - ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); - ListenableFuture future = service.submit(() -> deviceDao.findById(new TenantId(tenantId), uuid)); + executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10, ThingsBoardThreadFactory.forName(getClass().getSimpleName() + "-test-scope"))); + ListenableFuture future = executor.submit(() -> deviceDao.findById(new TenantId(tenantId), uuid)); Device asyncDevice = future.get(); assertNotNull("Async device expected to be not null", asyncDevice); } From d6852dcb7c570ae4691b828fbf77d8d566581160 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 25 Aug 2021 00:13:30 +0300 Subject: [PATCH 7/7] test: assert not null COAP response instead NullPointerException (to clarify the test failure cause) --- .../coap/provision/AbstractCoapProvisionProtoDeviceTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/provision/AbstractCoapProvisionProtoDeviceTest.java b/application/src/test/java/org/thingsboard/server/transport/coap/provision/AbstractCoapProvisionProtoDeviceTest.java index 7db300e721..14d72297a4 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/provision/AbstractCoapProvisionProtoDeviceTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/provision/AbstractCoapProvisionProtoDeviceTest.java @@ -172,7 +172,9 @@ public abstract class AbstractCoapProvisionProtoDeviceTest extends AbstractCoapI private CoapResponse createCoapClientAndPublish() throws Exception { byte[] provisionRequestMsg = createTestProvisionMessage(); - return createCoapClientAndPublish(provisionRequestMsg); + CoapResponse coapResponse = createCoapClientAndPublish(provisionRequestMsg); + Assert.assertNotNull("COAP response", coapResponse); + return coapResponse; } private CoapResponse createCoapClientAndPublish(byte[] provisionRequestMsg) throws Exception {