moved jsinvoke.proto to queue, fixed js-executor, added createRemoteJsRequestTemplate to RuleEngine and Core factories

This commit is contained in:
YevhenBondarenko 2020-04-09 14:44:35 +03:00 committed by Andrew Shvayka
parent 7b3d475267
commit 1dd3334825
21 changed files with 353 additions and 76 deletions

View File

@ -21,14 +21,15 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
@ -41,28 +42,13 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@ConditionalOnProperty(prefix = "js", value = "evaluator", havingValue = "remote", matchIfMissing = true)
@ConditionalOnExpression("'${js.evaluator:null}'=='remote' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core' || '${service.type:null}'=='tb-rule-engine')")
@Service
public class RemoteJsInvokeService extends AbstractJsInvokeService {
@Value("${js.remote.request_topic}")
private String requestTopic;
@Value("${js.remote.response_topic_prefix}")
private String responseTopicPrefix;
@Value("${js.remote.max_pending_requests}")
private long maxPendingRequests;
@Value("${js.remote.max_requests_timeout}")
private long maxRequestsTimeout;
@Value("${js.remote.response_poll_interval}")
private int responsePollDuration;
@Value("${js.remote.response_auto_commit_interval}")
private int autoCommitInterval;
@Getter
@Value("${js.remote.max_errors}")
private int maxErrors;
@ -94,43 +80,20 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
}
}
private DefaultTbQueueRequestTemplate<TbProtoQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> defaultTemplate;
@Autowired
private TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> requestTemplate;
private Map<UUID, String> scriptIdToBodysMap = new ConcurrentHashMap<>();
@PostConstruct
public void init() {
// TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsRequest>> requestBuilder = TBKafkaProducerTemplate.builder();
// requestBuilder.settings(kafkaSettings);
// requestBuilder.clientId("producer-js-invoke-" + nodeIdProvider.getNodeId());
// requestBuilder.defaultTopic(requestTopic);
// requestBuilder.encoder(new RemoteJsRequestEncoder());
TbQueueProducer<TbProtoQueueMsg<JsInvokeProtos.RemoteJsRequest>> producer;
// TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<JsInvokeProtos.RemoteJsResponse> responseBuilder = TBKafkaConsumerTemplate.builder();
// responseBuilder.settings(kafkaSettings);
// responseBuilder.topic(responseTopicPrefix + "." + nodeIdProvider.getNodeId());
// responseBuilder.clientId("js-" + nodeIdProvider.getNodeId());
// responseBuilder.groupId("rule-engine-node-" + nodeIdProvider.getNodeId());
// responseBuilder.autoCommit(true);
// responseBuilder.autoCommitIntervalMs(autoCommitInterval);
// responseBuilder.decoder(new RemoteJsResponseDecoder());
// responseBuilder.requestIdExtractor((response) -> new UUID(response.getRequestIdMSB(), response.getRequestIdLSB()));
//
// TbKafkaRequestTemplate.TbKafkaRequestTemplateBuilder
// <JsInvokeProtos.RemoteJsRequest, JsInvokeProtos.RemoteJsResponse> builder = TbKafkaRequestTemplate.builder();
// builder.requestTemplate(requestBuilder.build());
// builder.responseTemplate(responseBuilder.build());
// builder.maxPendingRequests(maxPendingRequests);
// builder.maxRequestTimeout(maxRequestsTimeout);
// builder.pollInterval(responsePollDuration);
// defaultTemplate = builder.build();
// defaultTemplate.init();
requestTemplate.init();
}
@PreDestroy
public void destroy() {
if (defaultTemplate != null) {
defaultTemplate.stop();
if (requestTemplate != null) {
requestTemplate.stop();
}
}
@ -147,7 +110,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
.build();
log.trace("Post compile request for scriptId [{}]", scriptId);
ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = defaultTemplate.send(new TbProtoQueueMsg<>(UUID.randomUUID(), jsRequestWrapper));
ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = requestTemplate.send(new TbProtoJsQueueMsg<>(UUID.randomUUID(), jsRequestWrapper));
kafkaPushedMsgs.incrementAndGet();
Futures.addCallback(future, new FutureCallback<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>>() {
@ -199,7 +162,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
.setInvokeRequest(jsRequestBuilder.build())
.build();
ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = defaultTemplate.send(new TbProtoQueueMsg<>(UUID.randomUUID(), jsRequestWrapper));
ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = requestTemplate.send(new TbProtoJsQueueMsg<>(UUID.randomUUID(), jsRequestWrapper));
kafkaPushedMsgs.incrementAndGet();
Futures.addCallback(future, new FutureCallback<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>>() {
@Override
@ -239,7 +202,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
.setReleaseRequest(jsRequest)
.build();
ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = defaultTemplate.send(new TbProtoQueueMsg<>(UUID.randomUUID(), jsRequestWrapper));
ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = requestTemplate.send(new TbProtoJsQueueMsg<>(UUID.randomUUID(), jsRequestWrapper));
JsInvokeProtos.RemoteJsResponse response = future.get().getValue();
JsInvokeProtos.JsReleaseResponse compilationResult = response.getReleaseResponse();

View File

@ -0,0 +1,43 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.common;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import org.thingsboard.server.queue.TbQueueMsgHeaders;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
public class TbProtoJsQueueMsg<T extends com.google.protobuf.GeneratedMessageV3> extends TbProtoQueueMsg<T> {
public TbProtoJsQueueMsg(UUID key, T value) {
super(key, value);
}
public TbProtoJsQueueMsg(UUID key, T value, TbQueueMsgHeaders headers) {
super(key, value, headers);
}
@Override
public byte[] getData() {
try {
return JsonFormat.printer().print(value).getBytes(StandardCharsets.UTF_8);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -25,7 +25,7 @@ import java.util.UUID;
public class TbProtoQueueMsg<T extends com.google.protobuf.GeneratedMessageV3> implements TbQueueMsg {
private final UUID key;
private final T value;
protected final T value;
private final TbQueueMsgHeaders headers;
public TbProtoQueueMsg(UUID key, T value) {

View File

@ -18,10 +18,13 @@ package org.thingsboard.server.queue.provider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
@ -125,4 +128,9 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> createTransportApiResponseProducer() {
return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, transportApiSettings.getResponsesTopic());
}
@Override
public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
return null;
}
}

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.queue.provider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
@ -28,6 +29,8 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponse
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
@ -114,4 +117,9 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() {
return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic());
}
@Override
public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
return null;
}
}

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.queue.provider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
@ -25,6 +26,8 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
@ -96,4 +99,9 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
}
@Override
public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
return null;
}
}

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.queue.provider;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
@ -27,6 +28,8 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestM
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer;
import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer;
@ -110,4 +113,9 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() {
return new InMemoryTbQueueConsumer<>(ruleEngineSettings.getTopic() + ".notifications");
}
@Override
public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
return null;
}
}

