Merge pull request #6939 from thingsboard/kafka-responses-topic-single-partition
Single Partition for Kafka response topics. 10 partitions for transpo…
This commit is contained in:
		
						commit
						b47fc6aac3
					
				@ -943,7 +943,7 @@ queue:
 | 
				
			|||||||
    topic-properties:
 | 
					    topic-properties:
 | 
				
			||||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
					      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
				
			||||||
      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
					      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
				
			||||||
      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
					      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}"
 | 
				
			||||||
      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
					      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
				
			||||||
      js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100;min.insync.replicas:1}"
 | 
					      js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100;min.insync.replicas:1}"
 | 
				
			||||||
      ota-updates: "${TB_QUEUE_KAFKA_OTA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}"
 | 
					      ota-updates: "${TB_QUEUE_KAFKA_OTA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}"
 | 
				
			||||||
 | 
				
			|||||||
@ -51,7 +51,7 @@ public class TbKafkaAdmin implements TbQueueAdmin {
 | 
				
			|||||||
            log.error("Failed to get all topics.", e);
 | 
					            log.error("Failed to get all topics.", e);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        String numPartitionsStr = topicConfigs.get("partitions");
 | 
					        String numPartitionsStr = topicConfigs.get(TbKafkaTopicConfigs.NUM_PARTITIONS_SETTING);
 | 
				
			||||||
        if (numPartitionsStr != null) {
 | 
					        if (numPartitionsStr != null) {
 | 
				
			||||||
            numPartitions = Integer.parseInt(numPartitionsStr);
 | 
					            numPartitions = Integer.parseInt(numPartitionsStr);
 | 
				
			||||||
            topicConfigs.remove("partitions");
 | 
					            topicConfigs.remove("partitions");
 | 
				
			||||||
 | 
				
			|||||||
@ -28,6 +28,8 @@ import java.util.Map;
 | 
				
			|||||||
@Component
 | 
					@Component
 | 
				
			||||||
@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka")
 | 
					@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka")
 | 
				
			||||||
public class TbKafkaTopicConfigs {
 | 
					public class TbKafkaTopicConfigs {
 | 
				
			||||||
 | 
					    public static final String NUM_PARTITIONS_SETTING = "partitions";
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Value("${queue.kafka.topic-properties.core:}")
 | 
					    @Value("${queue.kafka.topic-properties.core:}")
 | 
				
			||||||
    private String coreProperties;
 | 
					    private String coreProperties;
 | 
				
			||||||
    @Value("${queue.kafka.topic-properties.rule-engine:}")
 | 
					    @Value("${queue.kafka.topic-properties.rule-engine:}")
 | 
				
			||||||
@ -43,17 +45,20 @@ public class TbKafkaTopicConfigs {
 | 
				
			|||||||
    @Value("${queue.kafka.topic-properties.version-control:}")
 | 
					    @Value("${queue.kafka.topic-properties.version-control:}")
 | 
				
			||||||
    private String vcProperties;
 | 
					    private String vcProperties;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					 | 
				
			||||||
    @Getter
 | 
					    @Getter
 | 
				
			||||||
    private Map<String, String> coreConfigs;
 | 
					    private Map<String, String> coreConfigs;
 | 
				
			||||||
    @Getter
 | 
					    @Getter
 | 
				
			||||||
    private Map<String, String> ruleEngineConfigs;
 | 
					    private Map<String, String> ruleEngineConfigs;
 | 
				
			||||||
    @Getter
 | 
					    @Getter
 | 
				
			||||||
    private Map<String, String> transportApiConfigs;
 | 
					    private Map<String, String> transportApiRequestConfigs;
 | 
				
			||||||
 | 
					    @Getter
 | 
				
			||||||
 | 
					    private Map<String, String> transportApiResponseConfigs;
 | 
				
			||||||
    @Getter
 | 
					    @Getter
 | 
				
			||||||
    private Map<String, String> notificationsConfigs;
 | 
					    private Map<String, String> notificationsConfigs;
 | 
				
			||||||
    @Getter
 | 
					    @Getter
 | 
				
			||||||
    private Map<String, String> jsExecutorConfigs;
 | 
					    private Map<String, String> jsExecutorRequestConfigs;
 | 
				
			||||||
 | 
					    @Getter
 | 
				
			||||||
 | 
					    private Map<String, String> jsExecutorResponseConfigs;
 | 
				
			||||||
    @Getter
 | 
					    @Getter
 | 
				
			||||||
    private Map<String, String> fwUpdatesConfigs;
 | 
					    private Map<String, String> fwUpdatesConfigs;
 | 
				
			||||||
    @Getter
 | 
					    @Getter
 | 
				
			||||||
@ -63,9 +68,13 @@ public class TbKafkaTopicConfigs {
 | 
				
			|||||||
    private void init() {
 | 
					    private void init() {
 | 
				
			||||||
        coreConfigs = getConfigs(coreProperties);
 | 
					        coreConfigs = getConfigs(coreProperties);
 | 
				
			||||||
        ruleEngineConfigs = getConfigs(ruleEngineProperties);
 | 
					        ruleEngineConfigs = getConfigs(ruleEngineProperties);
 | 
				
			||||||
        transportApiConfigs = getConfigs(transportApiProperties);
 | 
					        transportApiRequestConfigs = getConfigs(transportApiProperties);
 | 
				
			||||||
 | 
					        transportApiResponseConfigs = getConfigs(transportApiProperties);
 | 
				
			||||||
 | 
					        transportApiResponseConfigs.put(NUM_PARTITIONS_SETTING, "1");
 | 
				
			||||||
        notificationsConfigs = getConfigs(notificationsProperties);
 | 
					        notificationsConfigs = getConfigs(notificationsProperties);
 | 
				
			||||||
        jsExecutorConfigs = getConfigs(jsExecutorProperties);
 | 
					        jsExecutorRequestConfigs = getConfigs(jsExecutorProperties);
 | 
				
			||||||
 | 
					        jsExecutorResponseConfigs = getConfigs(jsExecutorProperties);
 | 
				
			||||||
 | 
					        jsExecutorResponseConfigs.put(NUM_PARTITIONS_SETTING, "1");
 | 
				
			||||||
        fwUpdatesConfigs = getConfigs(fwUpdatesProperties);
 | 
					        fwUpdatesConfigs = getConfigs(fwUpdatesProperties);
 | 
				
			||||||
        vcConfigs = getConfigs(vcProperties);
 | 
					        vcConfigs = getConfigs(vcProperties);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
				
			|||||||
@ -75,8 +75,10 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    private final TbQueueAdmin coreAdmin;
 | 
					    private final TbQueueAdmin coreAdmin;
 | 
				
			||||||
    private final TbQueueAdmin ruleEngineAdmin;
 | 
					    private final TbQueueAdmin ruleEngineAdmin;
 | 
				
			||||||
    private final TbQueueAdmin jsExecutorAdmin;
 | 
					    private final TbQueueAdmin jsExecutorRequestAdmin;
 | 
				
			||||||
    private final TbQueueAdmin transportApiAdmin;
 | 
					    private final TbQueueAdmin jsExecutorResponseAdmin;
 | 
				
			||||||
 | 
					    private final TbQueueAdmin transportApiRequestAdmin;
 | 
				
			||||||
 | 
					    private final TbQueueAdmin transportApiResponseAdmin;
 | 
				
			||||||
    private final TbQueueAdmin notificationAdmin;
 | 
					    private final TbQueueAdmin notificationAdmin;
 | 
				
			||||||
    private final TbQueueAdmin fwUpdatesAdmin;
 | 
					    private final TbQueueAdmin fwUpdatesAdmin;
 | 
				
			||||||
    private final TbQueueAdmin vcAdmin;
 | 
					    private final TbQueueAdmin vcAdmin;
 | 
				
			||||||
@ -106,8 +108,10 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
 | 
					        this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
 | 
				
			||||||
        this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
 | 
					        this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
 | 
				
			||||||
        this.jsExecutorAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorConfigs());
 | 
					        this.jsExecutorRequestAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorRequestConfigs());
 | 
				
			||||||
        this.transportApiAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiConfigs());
 | 
					        this.jsExecutorResponseAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorResponseConfigs());
 | 
				
			||||||
 | 
					        this.transportApiRequestAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiRequestConfigs());
 | 
				
			||||||
 | 
					        this.transportApiResponseAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiResponseConfigs());
 | 
				
			||||||
        this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs());
 | 
					        this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs());
 | 
				
			||||||
        this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs());
 | 
					        this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs());
 | 
				
			||||||
        this.vcAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getVcConfigs());
 | 
					        this.vcAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getVcConfigs());
 | 
				
			||||||
