diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index f04cc4bc38..4f51cc7aed 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1270,8 +1270,8 @@ swagger: # Queue configuration parameters queue: - type: "${TB_QUEUE_TYPE:kafka}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) - prefix: "${TB_QUEUE_PREFIX:myPrefix}" # Global queue prefix. If specified, prefix is added before default topic name: 'prefix.default_topic_name'. Prefix is applied to all topics (and consumer groups for kafka) except of js executor topics (please use REMOTE_JS_EVAL_REQUEST_TOPIC and REMOTE_JS_EVAL_RESPONSE_TOPIC to specify custom topic names) + type: "${TB_QUEUE_TYPE:in-memory}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) + prefix: "${TB_QUEUE_PREFIX:}" # Global queue prefix. If specified, prefix is added before default topic name: 'prefix.default_topic_name'. Prefix is applied to all topics (and consumer groups for kafka) except of js executor topics (please use REMOTE_JS_EVAL_REQUEST_TOPIC and REMOTE_JS_EVAL_RESPONSE_TOPIC to specify custom topic names) in_memory: stats: # For debug level diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java index e81aaed39d..4b199e5654 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java @@ -33,6 +33,7 @@ import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; @@ -50,6 +51,8 @@ public interface TbClusterService extends TbQueueClusterService { void pushMsgToCore(ToDeviceActorNotificationMsg msg, TbQueueCallback callback); + void broadcastToCore(TransportProtos.ToCoreNotificationMsg msg); + void pushMsgToVersionControl(TenantId tenantId, ToVersionControlServiceMsg msg, TbQueueCallback callback); void pushNotificationToCore(String targetServiceId, FromDeviceRpcResponse response, TbQueueCallback callback); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbQueueRemoteJsInvokeSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbQueueRemoteJsInvokeSettings.java index 25f3244079..15ff55972d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbQueueRemoteJsInvokeSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbQueueRemoteJsInvokeSettings.java @@ -24,6 +24,9 @@ import org.springframework.stereotype.Component; @Data @Component public class TbQueueRemoteJsInvokeSettings { + + @Value("${queue.prefix:}") + private String prefix; @Value("${queue.js.request_topic}") private String requestTopic; @@ -38,4 +41,12 @@ public class TbQueueRemoteJsInvokeSettings { @Value("${queue.js.max_requests_timeout}") private long maxRequestsTimeout; + + public String getRequestTopic(){ + return prefix.isBlank() ? requestTopic : prefix + "." + requestTopic; + } + + public String getResponseTopic(){ + return prefix.isBlank() ? responseTopic : prefix + "." + responseTopic; + } }