Merge pull request #5178 from smatvienko-tb/thread-pool-naming-aug-2021
Thread pool naming (aug 2021)
This commit is contained in:
commit
67014b45a0
@ -23,6 +23,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.ThingsBoardExecutors;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.rule.engine.api.MailService;
|
||||
import org.thingsboard.server.common.data.ApiFeature;
|
||||
@ -486,7 +487,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
|
||||
log.info("Initializing tenant states.");
|
||||
updateLock.lock();
|
||||
try {
|
||||
ExecutorService tmpInitExecutor = Executors.newWorkStealingPool(20);
|
||||
ExecutorService tmpInitExecutor = ThingsBoardExecutors.newWorkStealingPool(20, "init-tenant-states-from-db");
|
||||
try {
|
||||
PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, 1024);
|
||||
List<Future<?>> futures = new ArrayList<>();
|
||||
|
||||
@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.JsonNode;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
@ -79,7 +80,7 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
|
||||
|
||||
@PostConstruct
|
||||
public void initExecutor() {
|
||||
tsCallBackExecutor = Executors.newSingleThreadExecutor();
|
||||
tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edge-notifications"));
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
|
||||
@ -103,8 +103,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
||||
private final ConcurrentMap<String, TbRuleEngineQueueConfiguration> consumerConfigurations = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<String, TbRuleEngineConsumerStats> consumerStats = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<String, TbTopicWithConsumerPerPartition> topicsConsumerPerPartition = new ConcurrentHashMap<>();
|
||||
final ExecutorService submitExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-service-submit-executor"));
|
||||
final ScheduledExecutorService repartitionExecutor = Executors.newScheduledThreadPool(1, ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-repartition-executor"));
|
||||
final ExecutorService submitExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-submit"));
|
||||
final ScheduledExecutorService repartitionExecutor = Executors.newScheduledThreadPool(1, ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-repartition"));
|
||||
|
||||
public DefaultTbRuleEngineConsumerService(TbRuleEngineProcessingStrategyFactory processingStrategyFactory,
|
||||
TbRuleEngineSubmitStrategyFactory submitStrategyFactory,
|
||||
@ -146,6 +146,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
||||
public void stop() {
|
||||
super.destroy();
|
||||
submitExecutor.shutdownNow();
|
||||
repartitionExecutor.shutdownNow();
|
||||
ruleEngineSettings.getQueues().forEach(config -> consumerConfigurations.put(config.getName(), config));
|
||||
}
|
||||
|
||||
|
||||
@ -21,6 +21,7 @@ import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy;
|
||||
@ -59,7 +60,7 @@ public class TbMsgPackProcessingContextTest {
|
||||
//log.warn("preparing the test...");
|
||||
int msgCount = 1000;
|
||||
int parallelCount = 5;
|
||||
executorService = Executors.newFixedThreadPool(parallelCount);
|
||||
executorService = Executors.newFixedThreadPool(parallelCount, ThingsBoardThreadFactory.forName(getClass().getSimpleName() + "-test-scope"));
|
||||
|
||||
ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> messages = new ConcurrentHashMap<>(msgCount);
|
||||
for (int i = 0; i < msgCount; i++) {
|
||||
|
||||
@ -52,6 +52,16 @@ import static org.junit.Assert.assertNotNull;
|
||||
@Slf4j
|
||||
public abstract class AbstractCoapIntegrationTest extends AbstractTransportIntegrationTest {
|
||||
|
||||
protected CoapClient client;
|
||||
|
||||
@Override
|
||||
protected void processAfterTest() throws Exception {
|
||||
if (client != null) {
|
||||
client.shutdown();
|
||||
}
|
||||
super.processAfterTest();
|
||||
}
|
||||
|
||||
protected void processBeforeTest(String deviceName, CoapDeviceType coapDeviceType, TransportPayloadType payloadType) throws Exception {
|
||||
this.processBeforeTest(deviceName, coapDeviceType, payloadType, null, null, null, null, null, null, DeviceProfileProvisionType.DISABLED);
|
||||
}
|
||||
|
||||
@ -75,7 +75,7 @@ public abstract class AbstractCoapAttributesRequestIntegrationTest extends Abstr
|
||||
|
||||
String keys = "attribute1,attribute2,attribute3,attribute4,attribute5";
|
||||
String featureTokenUrl = getFeatureTokenUrl(accessToken, FeatureType.ATTRIBUTES) + "?clientKeys=" + keys + "&sharedKeys=" + keys;
|
||||
CoapClient client = getCoapClient(featureTokenUrl);
|
||||
client = getCoapClient(featureTokenUrl);
|
||||
|
||||
CoapResponse getAttributesResponse = client.setTimeout(CLIENT_REQUEST_TIMEOUT).get();
|
||||
validateResponse(getAttributesResponse);
|
||||
@ -83,7 +83,7 @@ public abstract class AbstractCoapAttributesRequestIntegrationTest extends Abstr
|
||||
|
||||
protected void postAttributes() throws Exception {
|
||||
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
|
||||
CoapClient client = getCoapClient(FeatureType.ATTRIBUTES);
|
||||
client = getCoapClient(FeatureType.ATTRIBUTES);
|
||||
CoapResponse coapResponse = client.setTimeout(CLIENT_REQUEST_TIMEOUT).post(POST_ATTRIBUTES_PAYLOAD.getBytes(), MediaTypeRegistry.APPLICATION_JSON);
|
||||
assertEquals(CoAP.ResponseCode.CREATED, coapResponse.getCode());
|
||||
}
|
||||
|
||||
@ -21,7 +21,6 @@ import com.google.protobuf.DynamicMessage;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import com.squareup.wire.schema.internal.parser.ProtoFileElement;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.californium.core.CoapClient;
|
||||
import org.eclipse.californium.core.CoapResponse;
|
||||
import org.eclipse.californium.core.coap.CoAP;
|
||||
import org.eclipse.californium.core.coap.MediaTypeRegistry;
|
||||
@ -124,7 +123,7 @@ public abstract class AbstractCoapAttributesRequestProtoIntegrationTest extends
|
||||
.setField(postAttributesMsgDescriptor.findFieldByName("attribute5"), jsonObject)
|
||||
.build();
|
||||
byte[] payload = postAttributesMsg.toByteArray();
|
||||
CoapClient client = getCoapClient(FeatureType.ATTRIBUTES);
|
||||
client = getCoapClient(FeatureType.ATTRIBUTES);
|
||||
CoapResponse coapResponse = client.setTimeout(CLIENT_REQUEST_TIMEOUT).post(payload, MediaTypeRegistry.APPLICATION_JSON);
|
||||
assertEquals(CoAP.ResponseCode.CREATED, coapResponse.getCode());
|
||||
}
|
||||
|
||||
@ -71,7 +71,7 @@ public abstract class AbstractCoapAttributesUpdatesIntegrationTest extends Abstr
|
||||
if (!emptyCurrentStateNotification) {
|
||||
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD_ON_CURRENT_STATE_NOTIFICATION, String.class, status().isOk());
|
||||
}
|
||||
CoapClient client = getCoapClient(FeatureType.ATTRIBUTES);
|
||||
client = getCoapClient(FeatureType.ATTRIBUTES);
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
TestCoapCallback callback = new TestCoapCallback(latch);
|
||||
|
||||
@ -90,7 +90,7 @@ public abstract class AbstractCoapClaimDeviceTest extends AbstractCoapIntegratio
|
||||
|
||||
protected void processTestClaimingDevice(boolean emptyPayload) throws Exception {
|
||||
log.warn("[testClaimingDevice] Device: {}, Transport type: {}", savedDevice.getName(), savedDevice.getType());
|
||||
CoapClient client = getCoapClient(FeatureType.CLAIM);
|
||||
client = getCoapClient(FeatureType.CLAIM);
|
||||
byte[] payloadBytes;
|
||||
byte[] failurePayloadBytes;
|
||||
if (emptyPayload) {
|
||||
|
||||
@ -49,7 +49,7 @@ public abstract class AbstractCoapClaimProtoDeviceTest extends AbstractCoapClaim
|
||||
|
||||
@Override
|
||||
protected void processTestClaimingDevice(boolean emptyPayload) throws Exception {
|
||||
CoapClient client = getCoapClient(FeatureType.CLAIM);
|
||||
client = getCoapClient(FeatureType.CLAIM);
|
||||
byte[] payloadBytes;
|
||||
if (emptyPayload) {
|
||||
TransportApiProtos.ClaimDevice claimDevice = getClaimDevice(0, emptyPayload);
|
||||
|
||||
@ -180,7 +180,7 @@ public abstract class AbstractCoapProvisionJsonDeviceTest extends AbstractCoapIn
|
||||
|
||||
private CoapResponse createCoapClientAndPublish(String deviceCredentials) throws Exception {
|
||||
String provisionRequestMsg = createTestProvisionMessage(deviceCredentials);
|
||||
CoapClient client = getCoapClient(FeatureType.PROVISION);
|
||||
client = getCoapClient(FeatureType.PROVISION);
|
||||
return postProvision(client, provisionRequestMsg.getBytes());
|
||||
}
|
||||
|
||||
|
||||
@ -172,11 +172,13 @@ public abstract class AbstractCoapProvisionProtoDeviceTest extends AbstractCoapI
|
||||
|
||||
private CoapResponse createCoapClientAndPublish() throws Exception {
|
||||
byte[] provisionRequestMsg = createTestProvisionMessage();
|
||||
return createCoapClientAndPublish(provisionRequestMsg);
|
||||
CoapResponse coapResponse = createCoapClientAndPublish(provisionRequestMsg);
|
||||
Assert.assertNotNull("COAP response", coapResponse);
|
||||
return coapResponse;
|
||||
}
|
||||
|
||||
private CoapResponse createCoapClientAndPublish(byte[] provisionRequestMsg) throws Exception {
|
||||
CoapClient client = getCoapClient(FeatureType.PROVISION);
|
||||
client = getCoapClient(FeatureType.PROVISION);
|
||||
return postProvision(client, provisionRequestMsg);
|
||||
}
|
||||
|
||||
|
||||
@ -54,7 +54,7 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC
|
||||
}
|
||||
|
||||
protected void processOneWayRpcTest() throws Exception {
|
||||
CoapClient client = getCoapClient(FeatureType.RPC);
|
||||
client = getCoapClient(FeatureType.RPC);
|
||||
client.useCONs();
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
@ -82,7 +82,7 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC
|
||||
}
|
||||
|
||||
protected void processTwoWayRpcTest(String expectedResponseResult) throws Exception {
|
||||
CoapClient client = getCoapClient(FeatureType.RPC);
|
||||
client = getCoapClient(FeatureType.RPC);
|
||||
client.useCONs();
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
@ -69,7 +69,7 @@ public abstract class AbstractCoapAttributesIntegrationTest extends AbstractCoap
|
||||
}
|
||||
|
||||
protected void processAttributesTest(List<String> expectedKeys, byte[] payload, boolean presenceFieldsTest) throws Exception {
|
||||
CoapClient client = getCoapClient(FeatureType.ATTRIBUTES);
|
||||
client = getCoapClient(FeatureType.ATTRIBUTES);
|
||||
|
||||
postAttributes(client, payload);
|
||||
|
||||
|
||||
@ -71,8 +71,8 @@ public abstract class AbstractCoapTimeseriesIntegrationTest extends AbstractCoap
|
||||
}
|
||||
|
||||
protected void processTestPostTelemetry(byte[] payloadBytes, List<String> expectedKeys, boolean withTs, boolean presenceFieldsTest) throws Exception {
|
||||
CoapClient coapClient = getCoapClient(FeatureType.TELEMETRY);
|
||||
postTelemetry(coapClient, payloadBytes);
|
||||
client = getCoapClient(FeatureType.TELEMETRY);
|
||||
postTelemetry(client, payloadBytes);
|
||||
|
||||
String deviceId = savedDevice.getId().getId().toString();
|
||||
|
||||
|
||||
@ -23,6 +23,7 @@ import org.eclipse.leshan.core.node.LwM2mResource;
|
||||
import org.eclipse.leshan.core.response.ExecuteResponse;
|
||||
import org.eclipse.leshan.core.response.ReadResponse;
|
||||
import org.eclipse.leshan.core.response.WriteResponse;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
|
||||
import javax.security.auth.Destroyable;
|
||||
import java.util.Arrays;
|
||||
@ -37,7 +38,7 @@ public class FwLwM2MDevice extends BaseInstanceEnabler implements Destroyable {
|
||||
|
||||
private static final List<Integer> supportedResources = Arrays.asList(0, 1, 2, 3, 5, 6, 7, 9);
|
||||
|
||||
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
|
||||
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName() + "-test-scope"));
|
||||
|
||||
private final AtomicInteger state = new AtomicInteger(0);
|
||||
|
||||
|
||||
@ -45,6 +45,7 @@ import org.eclipse.leshan.core.request.BootstrapRequest;
|
||||
import org.eclipse.leshan.core.request.DeregisterRequest;
|
||||
import org.eclipse.leshan.core.request.RegisterRequest;
|
||||
import org.eclipse.leshan.core.request.UpdateRequest;
|
||||
import org.junit.Assert;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
@ -65,8 +66,11 @@ public class LwM2MTestClient {
|
||||
private final ScheduledExecutorService executor;
|
||||
private final String endpoint;
|
||||
private LeshanClient client;
|
||||
private FwLwM2MDevice fwLwM2MDevice;
|
||||
private SwLwM2MDevice swLwM2MDevice;
|
||||
|
||||
public void init(Security security, NetworkConfig coapConfig) throws InvalidDDFFileException, IOException {
|
||||
Assert.assertNull("client already initialized", client);
|
||||
String[] resources = new String[]{"0.xml", "1.xml", "2.xml", "3.xml", "5.xml", "9.xml"};
|
||||
List<ObjectModel> models = new ArrayList<>();
|
||||
for (String resourceName : resources) {
|
||||
@ -77,8 +81,8 @@ public class LwM2MTestClient {
|
||||
initializer.setInstancesForObject(SECURITY, security);
|
||||
initializer.setInstancesForObject(SERVER, new Server(123, 300));
|
||||
initializer.setInstancesForObject(DEVICE, new SimpleLwM2MDevice());
|
||||
initializer.setInstancesForObject(FIRMWARE, new FwLwM2MDevice());
|
||||
initializer.setInstancesForObject(SOFTWARE_MANAGEMENT, new SwLwM2MDevice());
|
||||
initializer.setInstancesForObject(FIRMWARE, fwLwM2MDevice = new FwLwM2MDevice());
|
||||
initializer.setInstancesForObject(SOFTWARE_MANAGEMENT, swLwM2MDevice = new SwLwM2MDevice());
|
||||
initializer.setClassForObject(LwM2mId.ACCESS_CONTROL, DummyInstanceEnabler.class);
|
||||
|
||||
DtlsConnectorConfig.Builder dtlsConfig = new DtlsConnectorConfig.Builder();
|
||||
@ -229,6 +233,12 @@ public class LwM2MTestClient {
|
||||
|
||||
public void destroy() {
|
||||
client.destroy(true);
|
||||
if (fwLwM2MDevice != null) {
|
||||
fwLwM2MDevice.destroy();
|
||||
}
|
||||
if (swLwM2MDevice != null) {
|
||||
swLwM2MDevice.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -24,6 +24,7 @@ import org.eclipse.leshan.core.node.LwM2mResource;
|
||||
import org.eclipse.leshan.core.response.ExecuteResponse;
|
||||
import org.eclipse.leshan.core.response.ReadResponse;
|
||||
import org.eclipse.leshan.core.response.WriteResponse;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
|
||||
import javax.security.auth.Destroyable;
|
||||
import java.util.Arrays;
|
||||
@ -38,7 +39,7 @@ public class SwLwM2MDevice extends BaseInstanceEnabler implements Destroyable {
|
||||
|
||||
private static final List<Integer> supportedResources = Arrays.asList(0, 1, 2, 3, 4, 6, 7, 9);
|
||||
|
||||
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
|
||||
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName() + "-test-scope"));
|
||||
|
||||
private final AtomicInteger state = new AtomicInteger(0);
|
||||
|
||||
|
||||
@ -17,10 +17,12 @@ package org.thingsboard.server.util;
|
||||
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.utils.EventDeduplicationExecutor;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
@ -31,6 +33,16 @@ import java.util.function.Consumer;
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class EventDeduplicationExecutorTest {
|
||||
|
||||
ThingsBoardThreadFactory threadFactory = ThingsBoardThreadFactory.forName(getClass().getSimpleName());
|
||||
ExecutorService executor;
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (executor != null) {
|
||||
executor.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleFlowSameThread() throws InterruptedException {
|
||||
simpleFlow(MoreExecutors.newDirectExecutorService());
|
||||
@ -48,32 +60,38 @@ public class EventDeduplicationExecutorTest {
|
||||
|
||||
@Test
|
||||
public void testSimpleFlowSingleThread() throws InterruptedException {
|
||||
simpleFlow(Executors.newSingleThreadExecutor());
|
||||
executor = Executors.newSingleThreadExecutor(threadFactory);
|
||||
simpleFlow(executor);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPeriodicFlowSingleThread() throws InterruptedException {
|
||||
periodicFlow(Executors.newSingleThreadExecutor());
|
||||
executor = Executors.newSingleThreadExecutor(threadFactory);
|
||||
periodicFlow(executor);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExceptionFlowSingleThread() throws InterruptedException {
|
||||
exceptionFlow(Executors.newSingleThreadExecutor());
|
||||
executor = Executors.newSingleThreadExecutor(threadFactory);
|
||||
exceptionFlow(executor);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleFlowMultiThread() throws InterruptedException {
|
||||
simpleFlow(Executors.newFixedThreadPool(3));
|
||||
executor = Executors.newFixedThreadPool(3, threadFactory);
|
||||
simpleFlow(executor);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPeriodicFlowMultiThread() throws InterruptedException {
|
||||
periodicFlow(Executors.newFixedThreadPool(3));
|
||||
executor = Executors.newFixedThreadPool(3, threadFactory);
|
||||
periodicFlow(executor);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExceptionFlowMultiThread() throws InterruptedException {
|
||||
exceptionFlow(Executors.newFixedThreadPool(3));
|
||||
executor = Executors.newFixedThreadPool(3, threadFactory);
|
||||
exceptionFlow(executor);
|
||||
}
|
||||
|
||||
private void simpleFlow(ExecutorService executorService) throws InterruptedException {
|
||||
|
||||
@ -22,6 +22,8 @@ import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
import org.thingsboard.common.util.ThingsBoardExecutors;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@ -45,6 +47,7 @@ public class ActorSystemTest {
|
||||
|
||||
private volatile TbActorSystem actorSystem;
|
||||
private volatile ExecutorService submitPool;
|
||||
private ExecutorService executor;
|
||||
private int parallelism;
|
||||
|
||||
@Before
|
||||
@ -53,54 +56,64 @@ public class ActorSystemTest {
|
||||
parallelism = Math.max(2, cores / 2);
|
||||
TbActorSystemSettings settings = new TbActorSystemSettings(5, parallelism, 42);
|
||||
actorSystem = new DefaultTbActorSystem(settings);
|
||||
submitPool = Executors.newFixedThreadPool(parallelism); //order guaranteed
|
||||
submitPool = Executors.newFixedThreadPool(parallelism, ThingsBoardThreadFactory.forName(getClass().getSimpleName() + "-submit-test-scope")); //order guaranteed
|
||||
}
|
||||
|
||||
@After
|
||||
public void shutdownActorSystem() {
|
||||
actorSystem.stop();
|
||||
submitPool.shutdownNow();
|
||||
if (executor != null) {
|
||||
executor.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test1actorsAnd100KMessages() throws InterruptedException {
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
|
||||
executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass());
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, executor);
|
||||
testActorsAndMessages(1, _100K, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test10actorsAnd100KMessages() throws InterruptedException {
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
|
||||
executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass());
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, executor);
|
||||
testActorsAndMessages(10, _100K, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test100KActorsAnd1Messages5timesSingleThread() throws InterruptedException {
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newSingleThreadExecutor());
|
||||
executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName()));
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, executor);
|
||||
testActorsAndMessages(_100K, 1, 5);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test100KActorsAnd1Messages5times() throws InterruptedException {
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
|
||||
executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass());
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, executor);
|
||||
testActorsAndMessages(_100K, 1, 5);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test100KActorsAnd10Messages() throws InterruptedException {
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
|
||||
executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass());
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, executor);
|
||||
testActorsAndMessages(_100K, 10, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test1KActorsAnd1KMessages() throws InterruptedException {
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
|
||||
executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass());
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, executor);
|
||||
testActorsAndMessages(1000, 1000, 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoMessagesAfterDestroy() throws InterruptedException {
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
|
||||
executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass());
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, executor);
|
||||
ActorTestCtx testCtx1 = getActorTestCtx(1);
|
||||
ActorTestCtx testCtx2 = getActorTestCtx(1);
|
||||
|
||||
@ -119,7 +132,8 @@ public class ActorSystemTest {
|
||||
|
||||
@Test
|
||||
public void testOneActorCreated() throws InterruptedException {
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
|
||||
executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass());
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, executor);
|
||||
ActorTestCtx testCtx1 = getActorTestCtx(1);
|
||||
ActorTestCtx testCtx2 = getActorTestCtx(1);
|
||||
TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID()));
|
||||
@ -145,7 +159,8 @@ public class ActorSystemTest {
|
||||
|
||||
@Test
|
||||
public void testActorCreatorCalledOnce() throws InterruptedException {
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
|
||||
executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass());
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, executor);
|
||||
ActorTestCtx testCtx = getActorTestCtx(1);
|
||||
TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID()));
|
||||
final int actorsCount = 1000;
|
||||
@ -169,7 +184,8 @@ public class ActorSystemTest {
|
||||
|
||||
@Test
|
||||
public void testFailedInit() throws InterruptedException {
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
|
||||
executor = ThingsBoardExecutors.newWorkStealingPool(parallelism, getClass());
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, executor);
|
||||
ActorTestCtx testCtx1 = getActorTestCtx(1);
|
||||
ActorTestCtx testCtx2 = getActorTestCtx(1);
|
||||
|
||||
|
||||
@ -24,6 +24,7 @@ import org.eclipse.californium.scandium.DTLSConnector;
|
||||
import org.eclipse.californium.scandium.config.DtlsConnectorConfig;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
@ -118,7 +119,7 @@ public class DefaultCoapServerService implements CoapServerService {
|
||||
CoapEndpoint dtlsCoapEndpoint = dtlsCoapEndpointBuilder.build();
|
||||
server.addEndpoint(dtlsCoapEndpoint);
|
||||
tbDtlsCertificateVerifier = (TbCoapDtlsCertificateVerifier) dtlsConnectorConfig.getAdvancedCertificateVerifier();
|
||||
dtlsSessionsExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||
dtlsSessionsExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName()));
|
||||
dtlsSessionsExecutor.scheduleAtFixedRate(this::evictTimeoutSessions, new Random().nextInt((int) getDtlsSessionReportTimeout()), getDtlsSessionReportTimeout(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
Resource root = server.getRoot();
|
||||
|
||||
@ -18,6 +18,7 @@ package org.thingsboard.server.queue.common;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.queue.TbQueueMsg;
|
||||
|
||||
import java.util.concurrent.Executors;
|
||||
@ -41,7 +42,7 @@ public abstract class AbstractParallelTbQueueConsumerTemplate<R, T extends TbQue
|
||||
log.trace("Interrupted while waiting for consumer executor to stop");
|
||||
}
|
||||
}
|
||||
consumerExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadPoolSize));
|
||||
consumerExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadPoolSize, ThingsBoardThreadFactory.forName(getClass().getSimpleName())));
|
||||
}
|
||||
|
||||
protected void shutdownExecutor() {
|
||||
|
||||
@ -21,6 +21,7 @@ import org.eclipse.californium.core.Utils;
|
||||
import org.eclipse.californium.elements.DtlsEndpointContext;
|
||||
import org.eclipse.californium.elements.EndpointContext;
|
||||
import org.eclipse.californium.elements.exception.ConnectorException;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
@ -31,7 +32,7 @@ import java.util.concurrent.Executors;
|
||||
|
||||
public class NoSecClient {
|
||||
|
||||
private ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||
private ExecutorService executor = Executors.newFixedThreadPool(1, ThingsBoardThreadFactory.forName(getClass().getSimpleName()));
|
||||
private CoapClient coapClient;
|
||||
|
||||
public NoSecClient(String host, int port, String accessToken, String clientKeys, String sharedKeys) throws URISyntaxException {
|
||||
|
||||
@ -22,6 +22,7 @@ import org.eclipse.californium.core.CoapObserveRelation;
|
||||
import org.eclipse.californium.core.CoapResponse;
|
||||
import org.eclipse.californium.core.coap.CoAP;
|
||||
import org.eclipse.californium.core.coap.Request;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
@ -36,7 +37,7 @@ public class NoSecObserveClient {
|
||||
|
||||
private CoapClient coapClient;
|
||||
private CoapObserveRelation observeRelation;
|
||||
private ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||
private ExecutorService executor = Executors.newFixedThreadPool(1, ThingsBoardThreadFactory.forName(getClass().getSimpleName()));
|
||||
private CountDownLatch latch;
|
||||
|
||||
public NoSecObserveClient(String host, int port, String accessToken) throws URISyntaxException {
|
||||
|
||||
@ -27,6 +27,7 @@ import org.eclipse.californium.scandium.DTLSConnector;
|
||||
import org.eclipse.californium.scandium.config.DtlsConnectorConfig;
|
||||
import org.eclipse.californium.scandium.dtls.CertificateType;
|
||||
import org.eclipse.californium.scandium.dtls.x509.StaticNewAdvancedCertificateVerifier;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
@ -41,7 +42,7 @@ import java.util.concurrent.Executors;
|
||||
public class SecureClientNoAuth {
|
||||
|
||||
private final DTLSConnector dtlsConnector;
|
||||
private ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||
private ExecutorService executor = Executors.newFixedThreadPool(1, ThingsBoardThreadFactory.forName(getClass().getSimpleName()));
|
||||
private CoapClient coapClient;
|
||||
|
||||
public SecureClientNoAuth(DTLSConnector dtlsConnector, String host, int port, String accessToken, String clientKeys, String sharedKeys) throws URISyntaxException {
|
||||
|
||||
@ -27,6 +27,7 @@ import org.eclipse.californium.scandium.DTLSConnector;
|
||||
import org.eclipse.californium.scandium.config.DtlsConnectorConfig;
|
||||
import org.eclipse.californium.scandium.dtls.CertificateType;
|
||||
import org.eclipse.californium.scandium.dtls.x509.StaticNewAdvancedCertificateVerifier;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
@ -41,7 +42,7 @@ import java.util.concurrent.Executors;
|
||||
public class SecureClientX509 {
|
||||
|
||||
private final DTLSConnector dtlsConnector;
|
||||
private ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||
private ExecutorService executor = Executors.newFixedThreadPool(1, ThingsBoardThreadFactory.forName(getClass().getSimpleName()));
|
||||
private CoapClient coapClient;
|
||||
|
||||
public SecureClientX509(DTLSConnector dtlsConnector, String host, int port, String clientKeys, String sharedKeys) throws URISyntaxException {
|
||||
|
||||
@ -34,6 +34,7 @@ import org.snmp4j.transport.DefaultTcpTransportMapping;
|
||||
import org.snmp4j.transport.DefaultUdpTransportMapping;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.ThingsBoardExecutors;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.TbTransportService;
|
||||
@ -90,7 +91,7 @@ public class SnmpTransportService implements TbTransportService {
|
||||
@PostConstruct
|
||||
private void init() throws IOException {
|
||||
queryingExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("snmp-querying"));
|
||||
responseProcessingExecutor = Executors.newWorkStealingPool(responseProcessingParallelismLevel);
|
||||
responseProcessingExecutor = ThingsBoardExecutors.newWorkStealingPool(responseProcessingParallelismLevel, "snmp-response-processing");
|
||||
|
||||
initializeSnmp();
|
||||
configureResponseDataMappers();
|
||||
@ -99,6 +100,16 @@ public class SnmpTransportService implements TbTransportService {
|
||||
log.info("SNMP transport service initialized");
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void stop() {
|
||||
if (queryingExecutor != null) {
|
||||
queryingExecutor.shutdownNow();
|
||||
}
|
||||
if (responseProcessingExecutor != null) {
|
||||
responseProcessingExecutor.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
private void initializeSnmp() throws IOException {
|
||||
TransportMapping<?> transportMapping;
|
||||
switch (snmpUnderlyingProtocol) {
|
||||
|
||||
@ -24,6 +24,7 @@ import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.DeviceProfileInfo;
|
||||
@ -158,7 +159,7 @@ public class BaseDeviceProfileServiceTest extends AbstractServiceTest {
|
||||
|
||||
@Test
|
||||
public void testFindOrCreateDeviceProfile() throws ExecutionException, InterruptedException {
|
||||
ListeningExecutorService testExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(100));
|
||||
ListeningExecutorService testExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(100, ThingsBoardThreadFactory.forName(getClass().getSimpleName() + "-test-scope")));
|
||||
try {
|
||||
List<ListenableFuture<DeviceProfile>> futures = new ArrayList<>();
|
||||
for (int i = 0; i < 50; i++) {
|
||||
|
||||
@ -19,8 +19,10 @@ import com.datastax.oss.driver.api.core.uuid.Uuids;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
@ -47,6 +49,15 @@ public class JpaDeviceDaoTest extends AbstractJpaDaoTest {
|
||||
@Autowired
|
||||
private DeviceDao deviceDao;
|
||||
|
||||
ListeningExecutorService executor;
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (executor != null) {
|
||||
executor.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFindDevicesByTenantId() {
|
||||
UUID tenantId1 = Uuids.timeBased();
|
||||
@ -77,8 +88,8 @@ public class JpaDeviceDaoTest extends AbstractJpaDaoTest {
|
||||
assertNotNull(entity);
|
||||
assertEquals(uuid, entity.getId().getId());
|
||||
|
||||
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
|
||||
ListenableFuture<Device> future = service.submit(() -> deviceDao.findById(new TenantId(tenantId), uuid));
|
||||
executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10, ThingsBoardThreadFactory.forName(getClass().getSimpleName() + "-test-scope")));
|
||||
ListenableFuture<Device> future = executor.submit(() -> deviceDao.findById(new TenantId(tenantId), uuid));
|
||||
Device asyncDevice = future.get();
|
||||
assertNotNull("Async device expected to be not null", asyncDevice);
|
||||
}
|
||||
|
||||
@ -34,6 +34,7 @@ import org.junit.runner.Description;
|
||||
import org.springframework.core.ParameterizedTypeReference;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.mqtt.MqttClient;
|
||||
import org.thingsboard.mqtt.MqttClientConfig;
|
||||
import org.thingsboard.mqtt.MqttHandler;
|
||||
@ -263,7 +264,7 @@ public class MqttClientTest extends AbstractContainerTest {
|
||||
JsonObject serverRpcPayload = new JsonObject();
|
||||
serverRpcPayload.addProperty("method", "getValue");
|
||||
serverRpcPayload.addProperty("params", true);
|
||||
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
|
||||
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName())));
|
||||
ListenableFuture<ResponseEntity> future = service.submit(() -> {
|
||||
try {
|
||||
return restClient.getRestTemplate()
|
||||
@ -287,6 +288,7 @@ public class MqttClientTest extends AbstractContainerTest {
|
||||
mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes())).get();
|
||||
|
||||
ResponseEntity serverResponse = future.get(5, TimeUnit.SECONDS);
|
||||
service.shutdownNow();
|
||||
Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful());
|
||||
Assert.assertEquals(clientResponse.toString(), serverResponse.getBody());
|
||||
|
||||
|
||||
@ -33,6 +33,7 @@ import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.mqtt.MqttClient;
|
||||
import org.thingsboard.mqtt.MqttClientConfig;
|
||||
import org.thingsboard.mqtt.MqttHandler;
|
||||
@ -259,7 +260,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
|
||||
JsonObject serverRpcPayload = new JsonObject();
|
||||
serverRpcPayload.addProperty("method", "getValue");
|
||||
serverRpcPayload.addProperty("params", true);
|
||||
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
|
||||
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName())));
|
||||
ListenableFuture<ResponseEntity> future = service.submit(() -> {
|
||||
try {
|
||||
return restClient.getRestTemplate()
|
||||
@ -273,6 +274,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
|
||||
|
||||
// Wait for RPC call from the server and send the response
|
||||
MqttEvent requestFromServer = listener.getEvents().poll(10, TimeUnit.SECONDS);
|
||||
service.shutdownNow();
|
||||
|
||||
Assert.assertNotNull(requestFromServer);
|
||||
Assert.assertNotNull(requestFromServer.getMessage());
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user