@ -237,7 +241,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
 | 
				
			|||||||
        consumerBuilder.clientId("monolith-transport-api-consumer-" + serviceInfoProvider.getServiceId());
 | 
					        consumerBuilder.clientId("monolith-transport-api-consumer-" + serviceInfoProvider.getServiceId());
 | 
				
			||||||
        consumerBuilder.groupId("monolith-transport-api-consumer");
 | 
					        consumerBuilder.groupId("monolith-transport-api-consumer");
 | 
				
			||||||
        consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
					        consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
				
			||||||
        consumerBuilder.admin(transportApiAdmin);
 | 
					        consumerBuilder.admin(transportApiRequestAdmin);
 | 
				
			||||||
        consumerBuilder.statsService(consumerStatsService);
 | 
					        consumerBuilder.statsService(consumerStatsService);
 | 
				
			||||||
        return consumerBuilder.build();
 | 
					        return consumerBuilder.build();
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@ -248,7 +252,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
 | 
				
			|||||||
        requestBuilder.settings(kafkaSettings);
 | 
					        requestBuilder.settings(kafkaSettings);
 | 
				
			||||||
        requestBuilder.clientId("monolith-transport-api-producer-" + serviceInfoProvider.getServiceId());
 | 
					        requestBuilder.clientId("monolith-transport-api-producer-" + serviceInfoProvider.getServiceId());
 | 
				
			||||||
        requestBuilder.defaultTopic(transportApiSettings.getResponsesTopic());
 | 
					        requestBuilder.defaultTopic(transportApiSettings.getResponsesTopic());
 | 
				
			||||||
        requestBuilder.admin(transportApiAdmin);
 | 
					        requestBuilder.admin(transportApiResponseAdmin);
 | 
				
			||||||
        return requestBuilder.build();
 | 
					        return requestBuilder.build();
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -259,7 +263,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
 | 
				
			|||||||
        requestBuilder.settings(kafkaSettings);
 | 
					        requestBuilder.settings(kafkaSettings);
 | 
				
			||||||
        requestBuilder.clientId("producer-js-invoke-" + serviceInfoProvider.getServiceId());
 | 
					        requestBuilder.clientId("producer-js-invoke-" + serviceInfoProvider.getServiceId());
 | 
				
			||||||
        requestBuilder.defaultTopic(jsInvokeSettings.getRequestTopic());
 | 
					        requestBuilder.defaultTopic(jsInvokeSettings.getRequestTopic());
 | 
				
			||||||
        requestBuilder.admin(jsExecutorAdmin);
 | 
					        requestBuilder.admin(jsExecutorRequestAdmin);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> responseBuilder = TbKafkaConsumerTemplate.builder();
 | 
					        TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> responseBuilder = TbKafkaConsumerTemplate.builder();
 | 
				
			||||||
        responseBuilder.settings(kafkaSettings);
 | 
					        responseBuilder.settings(kafkaSettings);
 | 
				
			||||||
