diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 3d4340f6c9..b9f539c877 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -29,6 +29,7 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.service.executors.PubSubExecutorService; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.NotificationCenter; import org.thingsboard.rule.engine.api.SmsService; @@ -322,6 +323,10 @@ public class ActorSystemContext { @Getter private NotificationExecutorService notificationExecutor; + @Autowired + @Getter + private PubSubExecutorService pubSubExecutorService; + @Autowired @Getter private SharedEventLoopGroupService sharedEventLoopGroupService; diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index ca1d174322..299b6274b0 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j; import org.bouncycastle.util.Arrays; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; +import org.thingsboard.server.service.executors.PubSubExecutorService; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.NotificationCenter; import org.thingsboard.rule.engine.api.RuleEngineAlarmService; @@ -538,6 +539,11 @@ class DefaultTbContext implements TbContext { return mainCtx.getNotificationExecutor(); } + @Override + public PubSubExecutorService getPubsubExecutor() { + return mainCtx.getPubSubExecutorService(); + } + @Override @Deprecated public ScriptEngine createJsScriptEngine(String script, String... argNames) { diff --git a/application/src/main/java/org/thingsboard/server/service/executors/PubSubExecutorService.java b/application/src/main/java/org/thingsboard/server/service/executors/PubSubExecutorService.java new file mode 100644 index 0000000000..22d55cc673 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/executors/PubSubExecutorService.java @@ -0,0 +1,26 @@ +package org.thingsboard.server.service.executors; + +import com.google.api.gax.core.FixedExecutorProvider; +import org.springframework.stereotype.Component; +import org.thingsboard.common.util.PubSubExecutor; +import org.thingsboard.common.util.ThingsBoardThreadFactory; + +import javax.annotation.PostConstruct; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +@Component +public class PubSubExecutorService implements PubSubExecutor { + private static final int THREADS_PER_CPU = 5; + private FixedExecutorProvider fixedExecutorProvider; + + @PostConstruct + public void init() { + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("tb-pubsub-producer-scheduler"));; + this.fixedExecutorProvider = FixedExecutorProvider.create(scheduler); + } + + public FixedExecutorProvider getExecutorProvider() { + return fixedExecutorProvider; + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubProducerTemplate.java index 0d84a8f839..46c6bb6413 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubProducerTemplate.java @@ -18,12 +18,14 @@ package org.thingsboard.server.queue.pubsub; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; +import com.google.api.gax.core.FixedExecutorProvider; import com.google.cloud.pubsub.v1.Publisher; import com.google.gson.Gson; import com.google.protobuf.ByteString; import com.google.pubsub.v1.ProjectTopicName; import com.google.pubsub.v1.PubsubMessage; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueCallback; @@ -36,6 +38,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @Slf4j @@ -50,11 +53,16 @@ public class TbPubSubProducerTemplate implements TbQueuePr private final Map publisherMap = new ConcurrentHashMap<>(); private final ExecutorService pubExecutor = Executors.newCachedThreadPool(); + private static final int THREADS_PER_CPU = 5; + private final FixedExecutorProvider fixedExecutorProvider; public TbPubSubProducerTemplate(TbQueueAdmin admin, TbPubSubSettings pubSubSettings, String defaultTopic) { this.defaultTopic = defaultTopic; this.admin = admin; this.pubSubSettings = pubSubSettings; + + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("tb-pubsub-producer-scheduler"));; + fixedExecutorProvider = FixedExecutorProvider.create(scheduler); } @Override @@ -120,7 +128,10 @@ public class TbPubSubProducerTemplate implements TbQueuePr try { admin.createTopicIfNotExists(topic); ProjectTopicName topicName = ProjectTopicName.of(pubSubSettings.getProjectId(), topic); - Publisher publisher = Publisher.newBuilder(topicName).setCredentialsProvider(pubSubSettings.getCredentialsProvider()).build(); + Publisher publisher = Publisher.newBuilder(topicName) + .setCredentialsProvider(pubSubSettings.getCredentialsProvider()) + .setExecutorProvider(fixedExecutorProvider) + .build(); publisherMap.put(topic, publisher); return publisher; } catch (IOException e) { diff --git a/common/util/pom.xml b/common/util/pom.xml index eab8ae982b..c2f5dfea6c 100644 --- a/common/util/pom.xml +++ b/common/util/pom.xml @@ -53,6 +53,10 @@ guava provided + + com.google.cloud + google-cloud-pubsub + javax.annotation javax.annotation-api diff --git a/common/util/src/main/java/org/thingsboard/common/util/PubSubExecutor.java b/common/util/src/main/java/org/thingsboard/common/util/PubSubExecutor.java new file mode 100644 index 0000000000..5354102578 --- /dev/null +++ b/common/util/src/main/java/org/thingsboard/common/util/PubSubExecutor.java @@ -0,0 +1,7 @@ +package org.thingsboard.common.util; + +import com.google.api.gax.core.FixedExecutorProvider; + +public interface PubSubExecutor { + FixedExecutorProvider getExecutorProvider(); +} diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index e645110a9d..cd71626afd 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.api; import io.netty.channel.EventLoopGroup; import org.thingsboard.common.util.ListeningExecutor; +import org.thingsboard.common.util.PubSubExecutor; import org.thingsboard.rule.engine.api.slack.SlackService; import org.thingsboard.rule.engine.api.sms.SmsSenderFactory; import org.thingsboard.server.cluster.TbClusterService; @@ -318,6 +319,8 @@ public interface TbContext { ListeningExecutor getNotificationExecutor(); + PubSubExecutor getPubsubExecutor(); + MailService getMailService(boolean isSystem); SmsService getSmsService(); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNode.java index c55113783c..4c24bdf1cb 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNode.java @@ -20,6 +20,7 @@ import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.api.gax.core.FixedExecutorProvider; import com.google.api.gax.retrying.RetrySettings; import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.cloud.pubsub.v1.Publisher; @@ -68,7 +69,7 @@ public class TbPubSubNode extends TbAbstractExternalNode { super.init(ctx); this.config = TbNodeUtils.convert(configuration, TbPubSubNodeConfiguration.class); try { - this.pubSubClient = initPubSubClient(); + this.pubSubClient = initPubSubClient(ctx.getPubsubExecutor().getExecutorProvider()); } catch (Exception e) { throw new TbNodeException(e); } @@ -128,7 +129,7 @@ public class TbPubSubNode extends TbAbstractExternalNode { return TbMsg.transformMsgMetadata(origMsg, metaData); } - private Publisher initPubSubClient() throws IOException { + private Publisher initPubSubClient(FixedExecutorProvider fixedExecutorProvider) throws IOException { ProjectTopicName topicName = ProjectTopicName.of(config.getProjectId(), config.getTopicName()); ServiceAccountCredentials credentials = ServiceAccountCredentials.fromStream( @@ -148,6 +149,7 @@ public class TbPubSubNode extends TbAbstractExternalNode { return Publisher.newBuilder(topicName) .setCredentialsProvider(credProvider) .setRetrySettings(retrySettings) + .setExecutorProvider(fixedExecutorProvider) .build(); } }