Merge remote-tracking branch 'upstream/develop/2.5.5' into fix/ttlCleanUpServices
This commit is contained in:
		
						commit
						d5146b0d92
					
				@ -197,19 +197,21 @@ public class TelemetryController extends BaseController {
 | 
				
			|||||||
    @RequestMapping(value = "/{entityType}/{entityId}/values/timeseries", method = RequestMethod.GET, params = {"keys", "startTs", "endTs"})
 | 
					    @RequestMapping(value = "/{entityType}/{entityId}/values/timeseries", method = RequestMethod.GET, params = {"keys", "startTs", "endTs"})
 | 
				
			||||||
    @ResponseBody
 | 
					    @ResponseBody
 | 
				
			||||||
    public DeferredResult<ResponseEntity> getTimeseries(
 | 
					    public DeferredResult<ResponseEntity> getTimeseries(
 | 
				
			||||||
            @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
 | 
					            @PathVariable("entityType") String entityType,
 | 
				
			||||||
 | 
					            @PathVariable("entityId") String entityIdStr,
 | 
				
			||||||
            @RequestParam(name = "keys") String keys,
 | 
					            @RequestParam(name = "keys") String keys,
 | 
				
			||||||
            @RequestParam(name = "startTs") Long startTs,
 | 
					            @RequestParam(name = "startTs") Long startTs,
 | 
				
			||||||
            @RequestParam(name = "endTs") Long endTs,
 | 
					            @RequestParam(name = "endTs") Long endTs,
 | 
				
			||||||
            @RequestParam(name = "interval", defaultValue = "0") Long interval,
 | 
					            @RequestParam(name = "interval", defaultValue = "0") Long interval,
 | 
				
			||||||
            @RequestParam(name = "limit", defaultValue = "100") Integer limit,
 | 
					            @RequestParam(name = "limit", defaultValue = "100") Integer limit,
 | 
				
			||||||
            @RequestParam(name = "agg", defaultValue = "NONE") String aggStr,
 | 
					            @RequestParam(name = "agg", defaultValue = "NONE") String aggStr,
 | 
				
			||||||
 | 
					            @RequestParam(name= "orderBy", defaultValue = "DESC") String orderBy,
 | 
				
			||||||
            @RequestParam(name = "useStrictDataTypes", required = false, defaultValue = "false") Boolean useStrictDataTypes) throws ThingsboardException {
 | 
					            @RequestParam(name = "useStrictDataTypes", required = false, defaultValue = "false") Boolean useStrictDataTypes) throws ThingsboardException {
 | 
				
			||||||
        return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_TELEMETRY, entityType, entityIdStr,
 | 
					        return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_TELEMETRY, entityType, entityIdStr,
 | 
				
			||||||
                (result, tenantId, entityId) -> {
 | 
					                (result, tenantId, entityId) -> {
 | 
				
			||||||
                    // If interval is 0, convert this to a NONE aggregation, which is probably what the user really wanted
 | 
					                    // If interval is 0, convert this to a NONE aggregation, which is probably what the user really wanted
 | 
				
			||||||
                    Aggregation agg = interval == 0L ? Aggregation.valueOf(Aggregation.NONE.name()) : Aggregation.valueOf(aggStr);
 | 
					                    Aggregation agg = interval == 0L ? Aggregation.valueOf(Aggregation.NONE.name()) : Aggregation.valueOf(aggStr);
 | 
				
			||||||
                    List<ReadTsKvQuery> queries = toKeysList(keys).stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, interval, limit, agg))
 | 
					                    List<ReadTsKvQuery> queries = toKeysList(keys).stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, interval, limit, agg, orderBy))
 | 
				
			||||||
                            .collect(Collectors.toList());
 | 
					                            .collect(Collectors.toList());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    Futures.addCallback(tsService.findAll(tenantId, entityId, queries), getTsKvListCallback(result, useStrictDataTypes), MoreExecutors.directExecutor());
 | 
					                    Futures.addCallback(tsService.findAll(tenantId, entityId, queries), getTsKvListCallback(result, useStrictDataTypes), MoreExecutors.directExecutor());
 | 
				
			||||||
 | 
				
			|||||||
@ -29,6 +29,8 @@
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
<!--    <logger name="org.thingsboard.server.service.queue" level="TRACE" />-->
 | 
					<!--    <logger name="org.thingsboard.server.service.queue" level="TRACE" />-->
 | 
				
			||||||
<!--    <logger name="org.thingsboard.server.service.transport" level="TRACE" />-->
 | 
					<!--    <logger name="org.thingsboard.server.service.transport" level="TRACE" />-->
 | 
				
			||||||
 | 
					<!--    <logger name="org.thingsboard.server.queue.memory.InMemoryStorage" level="DEBUG" />-->
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    <logger name="com.microsoft.azure.servicebus.primitives.CoreMessageReceiver" level="OFF" />
 | 
					    <logger name="com.microsoft.azure.servicebus.primitives.CoreMessageReceiver" level="OFF" />
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -594,6 +594,10 @@ swagger:
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
queue:
 | 
					queue:
 | 
				
			||||||
  type: "${TB_QUEUE_TYPE:in-memory}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
 | 
					  type: "${TB_QUEUE_TYPE:in-memory}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
 | 
				
			||||||
 | 
					  in_memory:
 | 
				
			||||||
 | 
					    stats:
 | 
				
			||||||
 | 
					      # For debug lvl
 | 
				
			||||||
 | 
					      print-interval-ms: "${TB_QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}"
 | 
				
			||||||
  kafka:
 | 
					  kafka:
 | 
				
			||||||
    bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
 | 
					    bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
 | 
				
			||||||
    acks: "${TB_KAFKA_ACKS:all}"
 | 
					    acks: "${TB_KAFKA_ACKS:all}"
 | 
				
			||||||
@ -613,11 +617,11 @@ queue:
 | 
				
			|||||||
      security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
 | 
					      security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
 | 
				
			||||||
    other:
 | 
					    other:
 | 
				
			||||||
    topic-properties:
 | 
					    topic-properties:
 | 
				
			||||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
					      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
				
			||||||
      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
					      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
				
			||||||
      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
					      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
				
			||||||
      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
					      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
				
			||||||
      js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}"
 | 
					      js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100}"
 | 
				
			||||||
  aws_sqs:
 | 
					  aws_sqs:
 | 
				
			||||||
    use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
 | 
					    use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
 | 
				
			||||||
    access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
 | 
					    access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
 | 
				
			||||||
 | 
				
			|||||||