@ -273,11 +277,11 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
 | 
				
			|||||||
                }
 | 
					                }
 | 
				
			||||||
        );
 | 
					        );
 | 
				
			||||||
        responseBuilder.statsService(consumerStatsService);
 | 
					        responseBuilder.statsService(consumerStatsService);
 | 
				
			||||||
        responseBuilder.admin(jsExecutorAdmin);
 | 
					        responseBuilder.admin(jsExecutorResponseAdmin);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
 | 
					        DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
 | 
				
			||||||
                <TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
 | 
					                <TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
 | 
				
			||||||
        builder.queueAdmin(jsExecutorAdmin);
 | 
					        builder.queueAdmin(jsExecutorResponseAdmin);
 | 
				
			||||||
        builder.requestTemplate(requestBuilder.build());
 | 
					        builder.requestTemplate(requestBuilder.build());
 | 
				
			||||||
        builder.responseTemplate(responseBuilder.build());
 | 
					        builder.responseTemplate(responseBuilder.build());
 | 
				
			||||||
        builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests());
 | 
					        builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests());
 | 
				
			||||||
@ -350,11 +354,17 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
 | 
				
			|||||||
        if (ruleEngineAdmin != null) {
 | 
					        if (ruleEngineAdmin != null) {
 | 
				
			||||||
            ruleEngineAdmin.destroy();
 | 
					            ruleEngineAdmin.destroy();
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        if (jsExecutorAdmin != null) {
 | 
					        if (jsExecutorRequestAdmin != null) {
 | 
				
			||||||
            jsExecutorAdmin.destroy();
 | 
					            jsExecutorRequestAdmin.destroy();
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        if (transportApiAdmin != null) {
 | 
					        if (jsExecutorResponseAdmin != null) {
 | 
				
			||||||
            transportApiAdmin.destroy();
 | 
					            jsExecutorResponseAdmin.destroy();
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        if (transportApiRequestAdmin != null) {
 | 
				
			||||||
 | 
					            transportApiRequestAdmin.destroy();
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        if (transportApiResponseAdmin != null) {
 | 
				
			||||||
 | 
					            transportApiResponseAdmin.destroy();
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        if (notificationAdmin != null) {
 | 
					        if (notificationAdmin != null) {
 | 
				
			||||||
            notificationAdmin.destroy();
 | 
					            notificationAdmin.destroy();
 | 
				
			||||||
 | 
				
			|||||||
@ -73,8 +73,10 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    private final TbQueueAdmin coreAdmin;
 | 
					    private final TbQueueAdmin coreAdmin;
 | 
				
			||||||
    private final TbQueueAdmin ruleEngineAdmin;
 | 
					    private final TbQueueAdmin ruleEngineAdmin;
 | 
				
			||||||
    private final TbQueueAdmin jsExecutorAdmin;
 | 
					    private final TbQueueAdmin jsExecutorRequestAdmin;
 | 
				
			||||||
    private final TbQueueAdmin transportApiAdmin;
 | 
					    private final TbQueueAdmin jsExecutorResponseAdmin;
 | 
				
			||||||
 | 
					    private final TbQueueAdmin transportApiRequestAdmin;
 | 
				
			||||||
 | 
					    private final TbQueueAdmin transportApiResponseAdmin;
 | 
				
			||||||
    private final TbQueueAdmin notificationAdmin;
 | 
					    private final TbQueueAdmin notificationAdmin;
 | 
				
			||||||
    private final TbQueueAdmin fwUpdatesAdmin;
 | 
					    private final TbQueueAdmin fwUpdatesAdmin;
 | 
				
			||||||
    private final TbQueueAdmin vcAdmin;
 | 
					    private final TbQueueAdmin vcAdmin;
 | 
				
			||||||
@ -103,8 +105,10 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
 | 
					        this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
 | 
				
			||||||
        this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
 | 
					        this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
 | 
				
			||||||
        this.jsExecutorAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorConfigs());
 | 
					        this.jsExecutorRequestAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorRequestConfigs());
 | 
				
			||||||
        this.transportApiAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiConfigs());
 | 
					        this.jsExecutorResponseAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorResponseConfigs());
 | 
				
			||||||
 | 
					        this.transportApiRequestAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiRequestConfigs());
 | 
				
			||||||
 | 
					        this.transportApiResponseAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiResponseConfigs());
 | 
				
			||||||
        this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs());
 | 
					        this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs());
 | 
				
			||||||
        this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs());
 | 
					        this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs());
 | 
				
			||||||
        this.vcAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getVcConfigs());
 | 
					        this.vcAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getVcConfigs());
 | 
				
			||||||