View File

@ -15,9 +15,12 @@
*/
package org.thingsboard.server.queue.provider;
import com.google.protobuf.util.JsonFormat;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
@ -27,18 +30,25 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestM
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.kafka.TBKafkaAdmin;
import org.thingsboard.server.queue.kafka.TBKafkaConsumerTemplate;
import org.thingsboard.server.queue.kafka.TBKafkaProducerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import java.nio.charset.StandardCharsets;
@Component
@ConditionalOnExpression("'${queue.type:null}'=='kafka' && '${service.type:null}'=='monolith'")
public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory {
@ -50,13 +60,15 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
private final TbQueueRuleEngineSettings ruleEngineSettings;
private final TbQueueTransportApiSettings transportApiSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
public KafkaMonolithQueueFactory(PartitionService partitionService, TbKafkaSettings kafkaSettings,
TbServiceInfoProvider serviceInfoProvider,
TbQueueCoreSettings coreSettings,
TbQueueRuleEngineSettings ruleEngineSettings,
TbQueueTransportApiSettings transportApiSettings,
TbQueueTransportNotificationSettings transportNotificationSettings) {
TbQueueTransportNotificationSettings transportNotificationSettings,
TbQueueRemoteJsInvokeSettings jsInvokeSettings) {
this.partitionService = partitionService;
this.kafkaSettings = kafkaSettings;
this.serviceInfoProvider = serviceInfoProvider;
@ -64,6 +76,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
this.ruleEngineSettings = ruleEngineSettings;
this.transportApiSettings = transportApiSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.jsInvokeSettings = jsInvokeSettings;
}
@Override
@ -175,4 +188,37 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
requestBuilder.defaultTopic(transportApiSettings.getResponsesTopic());
return requestBuilder.build();
}
@Override
@Bean
public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>> requestBuilder = TBKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("producer-js-invoke-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(jsInvokeSettings.getRequestTopic());
TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> responseBuilder = TBKafkaConsumerTemplate.builder();
responseBuilder.settings(kafkaSettings);
responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId());
responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId());
responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId());
// responseBuilder.autoCommit(true);
// responseBuilder.autoCommitIntervalMs(autoCommitInterval);
responseBuilder.decoder(msg -> {
JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder();
JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);
return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders());
}
);
DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
builder.queueAdmin(new TBKafkaAdmin(kafkaSettings));
builder.requestTemplate(requestBuilder.build());
builder.responseTemplate(responseBuilder.build());
builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests());
builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout());
builder.pollInterval(jsInvokeSettings.getResponsePollInterval());
return builder.build();
}
}