@ -37,6 +37,7 @@ public class TbKafkaAdmin implements TbQueueAdmin {
 | 
				
			|||||||
    private final AdminClient client;
 | 
					    private final AdminClient client;
 | 
				
			||||||
    private final Map<String, String> topicConfigs;
 | 
					    private final Map<String, String> topicConfigs;
 | 
				
			||||||
    private final Set<String> topics = ConcurrentHashMap.newKeySet();
 | 
					    private final Set<String> topics = ConcurrentHashMap.newKeySet();
 | 
				
			||||||
 | 
					    private final int numPartitions;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private final short replicationFactor;
 | 
					    private final short replicationFactor;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -50,6 +51,13 @@ public class TbKafkaAdmin implements TbQueueAdmin {
 | 
				
			|||||||
            log.error("Failed to get all topics.", e);
 | 
					            log.error("Failed to get all topics.", e);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        String numPartitionsStr = topicConfigs.get("partitions");
 | 
				
			||||||
 | 
					        if (numPartitionsStr != null) {
 | 
				
			||||||
 | 
					            numPartitions = Integer.parseInt(numPartitionsStr);
 | 
				
			||||||
 | 
					            topicConfigs.remove("partitions");
 | 
				
			||||||
 | 
					        } else {
 | 
				
			||||||
 | 
					            numPartitions = 1;
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
        replicationFactor = settings.getReplicationFactor();
 | 
					        replicationFactor = settings.getReplicationFactor();
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -59,7 +67,7 @@ public class TbKafkaAdmin implements TbQueueAdmin {
 | 
				
			|||||||
            return;
 | 
					            return;
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        try {
 | 
					        try {
 | 
				
			||||||
            NewTopic newTopic = new NewTopic(topic, 1, replicationFactor).configs(topicConfigs);
 | 
					            NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor).configs(topicConfigs);
 | 
				
			||||||
            createTopic(newTopic).values().get(topic).get();
 | 
					            createTopic(newTopic).values().get(topic).get();
 | 
				
			||||||
            topics.add(topic);
 | 
					            topics.add(topic);
 | 
				
			||||||
        } catch (ExecutionException ee) {
 | 
					        } catch (ExecutionException ee) {
 | 
				
			||||||
 | 
				
			|||||||
@ -23,27 +23,21 @@ import java.util.Collections;
 | 
				
			|||||||
import java.util.List;
 | 
					import java.util.List;
 | 
				
			||||||
import java.util.concurrent.BlockingQueue;
 | 
					import java.util.concurrent.BlockingQueue;
 | 
				
			||||||
import java.util.concurrent.ConcurrentHashMap;
 | 
					import java.util.concurrent.ConcurrentHashMap;
 | 
				
			||||||
import java.util.concurrent.Executors;
 | 
					 | 
				
			||||||
import java.util.concurrent.LinkedBlockingQueue;
 | 
					import java.util.concurrent.LinkedBlockingQueue;
 | 
				
			||||||
import java.util.concurrent.ScheduledExecutorService;
 | 
					 | 
				
			||||||
import java.util.concurrent.TimeUnit;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
@Slf4j
 | 
					@Slf4j
 | 
				
			||||||
public final class InMemoryStorage {
 | 
					public final class InMemoryStorage {
 | 
				
			||||||
    private static InMemoryStorage instance;
 | 
					    private static InMemoryStorage instance;
 | 
				
			||||||
    private final ConcurrentHashMap<String, BlockingQueue<TbQueueMsg>> storage;
 | 
					    private final ConcurrentHashMap<String, BlockingQueue<TbQueueMsg>> storage;
 | 
				
			||||||
    private static ScheduledExecutorService statExecutor;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private InMemoryStorage() {
 | 
					    private InMemoryStorage() {
 | 
				
			||||||
        storage = new ConcurrentHashMap<>();
 | 
					        storage = new ConcurrentHashMap<>();
 | 
				
			||||||
        statExecutor = Executors.newSingleThreadScheduledExecutor();
 | 
					 | 
				
			||||||
        statExecutor.scheduleAtFixedRate(this::printStats, 60, 60, TimeUnit.SECONDS);
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private void printStats() {
 | 
					    public void printStats() {
 | 
				
			||||||
        storage.forEach((topic, queue) -> {
 | 
					        storage.forEach((topic, queue) -> {
 | 
				
			||||||
            if (queue.size() > 0) {
 | 
					            if (queue.size() > 0) {
 | 
				
			||||||
                log.debug("Topic: [{}], Queue size: [{}]", topic, queue.size());
 | 
					                log.debug("[{}] Queue Size [{}]", topic, queue.size());
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        });
 | 
					        });
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@ -90,9 +84,4 @@ public final class InMemoryStorage {
 | 
				
			|||||||
        storage.clear();
 | 
					        storage.clear();
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public void destroy() {
 | 
					 | 
				
			||||||
        if (statExecutor != null) {
 | 
					 | 
				
			||||||
            statExecutor.shutdownNow();
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -53,6 +53,6 @@ public class InMemoryTbQueueProducer<T extends TbQueueMsg> implements TbQueuePro
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    public void stop() {
 | 
					    public void stop() {
 | 
				
			||||||
        storage.destroy();
 | 
					
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -17,6 +17,7 @@ package org.thingsboard.server.queue.provider;
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import lombok.extern.slf4j.Slf4j;
 | 
					import lombok.extern.slf4j.Slf4j;
 | 
				
			||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 | 
					import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 | 
				
			||||||
 | 
					import org.springframework.scheduling.annotation.Scheduled;
 | 
				
			||||||
import org.springframework.stereotype.Component;
 | 
					import org.springframework.stereotype.Component;
 | 
				
			||||||
import org.thingsboard.server.common.msg.queue.ServiceType;
 | 
					import org.thingsboard.server.common.msg.queue.ServiceType;
 | 
				
			||||||
import org.thingsboard.server.gen.js.JsInvokeProtos;
 | 
					import org.thingsboard.server.gen.js.JsInvokeProtos;
 | 
				
			||||||
@ -28,6 +29,7 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
 | 
				
			|||||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
					import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
				
			||||||
import org.thingsboard.server.queue.discovery.PartitionService;
 | 
					import org.thingsboard.server.queue.discovery.PartitionService;
 | 
				
			||||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
 | 
					import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
 | 
				
			||||||
 | 
					import org.thingsboard.server.queue.memory.InMemoryStorage;
 | 
				
			||||||
import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer;
 | 
					import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer;
 | 
				
			||||||
import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer;
 | 
					import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer;
 | 
				
			||||||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
 | 
					import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
 | 
				
			||||||
@ -47,6 +49,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
 | 
				
			|||||||
    private final TbQueueRuleEngineSettings ruleEngineSettings;
 | 
					    private final TbQueueRuleEngineSettings ruleEngineSettings;
 | 
				
			||||||
    private final TbQueueTransportApiSettings transportApiSettings;
 | 
					    private final TbQueueTransportApiSettings transportApiSettings;
 | 
				
			||||||
    private final TbQueueTransportNotificationSettings transportNotificationSettings;
 | 
					    private final TbQueueTransportNotificationSettings transportNotificationSettings;
 | 
				
			||||||
 | 
					    private final InMemoryStorage storage;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public InMemoryMonolithQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings,
 | 
					    public InMemoryMonolithQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings,
 | 
				
			||||||
                                        TbQueueRuleEngineSettings ruleEngineSettings,
 | 
					                                        TbQueueRuleEngineSettings ruleEngineSettings,
 | 
				
			||||||
@ -59,6 +62,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
 | 
				
			|||||||
        this.ruleEngineSettings = ruleEngineSettings;
 | 
					        this.ruleEngineSettings = ruleEngineSettings;
 | 
				
			||||||
        this.transportApiSettings = transportApiSettings;
 | 
					        this.transportApiSettings = transportApiSettings;
 | 
				
			||||||
        this.transportNotificationSettings = transportNotificationSettings;
 | 
					        this.transportNotificationSettings = transportNotificationSettings;
 | 
				
			||||||
 | 
					        this.storage = InMemoryStorage.getInstance();
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
@ -120,4 +124,9 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
 | 
				
			|||||||
    public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
 | 
					    public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
 | 
				
			||||||
        return null;
 | 
					        return null;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Scheduled(fixedRateString = "${queue.in_memory.stats.print-interval-ms:60000}")
 | 
				
			||||||
 | 
					    private void printInMemoryStats() {
 | 
				
			||||||
 | 
					        storage.printStats();
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -1,2 +1,3 @@
 | 
				
			|||||||
TB_QUEUE_TYPE=kafka
 | 
					TB_QUEUE_TYPE=kafka
 | 
				
			||||||
TB_KAFKA_SERVERS=kafka:9092
 | 
					TB_KAFKA_SERVERS=kafka:9092
 | 
				
			||||||
 | 
					TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES=retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100
 | 
				
			||||||
 | 
				
			|||||||
@ -25,7 +25,7 @@ kafka:
 | 
				
			|||||||
    # Kafka Bootstrap Servers
 | 
					    # Kafka Bootstrap Servers
 | 
				
			||||||
    servers: "localhost:9092"
 | 
					    servers: "localhost:9092"
 | 
				
			||||||
  replication_factor: "1"
 | 
					  replication_factor: "1"
 | 
				
			||||||
  topic_properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600"
 | 
					  topic_properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100"
 | 
				
			||||||
  use_confluent_cloud: false
 | 
					  use_confluent_cloud: false
 | 
				
			||||||
  confluent:
 | 
					  confluent:
 | 
				
			||||||
    sasl:
 | 
					    sasl:
 | 
				
			||||||
 | 
				
			|||||||
@ -34,7 +34,7 @@ function KafkaProducer() {
 | 
				
			|||||||
    this.send = async (responseTopic, scriptId, rawResponse, headers) => {
 | 
					    this.send = async (responseTopic, scriptId, rawResponse, headers) => {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if (!topics.includes(responseTopic)) {
 | 
					        if (!topics.includes(responseTopic)) {
 | 
				
			||||||
            let createResponseTopicResult = await createTopic(responseTopic);
 | 
					            let createResponseTopicResult = await createTopic(responseTopic, 1);
 | 
				
			||||||
            topics.push(responseTopic);
 | 
					            topics.push(responseTopic);
 | 
				
			||||||
            if (createResponseTopicResult) {
 | 
					            if (createResponseTopicResult) {
 | 
				
			||||||
                logger.info('Created new topic: %s', requestTopic);
 | 
					                logger.info('Created new topic: %s', requestTopic);
 | 
				
			||||||
@ -88,7 +88,18 @@ function KafkaProducer() {
 | 
				
			|||||||
        kafkaAdmin = kafkaClient.admin();
 | 
					        kafkaAdmin = kafkaClient.admin();
 | 
				
			||||||
        await kafkaAdmin.connect();
 | 
					        await kafkaAdmin.connect();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let createRequestTopicResult = await createTopic(requestTopic);
 | 
					        let partitions = 1;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        for (let i = 0; i < configEntries.length; i++) {
 | 
				
			||||||
 | 
					            let param = configEntries[i];
 | 
				
			||||||
 | 
					            if (param.name === 'partitions') {
 | 
				
			||||||
 | 
					                partitions = param.value;
 | 
				
			||||||
 | 
					                configEntries.splice(i, 1);
 | 
				
			||||||
 | 
					                break;
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        let createRequestTopicResult = await createTopic(requestTopic, partitions);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if (createRequestTopicResult) {
 | 
					        if (createRequestTopicResult) {
 | 
				
			||||||
            logger.info('Created new topic: %s', requestTopic);
 | 
					            logger.info('Created new topic: %s', requestTopic);
 | 
				
			||||||
@ -121,10 +132,11 @@ function KafkaProducer() {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
})();
 | 
					})();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
function createTopic(topic) {
 | 
					function createTopic(topic, partitions) {
 | 
				
			||||||
    return kafkaAdmin.createTopics({
 | 
					    return kafkaAdmin.createTopics({
 | 
				
			||||||
        topics: [{
 | 
					        topics: [{
 | 
				
			||||||
            topic: topic,
 | 
					            topic: topic,
 | 
				
			||||||
 | 
					            numPartitions: partitions,
 | 
				
			||||||
            replicationFactor: replicationFactor,
 | 
					            replicationFactor: replicationFactor,
 | 
				
			||||||
            configEntries: configEntries
 | 
					            configEntries: configEntries
 | 
				
			||||||
        }]
 | 
					        }]
 | 
				
			||||||
 | 
				
			|||||||
@ -77,11 +77,11 @@ queue:
 | 
				
			|||||||
      security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
 | 
					      security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
 | 
				
			||||||
    other:
 | 
					    other:
 | 
				
			||||||
    topic-properties:
 | 
					    topic-properties:
 | 
				
			||||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
					      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
				
			||||||
      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
					      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
				
			||||||
      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
					      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
				
			||||||
      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
					      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
				
			||||||
      js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}"
 | 
					      js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100}"
 | 
				
			||||||
  aws_sqs:
 | 
					  aws_sqs:
 | 
				
			||||||
    use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
 | 
					    use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
 | 
				
			||||||
    access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
 | 
					    access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
 | 
				
			||||||
 | 
				
			|||||||
@ -70,11 +70,11 @@ queue:
 | 
				
			|||||||
      security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
 | 
					      security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
 | 
				
			||||||
    other:
 | 
					    other:
 | 
				
			||||||
    topic-properties:
 | 
					    topic-properties:
 | 
				
			||||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
					      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
				
			||||||
      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
					      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
				
			||||||
      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
					      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
				
			||||||
      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
					      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
				
			||||||
      js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}"
 | 
					      js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100}"
 | 
				
			||||||
  aws_sqs:
 | 
					  aws_sqs:
 | 
				
			||||||
    use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
 | 
					    use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
 | 
				
			||||||
    access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
 | 
					    access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
 | 
				
			||||||
 | 
				
			|||||||
@ -98,11 +98,11 @@ queue:
 | 
				
			|||||||
      security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
 | 
					      security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
 | 
				
			||||||
    other:
 | 
					    other:
 | 
				
			||||||
    topic-properties:
 | 
					    topic-properties:
 | 
				
			||||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
					      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
				
			||||||
      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
					      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
				
			||||||
      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
					      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
				
			||||||
      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
					      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
				
			||||||
      js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}"
 | 
					      js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100}"
 | 
				
			||||||
  aws_sqs:
 | 
					  aws_sqs:
 | 
				
			||||||
    use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
 | 
					    use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
 | 
				
			||||||
    access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
 | 
					    access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user