@ -194,7 +198,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
 | 
				
			|||||||
        consumerBuilder.clientId("tb-core-transport-api-consumer-" + serviceInfoProvider.getServiceId());
 | 
					        consumerBuilder.clientId("tb-core-transport-api-consumer-" + serviceInfoProvider.getServiceId());
 | 
				
			||||||
        consumerBuilder.groupId("tb-core-transport-api-consumer");
 | 
					        consumerBuilder.groupId("tb-core-transport-api-consumer");
 | 
				
			||||||
        consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
					        consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
				
			||||||
        consumerBuilder.admin(transportApiAdmin);
 | 
					        consumerBuilder.admin(transportApiRequestAdmin);
 | 
				
			||||||
        consumerBuilder.statsService(consumerStatsService);
 | 
					        consumerBuilder.statsService(consumerStatsService);
 | 
				
			||||||
        return consumerBuilder.build();
 | 
					        return consumerBuilder.build();
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@ -205,7 +209,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
 | 
				
			|||||||
        requestBuilder.settings(kafkaSettings);
 | 
					        requestBuilder.settings(kafkaSettings);
 | 
				
			||||||
        requestBuilder.clientId("tb-core-transport-api-producer-" + serviceInfoProvider.getServiceId());
 | 
					        requestBuilder.clientId("tb-core-transport-api-producer-" + serviceInfoProvider.getServiceId());
 | 
				
			||||||
        requestBuilder.defaultTopic(transportApiSettings.getResponsesTopic());
 | 
					        requestBuilder.defaultTopic(transportApiSettings.getResponsesTopic());
 | 
				
			||||||
        requestBuilder.admin(transportApiAdmin);
 | 
					        requestBuilder.admin(transportApiResponseAdmin);
 | 
				
			||||||
        return requestBuilder.build();
 | 
					        return requestBuilder.build();
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -216,7 +220,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
 | 
				
			|||||||
        requestBuilder.settings(kafkaSettings);
 | 
					        requestBuilder.settings(kafkaSettings);
 | 
				
			||||||
        requestBuilder.clientId("producer-js-invoke-" + serviceInfoProvider.getServiceId());
 | 
					        requestBuilder.clientId("producer-js-invoke-" + serviceInfoProvider.getServiceId());
 | 
				
			||||||
        requestBuilder.defaultTopic(jsInvokeSettings.getRequestTopic());
 | 
					        requestBuilder.defaultTopic(jsInvokeSettings.getRequestTopic());
 | 
				
			||||||
        requestBuilder.admin(jsExecutorAdmin);
 | 
					        requestBuilder.admin(jsExecutorRequestAdmin);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> responseBuilder = TbKafkaConsumerTemplate.builder();
 | 
					        TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> responseBuilder = TbKafkaConsumerTemplate.builder();
 | 
				
			||||||
        responseBuilder.settings(kafkaSettings);
 | 
					        responseBuilder.settings(kafkaSettings);
 | 
				
			||||||