View File

@ -15,9 +15,12 @@
*/
package org.thingsboard.server.queue.provider;
import com.google.protobuf.util.JsonFormat;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
@ -27,16 +30,23 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestM
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.kafka.TBKafkaAdmin;
import org.thingsboard.server.queue.kafka.TBKafkaConsumerTemplate;
import org.thingsboard.server.queue.kafka.TBKafkaProducerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import java.nio.charset.StandardCharsets;
@Component
@ConditionalOnExpression("'${queue.type:null}'=='kafka' && '${service.type:null}'=='tb-core'")
public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
@ -47,18 +57,21 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
private final TbQueueCoreSettings coreSettings;
private final TbQueueRuleEngineSettings ruleEngineSettings;
private final TbQueueTransportApiSettings transportApiSettings;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
public KafkaTbCoreQueueFactory(PartitionService partitionService, TbKafkaSettings kafkaSettings,
TbServiceInfoProvider serviceInfoProvider,
TbQueueCoreSettings coreSettings,
TbQueueRuleEngineSettings ruleEngineSettings,
TbQueueTransportApiSettings transportApiSettings) {
TbQueueTransportApiSettings transportApiSettings,
TbQueueRemoteJsInvokeSettings jsInvokeSettings) {
this.partitionService = partitionService;
this.kafkaSettings = kafkaSettings;
this.serviceInfoProvider = serviceInfoProvider;
this.coreSettings = coreSettings;
this.ruleEngineSettings = ruleEngineSettings;
this.transportApiSettings = transportApiSettings;
this.jsInvokeSettings = jsInvokeSettings;
}
@Override
@ -148,4 +161,37 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
return requestBuilder.build();
}
@Override
@Bean
public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>> requestBuilder = TBKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("producer-js-invoke-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(jsInvokeSettings.getRequestTopic());
TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> responseBuilder = TBKafkaConsumerTemplate.builder();
responseBuilder.settings(kafkaSettings);
responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId());
responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId());
responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId());
// responseBuilder.autoCommit(true);
// responseBuilder.autoCommitIntervalMs(autoCommitInterval);
responseBuilder.decoder(msg -> {
JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder();
JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);
return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders());
}
);
DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
builder.queueAdmin(new TBKafkaAdmin(kafkaSettings));
builder.requestTemplate(requestBuilder.build());
builder.responseTemplate(responseBuilder.build());
builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests());
builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout());
builder.pollInterval(jsInvokeSettings.getResponsePollInterval());
return builder.build();
}
}

View File

