created PubSubExecutor to use each time publisher is created and do not produce too much threads created

This commit is contained in:
dashevchenko 2023-11-14 17:36:51 +02:00
parent ef26f18d5a
commit 4f22ef9733
8 changed files with 67 additions and 3 deletions

View File

@ -29,6 +29,7 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.service.executors.PubSubExecutorService;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.NotificationCenter;
import org.thingsboard.rule.engine.api.SmsService;
@ -322,6 +323,10 @@ public class ActorSystemContext {
@Getter
private NotificationExecutorService notificationExecutor;
@Autowired
@Getter
private PubSubExecutorService pubSubExecutorService;
@Autowired
@Getter
private SharedEventLoopGroupService sharedEventLoopGroupService;

View File

@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j;
import org.bouncycastle.util.Arrays;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ListeningExecutor;
import org.thingsboard.server.service.executors.PubSubExecutorService;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.NotificationCenter;
import org.thingsboard.rule.engine.api.RuleEngineAlarmService;
@ -538,6 +539,11 @@ class DefaultTbContext implements TbContext {
return mainCtx.getNotificationExecutor();
}
@Override
public PubSubExecutorService getPubsubExecutor() {
return mainCtx.getPubSubExecutorService();
}
@Override
@Deprecated
public ScriptEngine createJsScriptEngine(String script, String... argNames) {

View File

@ -0,0 +1,26 @@
package org.thingsboard.server.service.executors;
import com.google.api.gax.core.FixedExecutorProvider;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.PubSubExecutor;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import javax.annotation.PostConstruct;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@Component
public class PubSubExecutorService implements PubSubExecutor {
private static final int THREADS_PER_CPU = 5;
private FixedExecutorProvider fixedExecutorProvider;
@PostConstruct
public void init() {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("tb-pubsub-producer-scheduler"));;
this.fixedExecutorProvider = FixedExecutorProvider.create(scheduler);
}
public FixedExecutorProvider getExecutorProvider() {
return fixedExecutorProvider;
}
}

View File

@ -18,12 +18,14 @@ package org.thingsboard.server.queue.pubsub;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.gson.Gson;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueCallback;
@ -36,6 +38,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Slf4j
@ -50,11 +53,16 @@ public class TbPubSubProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
private final Map<String, Publisher> publisherMap = new ConcurrentHashMap<>();
private final ExecutorService pubExecutor = Executors.newCachedThreadPool();
private static final int THREADS_PER_CPU = 5;
private final FixedExecutorProvider fixedExecutorProvider;
public TbPubSubProducerTemplate(TbQueueAdmin admin, TbPubSubSettings pubSubSettings, String defaultTopic) {
this.defaultTopic = defaultTopic;
this.admin = admin;
this.pubSubSettings = pubSubSettings;
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("tb-pubsub-producer-scheduler"));;
fixedExecutorProvider = FixedExecutorProvider.create(scheduler);
}
@Override
@ -120,7 +128,10 @@ public class TbPubSubProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
try {
admin.createTopicIfNotExists(topic);
ProjectTopicName topicName = ProjectTopicName.of(pubSubSettings.getProjectId(), topic);
Publisher publisher = Publisher.newBuilder(topicName).setCredentialsProvider(pubSubSettings.getCredentialsProvider()).build();
Publisher publisher = Publisher.newBuilder(topicName)
.setCredentialsProvider(pubSubSettings.getCredentialsProvider())
.setExecutorProvider(fixedExecutorProvider)
.build();
publisherMap.put(topic, publisher);
return publisher;
} catch (IOException e) {

View File

@ -53,6 +53,10 @@
<artifactId>guava</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>

View File

@ -0,0 +1,7 @@
package org.thingsboard.common.util;
import com.google.api.gax.core.FixedExecutorProvider;
public interface PubSubExecutor {
FixedExecutorProvider getExecutorProvider();
}

View File

@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.api;
import io.netty.channel.EventLoopGroup;
import org.thingsboard.common.util.ListeningExecutor;
import org.thingsboard.common.util.PubSubExecutor;
import org.thingsboard.rule.engine.api.slack.SlackService;
import org.thingsboard.rule.engine.api.sms.SmsSenderFactory;
import org.thingsboard.server.cluster.TbClusterService;
@ -318,6 +319,8 @@ public interface TbContext {
ListeningExecutor getNotificationExecutor();
PubSubExecutor getPubsubExecutor();
MailService getMailService(boolean isSystem);
SmsService getSmsService();

View File

@ -20,6 +20,7 @@ import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.pubsub.v1.Publisher;
@ -68,7 +69,7 @@ public class TbPubSubNode extends TbAbstractExternalNode {
super.init(ctx);
this.config = TbNodeUtils.convert(configuration, TbPubSubNodeConfiguration.class);
try {
this.pubSubClient = initPubSubClient();
this.pubSubClient = initPubSubClient(ctx.getPubsubExecutor().getExecutorProvider());
} catch (Exception e) {
throw new TbNodeException(e);
}
@ -128,7 +129,7 @@ public class TbPubSubNode extends TbAbstractExternalNode {
return TbMsg.transformMsgMetadata(origMsg, metaData);
}
private Publisher initPubSubClient() throws IOException {
private Publisher initPubSubClient(FixedExecutorProvider fixedExecutorProvider) throws IOException {
ProjectTopicName topicName = ProjectTopicName.of(config.getProjectId(), config.getTopicName());
ServiceAccountCredentials credentials =
ServiceAccountCredentials.fromStream(
@ -148,6 +149,7 @@ public class TbPubSubNode extends TbAbstractExternalNode {
return Publisher.newBuilder(topicName)
.setCredentialsProvider(credProvider)
.setRetrySettings(retrySettings)
.setExecutorProvider(fixedExecutorProvider)
.build();
}
}