@ -229,12 +233,12 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
 | 
				
			|||||||
                    return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders());
 | 
					                    return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders());
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
        );
 | 
					        );
 | 
				
			||||||
        responseBuilder.admin(jsExecutorAdmin);
 | 
					        responseBuilder.admin(jsExecutorResponseAdmin);
 | 
				
			||||||
        responseBuilder.statsService(consumerStatsService);
 | 
					        responseBuilder.statsService(consumerStatsService);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
 | 
					        DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
 | 
				
			||||||
                <TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
 | 
					                <TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
 | 
				
			||||||
        builder.queueAdmin(jsExecutorAdmin);
 | 
					        builder.queueAdmin(jsExecutorResponseAdmin);
 | 
				
			||||||
        builder.requestTemplate(requestBuilder.build());
 | 
					        builder.requestTemplate(requestBuilder.build());
 | 
				
			||||||
        builder.responseTemplate(responseBuilder.build());
 | 
					        builder.responseTemplate(responseBuilder.build());
 | 
				
			||||||
        builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests());
 | 
					        builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests());
 | 
				
			||||||
@ -307,11 +311,17 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
 | 
				
			|||||||
        if (ruleEngineAdmin != null) {
 | 
					        if (ruleEngineAdmin != null) {
 | 
				
			||||||
            ruleEngineAdmin.destroy();
 | 
					            ruleEngineAdmin.destroy();
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        if (jsExecutorAdmin != null) {
 | 
					        if (jsExecutorRequestAdmin != null) {
 | 
				
			||||||
            jsExecutorAdmin.destroy();
 | 
					            jsExecutorRequestAdmin.destroy();
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        if (transportApiAdmin != null) {
 | 
					        if (jsExecutorResponseAdmin != null) {
 | 
				
			||||||
            transportApiAdmin.destroy();
 | 
					            jsExecutorResponseAdmin.destroy();
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        if (transportApiRequestAdmin != null) {
 | 
				
			||||||
 | 
					            transportApiRequestAdmin.destroy();
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        if (transportApiResponseAdmin != null) {
 | 
				
			||||||
 | 
					            transportApiResponseAdmin.destroy();
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        if (notificationAdmin != null) {
 | 
					        if (notificationAdmin != null) {
 | 
				
			||||||
            notificationAdmin.destroy();
 | 
					            notificationAdmin.destroy();
 | 
				
			||||||
 | 
				
			|||||||
@ -68,7 +68,8 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    private final TbQueueAdmin coreAdmin;
 | 
					    private final TbQueueAdmin coreAdmin;
 | 
				
			||||||
    private final TbQueueAdmin ruleEngineAdmin;
 | 
					    private final TbQueueAdmin ruleEngineAdmin;
 | 
				
			||||||
    private final TbQueueAdmin jsExecutorAdmin;
 | 
					    private final TbQueueAdmin jsExecutorRequestAdmin;
 | 
				
			||||||
 | 
					    private final TbQueueAdmin jsExecutorResponseAdmin;
 | 
				
			||||||
    private final TbQueueAdmin notificationAdmin;
 | 
					    private final TbQueueAdmin notificationAdmin;
 | 
				
			||||||
    private final TbQueueAdmin fwUpdatesAdmin;
 | 
					    private final TbQueueAdmin fwUpdatesAdmin;
 | 
				
			||||||
    private final AtomicLong consumerCount = new AtomicLong();
 | 
					    private final AtomicLong consumerCount = new AtomicLong();
 | 
				
			||||||
@ -92,7 +93,8 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
 | 
					        this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
 | 
				
			||||||
        this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
 | 
					        this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
 | 
				
			||||||
        this.jsExecutorAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorConfigs());
 | 
					        this.jsExecutorRequestAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorRequestConfigs());
 | 
				
			||||||
 | 
					        this.jsExecutorResponseAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorResponseConfigs());
 | 
				
			||||||
        this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs());
 | 
					        this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs());
 | 
				
			||||||
        this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs());
 | 
					        this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs());
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@ -191,7 +193,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
 | 
				
			|||||||
        requestBuilder.settings(kafkaSettings);
 | 
					        requestBuilder.settings(kafkaSettings);
 | 
				
			||||||
        requestBuilder.clientId("producer-js-invoke-" + serviceInfoProvider.getServiceId());
 | 
					        requestBuilder.clientId("producer-js-invoke-" + serviceInfoProvider.getServiceId());
 | 
				
			||||||
        requestBuilder.defaultTopic(jsInvokeSettings.getRequestTopic());
 | 
					        requestBuilder.defaultTopic(jsInvokeSettings.getRequestTopic());
 | 
				
			||||||
        requestBuilder.admin(jsExecutorAdmin);
 | 
					        requestBuilder.admin(jsExecutorRequestAdmin);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> responseBuilder = TbKafkaConsumerTemplate.builder();
 | 
					        TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> responseBuilder = TbKafkaConsumerTemplate.builder();
 | 
				
			||||||
        responseBuilder.settings(kafkaSettings);
 | 
					        responseBuilder.settings(kafkaSettings);
 | 
				
			||||||
@ -204,12 +206,12 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
 | 
				
			|||||||
                    return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders());
 | 
					                    return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders());
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
        );
 | 
					        );
 | 
				
			||||||
        responseBuilder.admin(jsExecutorAdmin);
 | 
					        responseBuilder.admin(jsExecutorResponseAdmin);
 | 
				
			||||||
        responseBuilder.statsService(consumerStatsService);
 | 
					        responseBuilder.statsService(consumerStatsService);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
 | 
					        DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
 | 
				
			||||||
                <TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
 | 
					                <TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
 | 
				
			||||||
        builder.queueAdmin(jsExecutorAdmin);
 | 
					        builder.queueAdmin(jsExecutorResponseAdmin);
 | 
				
			||||||
        builder.requestTemplate(requestBuilder.build());
 | 
					        builder.requestTemplate(requestBuilder.build());
 | 
				
			||||||
        builder.responseTemplate(responseBuilder.build());
 | 
					        builder.responseTemplate(responseBuilder.build());
 | 
				
			||||||
        builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests());
 | 
					        builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests());
 | 
				
			||||||
