Add custom topic properties configuration
This commit is contained in:
		
							parent
							
								
									137cc18099
								
							
						
					
					
						commit
						5862b417aa
					
				@ -96,7 +96,9 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb
 | 
			
		||||
    private void onQueueCreated(Queue queue) {
 | 
			
		||||
        for (int i = 0; i < queue.getPartitions(); i++) {
 | 
			
		||||
            tbQueueAdmin.createTopicIfNotExists(
 | 
			
		||||
                    new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName());
 | 
			
		||||
                    new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(),
 | 
			
		||||
                    queue.getCustomProperties()
 | 
			
		||||
            );
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        tbClusterService.onQueueChange(queue);
 | 
			
		||||
@ -111,7 +113,9 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb
 | 
			
		||||
                log.info("Added [{}] new partitions to [{}] queue", currentPartitions - oldPartitions, queue.getName());
 | 
			
		||||
                for (int i = oldPartitions; i < currentPartitions; i++) {
 | 
			
		||||
                    tbQueueAdmin.createTopicIfNotExists(
 | 
			
		||||
                            new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName());
 | 
			
		||||
                            new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(),
 | 
			
		||||
                            queue.getCustomProperties()
 | 
			
		||||
                    );
 | 
			
		||||
                }
 | 
			
		||||
                tbClusterService.onQueueChange(queue);
 | 
			
		||||
            } else {
 | 
			
		||||
 | 
			
		||||
@ -17,7 +17,11 @@ package org.thingsboard.server.queue;
 | 
			
		||||
 | 
			
		||||
public interface TbQueueAdmin {
 | 
			
		||||
 | 
			
		||||
    void createTopicIfNotExists(String topic);
 | 
			
		||||
    default void createTopicIfNotExists(String topic) {
 | 
			
		||||
        createTopicIfNotExists(topic, null);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void createTopicIfNotExists(String topic, String properties);
 | 
			
		||||
 | 
			
		||||
    void destroy();
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -15,6 +15,8 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.common.data.queue;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.annotation.JsonIgnore;
 | 
			
		||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import org.thingsboard.server.common.data.HasName;
 | 
			
		||||
import org.thingsboard.server.common.data.HasTenantId;
 | 
			
		||||
@ -25,6 +27,8 @@ import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfi
 | 
			
		||||
import org.thingsboard.server.common.data.validation.Length;
 | 
			
		||||
import org.thingsboard.server.common.data.validation.NoXss;
 | 
			
		||||
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
 | 
			
		||||
@Data
 | 
			
		||||
public class Queue extends SearchTextBasedWithAdditionalInfo<QueueId> implements HasName, HasTenantId {
 | 
			
		||||
    private TenantId tenantId;
 | 
			
		||||
@ -65,4 +69,12 @@ public class Queue extends SearchTextBasedWithAdditionalInfo<QueueId> implements
 | 
			
		||||
    public String getSearchText() {
 | 
			
		||||
        return getName();
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
    @JsonIgnore
 | 
			
		||||
    public String getCustomProperties() {
 | 
			
		||||
        return Optional.ofNullable(getAdditionalInfo())
 | 
			
		||||
                .map(info -> info.get("customProperties"))
 | 
			
		||||
                .filter(JsonNode::isTextual).map(JsonNode::asText).orElse(null);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -99,7 +99,7 @@ public class RuleEngineTbQueueAdminFactory {
 | 
			
		||||
        return new TbQueueAdmin() {
 | 
			
		||||
 | 
			
		||||
            @Override
 | 
			
		||||
            public void createTopicIfNotExists(String topic) {
 | 
			
		||||
            public void createTopicIfNotExists(String topic, String properties) {
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            @Override
 | 
			
		||||
 | 
			
		||||
@ -22,6 +22,7 @@ import com.microsoft.azure.servicebus.primitives.MessagingEntityAlreadyExistsExc
 | 
			
		||||
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueAdmin;
 | 
			
		||||
import org.thingsboard.server.queue.util.PropertyUtils;
 | 
			
		||||
 | 
			
		||||
import java.io.IOException;
 | 
			
		||||
import java.time.Duration;
 | 
			
		||||
@ -60,7 +61,7 @@ public class TbServiceBusAdmin implements TbQueueAdmin {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void createTopicIfNotExists(String topic) {
 | 
			
		||||
    public void createTopicIfNotExists(String topic, String properties) {
 | 
			
		||||
        if (queues.contains(topic)) {
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
@ -68,7 +69,7 @@ public class TbServiceBusAdmin implements TbQueueAdmin {
 | 
			
		||||
        try {
 | 
			
		||||
            QueueDescription queueDescription = new QueueDescription(topic);
 | 
			
		||||
            queueDescription.setRequiresDuplicateDetection(false);
 | 
			
		||||
            setQueueConfigs(queueDescription);
 | 
			
		||||
            setQueueConfigs(queueDescription, PropertyUtils.getProps(queueConfigs, properties));
 | 
			
		||||
 | 
			
		||||
            client.createQueue(queueDescription);
 | 
			
		||||
            queues.add(topic);
 | 
			
		||||
@ -107,7 +108,7 @@ public class TbServiceBusAdmin implements TbQueueAdmin {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void setQueueConfigs(QueueDescription queueDescription) {
 | 
			
		||||
    private void setQueueConfigs(QueueDescription queueDescription, Map<String, String> queueConfigs) {
 | 
			
		||||
        queueConfigs.forEach((confKey, confValue) -> {
 | 
			
		||||
            switch (confKey) {
 | 
			
		||||
                case MAX_SIZE:
 | 
			
		||||
 | 
			
		||||
@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.CreateTopicsResult;
 | 
			
		||||
import org.apache.kafka.clients.admin.NewTopic;
 | 
			
		||||
import org.apache.kafka.common.errors.TopicExistsException;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueAdmin;
 | 
			
		||||
import org.thingsboard.server.queue.util.PropertyUtils;
 | 
			
		||||
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
@ -62,12 +63,12 @@ public class TbKafkaAdmin implements TbQueueAdmin {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void createTopicIfNotExists(String topic) {
 | 
			
		||||
    public void createTopicIfNotExists(String topic, String properties) {
 | 
			
		||||
        if (topics.contains(topic)) {
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
        try {
 | 
			
		||||
            NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor).configs(topicConfigs);
 | 
			
		||||
            NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor).configs(PropertyUtils.getProps(topicConfigs, properties));
 | 
			
		||||
            createTopic(newTopic).values().get(topic).get();
 | 
			
		||||
            topics.add(topic);
 | 
			
		||||
        } catch (ExecutionException ee) {
 | 
			
		||||
@ -81,7 +82,6 @@ public class TbKafkaAdmin implements TbQueueAdmin {
 | 
			
		||||
            log.warn("[{}] Failed to create topic", topic, e);
 | 
			
		||||
            throw new RuntimeException(e);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
 | 
			
		||||
@ -73,7 +73,7 @@ public class InMemoryTbTransportQueueFactory implements TbTransportQueueFactory
 | 
			
		||||
 | 
			
		||||
        templateBuilder.queueAdmin(new TbQueueAdmin() {
 | 
			
		||||
            @Override
 | 
			
		||||
            public void createTopicIfNotExists(String topic) {}
 | 
			
		||||
            public void createTopicIfNotExists(String topic, String properties) {}
 | 
			
		||||
 | 
			
		||||
            @Override
 | 
			
		||||
            public void destroy() {}
 | 
			
		||||
 | 
			
		||||
@ -103,7 +103,7 @@ public class TbPubSubAdmin implements TbQueueAdmin {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void createTopicIfNotExists(String partition) {
 | 
			
		||||
    public void createTopicIfNotExists(String partition, String properties) {
 | 
			
		||||
        TopicName topicName = TopicName.newBuilder()
 | 
			
		||||
                .setTopic(partition)
 | 
			
		||||
                .setProject(pubSubSettings.getProjectId())
 | 
			
		||||
 | 
			
		||||
@ -18,9 +18,11 @@ package org.thingsboard.server.queue.rabbitmq;
 | 
			
		||||
import com.rabbitmq.client.Channel;
 | 
			
		||||
import com.rabbitmq.client.Connection;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.apache.commons.lang3.StringUtils;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueAdmin;
 | 
			
		||||
 | 
			
		||||
import java.io.IOException;
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.concurrent.TimeoutException;
 | 
			
		||||
 | 
			
		||||
@ -50,7 +52,12 @@ public class TbRabbitMqAdmin implements TbQueueAdmin {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void createTopicIfNotExists(String topic) {
 | 
			
		||||
    public void createTopicIfNotExists(String topic, String properties) {
 | 
			
		||||
        Map<String, Object> arguments = this.arguments;
 | 
			
		||||
        if (StringUtils.isNotBlank(properties)) {
 | 
			
		||||
            arguments = new HashMap<>(arguments);
 | 
			
		||||
            arguments.putAll(TbRabbitMqQueueArguments.getArgs(properties));
 | 
			
		||||
        }
 | 
			
		||||
        try {
 | 
			
		||||
            channel.queueDeclare(topic, false, false, false, arguments);
 | 
			
		||||
        } catch (IOException e) {
 | 
			
		||||
 | 
			
		||||
@ -65,7 +65,7 @@ public class TbRabbitMqQueueArguments {
 | 
			
		||||
        vcArgs = getArgs(vcProperties);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private Map<String, Object> getArgs(String properties) {
 | 
			
		||||
    public static Map<String, Object> getArgs(String properties) {
 | 
			
		||||
        Map<String, Object> configs = new HashMap<>();
 | 
			
		||||
        if (StringUtils.isNotEmpty(properties)) {
 | 
			
		||||
            for (String property : properties.split(";")) {
 | 
			
		||||
@ -78,7 +78,7 @@ public class TbRabbitMqQueueArguments {
 | 
			
		||||
        return configs;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private Object getObjectValue(String str) {
 | 
			
		||||
    private static Object getObjectValue(String str) {
 | 
			
		||||
        if (str.equalsIgnoreCase("true") || str.equalsIgnoreCase("false")) {
 | 
			
		||||
            return Boolean.valueOf(str);
 | 
			
		||||
        } else if (isNumeric(str)) {
 | 
			
		||||
@ -87,7 +87,7 @@ public class TbRabbitMqQueueArguments {
 | 
			
		||||
        return str;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private Object getNumericValue(String str) {
 | 
			
		||||
    private static Object getNumericValue(String str) {
 | 
			
		||||
        if (str.contains(".")) {
 | 
			
		||||
            return Double.valueOf(str);
 | 
			
		||||
        } else {
 | 
			
		||||
@ -97,7 +97,7 @@ public class TbRabbitMqQueueArguments {
 | 
			
		||||
 | 
			
		||||
    private static final Pattern PATTERN = Pattern.compile("-?\\d+(\\.\\d+)?");
 | 
			
		||||
 | 
			
		||||
    public boolean isNumeric(String strNum) {
 | 
			
		||||
    private static boolean isNumeric(String strNum) {
 | 
			
		||||
        if (strNum == null) {
 | 
			
		||||
            return false;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@ -26,6 +26,7 @@ import com.amazonaws.services.sqs.model.CreateQueueRequest;
 | 
			
		||||
import com.amazonaws.services.sqs.model.GetQueueUrlResult;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueAdmin;
 | 
			
		||||
import org.thingsboard.server.queue.util.PropertyUtils;
 | 
			
		||||
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.function.Function;
 | 
			
		||||
@ -63,11 +64,12 @@ public class TbAwsSqsAdmin implements TbQueueAdmin {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void createTopicIfNotExists(String topic) {
 | 
			
		||||
    public void createTopicIfNotExists(String topic, String properties) {
 | 
			
		||||
        String queueName = convertTopicToQueueName(topic);
 | 
			
		||||
        if (queues.containsKey(queueName)) {
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
        Map<String, String> attributes = PropertyUtils.getProps(this.attributes, properties, TbAwsSqsQueueAttributes::toConfigs);
 | 
			
		||||
        final CreateQueueRequest createQueueRequest = new CreateQueueRequest(queueName).withAttributes(attributes);
 | 
			
		||||
        String queueUrl = sqsClient.createQueue(createQueueRequest).getQueueUrl();
 | 
			
		||||
        queues.put(getQueueNameFromUrl(queueUrl), queueUrl);
 | 
			
		||||
 | 
			
		||||
@ -76,6 +76,12 @@ public class TbAwsSqsQueueAttributes {
 | 
			
		||||
 | 
			
		||||
    private Map<String, String> getConfigs(String properties) {
 | 
			
		||||
        Map<String, String> configs = new HashMap<>(defaultAttributes);
 | 
			
		||||
       configs.putAll(toConfigs(properties));
 | 
			
		||||
        return configs;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static Map<String, String> toConfigs(String properties) {
 | 
			
		||||
        Map<String, String> configs = new HashMap<>();
 | 
			
		||||
        if (StringUtils.isNotEmpty(properties)) {
 | 
			
		||||
            for (String property : properties.split(";")) {
 | 
			
		||||
                int delimiterPosition = property.indexOf(":");
 | 
			
		||||
@ -88,7 +94,7 @@ public class TbAwsSqsQueueAttributes {
 | 
			
		||||
        return configs;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void validateAttributeName(String key) {
 | 
			
		||||
    private static void validateAttributeName(String key) {
 | 
			
		||||
        QueueAttributeName.fromValue(key);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -19,6 +19,7 @@ import org.thingsboard.server.common.data.StringUtils;
 | 
			
		||||
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.function.Function;
 | 
			
		||||
 | 
			
		||||
public class PropertyUtils {
 | 
			
		||||
 | 
			
		||||
@ -37,4 +38,17 @@ public class PropertyUtils {
 | 
			
		||||
        return configs;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static Map<String, String> getProps(Map<String, String> defaultProperties, String propertiesStr) {
 | 
			
		||||
        return getProps(defaultProperties, propertiesStr, PropertyUtils::getProps);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static Map<String, String> getProps(Map<String, String> defaultProperties, String propertiesStr, Function<String, Map<String, String>> parser) {
 | 
			
		||||
        Map<String, String> properties = defaultProperties;
 | 
			
		||||
        if (StringUtils.isNotBlank(propertiesStr)) {
 | 
			
		||||
            properties = new HashMap<>(properties);
 | 
			
		||||
            properties.putAll(parser.apply(propertiesStr));
 | 
			
		||||
        }
 | 
			
		||||
        return properties;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -173,7 +173,8 @@ export class TenantProfileQueuesComponent implements ControlValueAccessor, Valid
 | 
			
		||||
      },
 | 
			
		||||
      topic: '',
 | 
			
		||||
      additionalInfo: {
 | 
			
		||||
        description: ''
 | 
			
		||||
        description: '',
 | 
			
		||||
        customProperties: ''
 | 
			
		||||
      }
 | 
			
		||||
    };
 | 
			
		||||
    this.idMap.push(queue.id);
 | 
			
		||||
 | 
			
		||||
@ -74,7 +74,8 @@ export class TenantProfileComponent extends EntityComponent<TenantProfile> {
 | 
			
		||||
        },
 | 
			
		||||
        topic: 'tb_rule_engine.main',
 | 
			
		||||
        additionalInfo: {
 | 
			
		||||
          description: ''
 | 
			
		||||
          description: '',
 | 
			
		||||
          customProperties: ''
 | 
			
		||||
        }
 | 
			
		||||
      },
 | 
			
		||||
      {
 | 
			
		||||
@ -97,7 +98,8 @@ export class TenantProfileComponent extends EntityComponent<TenantProfile> {
 | 
			
		||||
          maxPauseBetweenRetries: 5
 | 
			
		||||
        },
 | 
			
		||||
        additionalInfo: {
 | 
			
		||||
          description: ''
 | 
			
		||||
          description: '',
 | 
			
		||||
          customProperties: ''
 | 
			
		||||
        }
 | 
			
		||||
      },
 | 
			
		||||
      {
 | 
			
		||||
@ -120,7 +122,8 @@ export class TenantProfileComponent extends EntityComponent<TenantProfile> {
 | 
			
		||||
          maxPauseBetweenRetries: 5
 | 
			
		||||
        },
 | 
			
		||||
        additionalInfo: {
 | 
			
		||||
          description: ''
 | 
			
		||||
          description: '',
 | 
			
		||||
          customProperties: ''
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    ];
 | 
			
		||||
 | 
			
		||||
@ -203,6 +203,11 @@
 | 
			
		||||
      </ng-template>
 | 
			
		||||
    </mat-expansion-panel>
 | 
			
		||||
  </mat-accordion>
 | 
			
		||||
  <mat-form-field class="mat-block" formGroupName="additionalInfo" appearance="fill">
 | 
			
		||||
    <mat-label translate>queue.custom-properties</mat-label>
 | 
			
		||||
    <textarea matInput formControlName="customProperties" cdkTextareaAutosize cdkAutosizeMinRows="1"></textarea>
 | 
			
		||||
    <mat-hint translate>queue.custom-properties-hint</mat-hint>
 | 
			
		||||
  </mat-form-field>
 | 
			
		||||
  <mat-form-field class="mat-block" formGroupName="additionalInfo" appearance="fill">
 | 
			
		||||
    <mat-label translate>queue.description</mat-label>
 | 
			
		||||
    <textarea matInput formControlName="description" rows="2"></textarea>
 | 
			
		||||
 | 
			
		||||
@ -117,7 +117,8 @@ export class QueueFormComponent implements ControlValueAccessor, OnInit, OnDestr
 | 
			
		||||
        }),
 | 
			
		||||
        topic: [''],
 | 
			
		||||
        additionalInfo: this.fb.group({
 | 
			
		||||
          description: ['']
 | 
			
		||||
          description: [''],
 | 
			
		||||
          customProperties: ['']
 | 
			
		||||
        })
 | 
			
		||||
      });
 | 
			
		||||
    this.valueChange$ = this.queueFormGroup.valueChanges.subscribe(() => {
 | 
			
		||||
 | 
			
		||||
@ -121,5 +121,6 @@ export interface QueueInfo extends BaseData<QueueId> {
 | 
			
		||||
  topic: string;
 | 
			
		||||
  additionalInfo: {
 | 
			
		||||
    description?: string;
 | 
			
		||||
    customProperties?: string;
 | 
			
		||||
  };
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -3465,6 +3465,8 @@
 | 
			
		||||
        "description": "Description",
 | 
			
		||||
        "description-hint": "This text will be displayed in the Queue description instead of the selected strategy",
 | 
			
		||||
        "alt-description": "Submit Strategy: {{submitStrategy}}, Processing Strategy: {{processingStrategy}}",
 | 
			
		||||
        "custom-properties": "Custom properties",
 | 
			
		||||
        "custom-properties-hint": "Custom queue (topic) creation properties, e.g. 'retention.ms:604800000;retention.bytes:1048576000'",
 | 
			
		||||
        "strategies": {
 | 
			
		||||
            "sequential-by-originator-label": "Sequential by originator",
 | 
			
		||||
            "sequential-by-originator-hint": "New message for e.g. device A is not submitted until previous message for device A is acknowledged",
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user