@ -15,9 +15,12 @@
*/
package org.thingsboard.server.queue.provider;
import com.google.protobuf.util.JsonFormat;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
@ -25,16 +28,23 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotifica
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.kafka.TBKafkaAdmin;
import org.thingsboard.server.queue.kafka.TBKafkaConsumerTemplate;
import org.thingsboard.server.queue.kafka.TBKafkaProducerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import java.nio.charset.StandardCharsets;
@Component
@ConditionalOnExpression("'${queue.type:null}'=='kafka' && '${service.type:null}'=='tb-rule-engine'")
public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
@ -44,16 +54,19 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueCoreSettings coreSettings;
private final TbQueueRuleEngineSettings ruleEngineSettings;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
public KafkaTbRuleEngineQueueFactory(PartitionService partitionService, TbKafkaSettings kafkaSettings,
TbServiceInfoProvider serviceInfoProvider,
TbQueueCoreSettings coreSettings,
TbQueueRuleEngineSettings ruleEngineSettings) {
TbQueueRuleEngineSettings ruleEngineSettings,
TbQueueRemoteJsInvokeSettings jsInvokeSettings) {
this.partitionService = partitionService;
this.kafkaSettings = kafkaSettings;
this.serviceInfoProvider = serviceInfoProvider;
this.coreSettings = coreSettings;
this.ruleEngineSettings = ruleEngineSettings;
this.jsInvokeSettings = jsInvokeSettings;
}
@Override
@ -124,4 +137,37 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
return consumerBuilder.build();
}
@Override
@Bean
public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>> requestBuilder = TBKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("producer-js-invoke-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(jsInvokeSettings.getRequestTopic());
TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> responseBuilder = TBKafkaConsumerTemplate.builder();
responseBuilder.settings(kafkaSettings);
responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId());
responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId());
responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId());
// responseBuilder.autoCommit(true);
// responseBuilder.autoCommitIntervalMs(autoCommitInterval);
responseBuilder.decoder(msg -> {
JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder();
JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);
return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders());
}
);
DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
builder.queueAdmin(new TBKafkaAdmin(kafkaSettings));
builder.requestTemplate(requestBuilder.build());
builder.responseTemplate(responseBuilder.build());
builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests());
builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout());
builder.pollInterval(jsInvokeSettings.getResponsePollInterval());
return builder.build();
}
}

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.queue.provider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
@ -28,6 +29,8 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponse
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
@ -133,4 +136,9 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() {
return new TbPubSubProducerTemplate<>(admin, pubSubSettings, transportApiSettings.getResponsesTopic());
}
@Override
public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
return null;
}
}

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.queue.provider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
@ -27,6 +28,8 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestM
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
@ -110,4 +113,9 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() {
return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic());
}
@Override
public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
return null;
}
}

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.queue.provider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
@ -25,15 +26,17 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotifica
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate;
import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate;
import org.thingsboard.server.queue.pubsub.TbPubSubSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
@Component
@ -99,4 +102,9 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
}
@Override
public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
return null;
}
}

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.queue.provider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
@ -28,9 +29,11 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponse
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings;
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
@ -131,4 +134,9 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul
public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() {
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getResponsesTopic());
}
@Override
public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
return null;
}
}

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.queue.provider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
@ -27,9 +28,11 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponse
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings;
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
@ -39,7 +42,7 @@ import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
@Component
@ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='tb-core'")
public class ServiceBusTbCoreQueueProvider implements TbCoreQueueFactory {
public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
private final TbServiceBusSettings serviceBusSettings;
private final TbQueueRuleEngineSettings ruleEngineSettings;
@ -49,7 +52,7 @@ public class ServiceBusTbCoreQueueProvider implements TbCoreQueueFactory {
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueAdmin admin;
public ServiceBusTbCoreQueueProvider(TbServiceBusSettings serviceBusSettings,
public ServiceBusTbCoreQueueFactory(TbServiceBusSettings serviceBusSettings,
TbQueueCoreSettings coreSettings,
TbQueueTransportApiSettings transportApiSettings,
TbQueueRuleEngineSettings ruleEngineSettings,
@ -113,4 +116,9 @@ public class ServiceBusTbCoreQueueProvider implements TbCoreQueueFactory {
public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() {
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic());
}
@Override
public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
return null;
}
}

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.queue.provider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
@ -25,9 +26,11 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings;
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
@ -96,4 +99,9 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact
partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
}
@Override
public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
return null;
}
}

View File

@ -15,15 +15,19 @@
*/
package org.thingsboard.server.queue.provider;
import org.thingsboard.server.gen.transport.TransportProtos.*;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
/**
* Responsible for initialization of various Producers and Consumers used by TB Core Node.
@ -94,5 +98,5 @@ public interface TbCoreQueueFactory {
*/
TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer();
TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate();
}

View File

@ -15,14 +15,17 @@
*/
package org.thingsboard.server.queue.provider;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
/**
@ -82,4 +85,5 @@ public interface TbRuleEngineQueueFactory {
*/
TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer();
TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate();
}

View File

@ -0,0 +1,42 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.settings;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Data
@Component
public class TbQueueRemoteJsInvokeSettings {
@Value("${js.remote.request_topic}")
private String requestTopic;
@Value("${js.remote.response_topic_prefix}")
private String responseTopic;
@Value("${js.remote.max_pending_requests}")
private long maxPendingRequests;
@Value("${js.remote.response_poll_interval}")
private int responsePollInterval;
@Value("${js.remote.response_auto_commit_interval}")
private int autoCommitInterval;
@Value("${js.remote.max_requests_timeout}")
private long maxRequestsTimeout;
}

View File

@ -19,6 +19,7 @@ const COMPILATION_ERROR = 0;
const RUNTIME_ERROR = 1;
const TIMEOUT_ERROR = 2;
const UNRECOGNIZED = -1;
let headers;
const config = require('config'),
logger = require('../config/logger')._logger('JsInvokeMessageProcessor'),
@ -43,6 +44,7 @@ JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function(message) {
var responseTopic;
try {
var request = JSON.parse(message.value.toString('utf8'));
headers = message.headers;
var buf = message.headers['requestId'];
requestId = Utils.UUIDFromBuffer(buf);
buf = message.headers['responseTopic'];
@ -148,7 +150,8 @@ JsInvokeMessageProcessor.prototype.sendResponse = function (requestId, responseT
messages: [
{
key: scriptId,
value: rawResponse
value: rawResponse,
headers: headers
}
]
}