@ -236,8 +238,11 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
 | 
				
			|||||||
        if (ruleEngineAdmin != null) {
 | 
					        if (ruleEngineAdmin != null) {
 | 
				
			||||||
            ruleEngineAdmin.destroy();
 | 
					            ruleEngineAdmin.destroy();
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        if (jsExecutorAdmin != null) {
 | 
					        if (jsExecutorRequestAdmin != null) {
 | 
				
			||||||
            jsExecutorAdmin.destroy();
 | 
					            jsExecutorRequestAdmin.destroy();
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        if (jsExecutorResponseAdmin != null) {
 | 
				
			||||||
 | 
					            jsExecutorResponseAdmin.destroy();
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        if (notificationAdmin != null) {
 | 
					        if (notificationAdmin != null) {
 | 
				
			||||||
            notificationAdmin.destroy();
 | 
					            notificationAdmin.destroy();
 | 
				
			||||||
 | 
				
			|||||||
@ -59,7 +59,8 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    private final TbQueueAdmin coreAdmin;
 | 
					    private final TbQueueAdmin coreAdmin;
 | 
				
			||||||
    private final TbQueueAdmin ruleEngineAdmin;
 | 
					    private final TbQueueAdmin ruleEngineAdmin;
 | 
				
			||||||
    private final TbQueueAdmin transportApiAdmin;
 | 
					    private final TbQueueAdmin transportApiRequestAdmin;
 | 
				
			||||||
 | 
					    private final TbQueueAdmin transportApiResponseAdmin;
 | 
				
			||||||
    private final TbQueueAdmin notificationAdmin;
 | 
					    private final TbQueueAdmin notificationAdmin;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public KafkaTbTransportQueueFactory(TbKafkaSettings kafkaSettings,
 | 
					    public KafkaTbTransportQueueFactory(TbKafkaSettings kafkaSettings,
 | 
				
			||||||
@ -80,7 +81,8 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
 | 
					        this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
 | 
				
			||||||
        this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
 | 
					        this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
 | 
				
			||||||
        this.transportApiAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiConfigs());
 | 
					        this.transportApiRequestAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiRequestConfigs());
 | 
				
			||||||
 | 
					        this.transportApiResponseAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiResponseConfigs());
 | 
				
			||||||
        this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs());
 | 
					        this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs());
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -90,7 +92,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
 | 
				
			|||||||
        requestBuilder.settings(kafkaSettings);
 | 
					        requestBuilder.settings(kafkaSettings);
 | 
				
			||||||
        requestBuilder.clientId("transport-api-request-" + serviceInfoProvider.getServiceId());
 | 
					        requestBuilder.clientId("transport-api-request-" + serviceInfoProvider.getServiceId());
 | 
				
			||||||
        requestBuilder.defaultTopic(transportApiSettings.getRequestsTopic());
 | 
					        requestBuilder.defaultTopic(transportApiSettings.getRequestsTopic());
 | 
				
			||||||
        requestBuilder.admin(transportApiAdmin);
 | 
					        requestBuilder.admin(transportApiRequestAdmin);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<TransportApiResponseMsg>> responseBuilder = TbKafkaConsumerTemplate.builder();
 | 
					        TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<TransportApiResponseMsg>> responseBuilder = TbKafkaConsumerTemplate.builder();
 | 
				
			||||||
        responseBuilder.settings(kafkaSettings);
 | 
					        responseBuilder.settings(kafkaSettings);
 | 
				
			||||||
@ -98,12 +100,12 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
 | 
				
			|||||||
        responseBuilder.clientId("transport-api-response-" + serviceInfoProvider.getServiceId());
 | 
					        responseBuilder.clientId("transport-api-response-" + serviceInfoProvider.getServiceId());
 | 
				
			||||||
        responseBuilder.groupId("transport-node-" + serviceInfoProvider.getServiceId());
 | 
					        responseBuilder.groupId("transport-node-" + serviceInfoProvider.getServiceId());
 | 
				
			||||||
        responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
					        responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
				
			||||||
        responseBuilder.admin(transportApiAdmin);
 | 
					        responseBuilder.admin(transportApiResponseAdmin);
 | 
				
			||||||
        responseBuilder.statsService(consumerStatsService);
 | 
					        responseBuilder.statsService(consumerStatsService);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
 | 
					        DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
 | 
				
			||||||
                <TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder();
 | 
					                <TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder();
 | 
				
			||||||
        templateBuilder.queueAdmin(transportApiAdmin);
 | 
					        templateBuilder.queueAdmin(transportApiResponseAdmin);
 | 
				
			||||||
        templateBuilder.requestTemplate(requestBuilder.build());
 | 
					        templateBuilder.requestTemplate(requestBuilder.build());
 | 
				
			||||||
        templateBuilder.responseTemplate(responseBuilder.build());
 | 
					        templateBuilder.responseTemplate(responseBuilder.build());
 | 
				
			||||||
        templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests());
 | 
					        templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests());
 | 
				
			||||||
@ -163,8 +165,11 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
 | 
				
			|||||||
        if (ruleEngineAdmin != null) {
 | 
					        if (ruleEngineAdmin != null) {
 | 
				
			||||||
            ruleEngineAdmin.destroy();
 | 
					            ruleEngineAdmin.destroy();
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        if (transportApiAdmin != null) {
 | 
					        if (transportApiRequestAdmin != null) {
 | 
				
			||||||
            transportApiAdmin.destroy();
 | 
					            transportApiRequestAdmin.destroy();
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        if (transportApiResponseAdmin != null) {
 | 
				
			||||||
 | 
					            transportApiResponseAdmin.destroy();
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        if (notificationAdmin != null) {
 | 
					        if (notificationAdmin != null) {
 | 
				
			||||||
            notificationAdmin.destroy();
 | 
					            notificationAdmin.destroy();
 | 
				
			||||||
 | 
				
			|||||||
@ -172,7 +172,7 @@ queue:
 | 
				
			|||||||
    topic-properties:
 | 
					    topic-properties:
 | 
				
			||||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
					      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
				
			||||||
      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
					      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
				
			||||||
      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
					      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}"
 | 
				
			||||||
      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
					      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
				
			||||||
  aws_sqs:
 | 
					  aws_sqs:
 | 
				
			||||||
    use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
 | 
					    use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
 | 
				
			||||||
 | 
				
			|||||||
@ -159,7 +159,7 @@ queue:
 | 
				
			|||||||
    topic-properties:
 | 
					    topic-properties:
 | 
				
			||||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
					      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
				
			||||||
      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
					      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
				
			||||||
      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
					      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}"
 | 
				
			||||||
      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
					      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
				
			||||||
  aws_sqs:
 | 
					  aws_sqs:
 | 
				
			||||||
    use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
 | 
					    use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
 | 
				
			||||||
 | 
				
			|||||||
@ -235,10 +235,10 @@ queue:
 | 
				
			|||||||
      - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
					      - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
				
			||||||
        value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds)
 | 
					        value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds)
 | 
				
			||||||
    topic-properties:
 | 
					    topic-properties:
 | 
				
			||||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
					      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
				
			||||||
      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
					      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
				
			||||||
      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
					      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}"
 | 
				
			||||||
      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
					      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
				
			||||||
  aws_sqs:
 | 
					  aws_sqs:
 | 
				
			||||||
    use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
 | 
					    use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
 | 
				
			||||||
    access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
 | 
					    access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
 | 
				
			||||||
 | 
				
			|||||||
@ -189,7 +189,7 @@ queue:
 | 
				
			|||||||
    topic-properties:
 | 
					    topic-properties:
 | 
				
			||||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
					      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
				
			||||||
      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
					      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
				
			||||||
      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
					      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}"
 | 
				
			||||||
      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
					      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
				
			||||||
  aws_sqs:
 | 
					  aws_sqs:
 | 
				
			||||||
    use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
 | 
					    use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
 | 
				
			||||||
 | 
				
			|||||||
@ -135,7 +135,7 @@ queue:
 | 
				
			|||||||
    topic-properties:
 | 
					    topic-properties:
 | 
				
			||||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
					      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
				
			||||||
      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
					      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
				
			||||||
      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
					      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}"
 | 
				
			||||||
      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
					      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
				
			||||||
    consumer-stats:
 | 
					    consumer-stats:
 | 
				
			||||||
      enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"
 | 
					      enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user