Merge remote-tracking branch 'origin/master' into feature/push-notifications
This commit is contained in:
commit
66c196cbcb
@ -14,6 +14,16 @@
|
||||
-- limitations under the License.
|
||||
--
|
||||
|
||||
=======
|
||||
-- RULE NODE INDEXES UPDATE START
|
||||
|
||||
DROP INDEX IF EXISTS idx_rule_node_type;
|
||||
DROP INDEX IF EXISTS idx_rule_node_type_configuration_version;
|
||||
CREATE INDEX IF NOT EXISTS idx_rule_node_type_id_configuration_version ON rule_node(type, id, configuration_version);
|
||||
|
||||
-- RULE NODE INDEXES UPDATE END
|
||||
|
||||
|
||||
DO
|
||||
$$
|
||||
BEGIN
|
||||
|
||||
@ -276,6 +276,9 @@ public class ThingsboardInstallService {
|
||||
} else {
|
||||
log.info("Skipping images migration. Run the upgrade with fromVersion as '3.6.2-images' to migrate");
|
||||
}
|
||||
case "3.6.2":
|
||||
log.info("Upgrading ThingsBoard from version 3.6.2 to 3.6.3 ...");
|
||||
databaseEntitiesUpgradeService.upgradeDatabase("3.6.2");
|
||||
//TODO DON'T FORGET to update switch statement in the CacheCleanupService if you need to clear the cache
|
||||
case "3.6.2":
|
||||
log.info("Upgrading ThingsBoard from version 3.6.2 to 3.6.3 ...");
|
||||
|
||||
@ -1398,6 +1398,8 @@ queue:
|
||||
region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}"
|
||||
# Number of threads per each AWS SQS queue in consumer
|
||||
threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}"
|
||||
# Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE
|
||||
producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}"
|
||||
queue-properties:
|
||||
# AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds
|
||||
rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}"
|
||||
|
||||
@ -864,6 +864,8 @@ public class EdgeControllerTest extends AbstractControllerTest {
|
||||
doPost("/api/admin/jwtSettings", settings).andExpect(status().isOk());
|
||||
loginTenantAdmin();
|
||||
|
||||
Edge edge = doPost("/api/edge", constructEdge("Test Sync Edge", "test"), Edge.class);
|
||||
|
||||
Asset asset = new Asset();
|
||||
asset.setName("Test Sync Edge Asset 1");
|
||||
asset.setType("test");
|
||||
@ -874,8 +876,6 @@ public class EdgeControllerTest extends AbstractControllerTest {
|
||||
device.setType("default");
|
||||
Device savedDevice = doPost("/api/device", device, Device.class);
|
||||
|
||||
Edge edge = doPost("/api/edge", constructEdge("Test Sync Edge", "test"), Edge.class);
|
||||
|
||||
simulateEdgeActivation(edge);
|
||||
|
||||
doPost("/api/edge/" + edge.getId().getId().toString()
|
||||
|
||||
@ -22,9 +22,7 @@ import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.mock.web.MockMultipartFile;
|
||||
import org.springframework.mock.web.MockPart;
|
||||
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.server.common.data.ImageDescriptor;
|
||||
@ -66,7 +64,7 @@ public class ImageControllerTest extends AbstractControllerTest {
|
||||
@Test
|
||||
public void testUploadPngImage() throws Exception {
|
||||
String filename = "my_png_image.png";
|
||||
TbResourceInfo imageInfo = uploadImage(HttpMethod.POST, "/api/image", filename, "image/png", PNG_IMAGE, false);
|
||||
TbResourceInfo imageInfo = uploadImage(HttpMethod.POST, "/api/image", filename, "image/png", PNG_IMAGE);
|
||||
|
||||
assertThat(imageInfo.getTitle()).isEqualTo(filename);
|
||||
assertThat(imageInfo.getResourceType()).isEqualTo(ResourceType.IMAGE);
|
||||
@ -85,7 +83,7 @@ public class ImageControllerTest extends AbstractControllerTest {
|
||||
@Test
|
||||
public void testUploadJpegImage() throws Exception {
|
||||
String filename = "my_jpeg_image.jpg";
|
||||
TbResourceInfo imageInfo = uploadImage(HttpMethod.POST, "/api/image", filename, "image/jpeg", JPEG_IMAGE, false);
|
||||
TbResourceInfo imageInfo = uploadImage(HttpMethod.POST, "/api/image", filename, "image/jpeg", JPEG_IMAGE);
|
||||
|
||||
ImageDescriptor imageDescriptor = imageInfo.getDescriptor(ImageDescriptor.class);
|
||||
checkJpegImageDescriptor(imageDescriptor);
|
||||
@ -97,7 +95,7 @@ public class ImageControllerTest extends AbstractControllerTest {
|
||||
@Test
|
||||
public void testUploadSvgImage() throws Exception {
|
||||
String filename = "my_svg_image.svg";
|
||||
TbResourceInfo imageInfo = uploadImage(HttpMethod.POST, "/api/image", filename, "image/svg+xml", SVG_IMAGE, false);
|
||||
TbResourceInfo imageInfo = uploadImage(HttpMethod.POST, "/api/image", filename, "image/svg+xml", SVG_IMAGE);
|
||||
|
||||
ImageDescriptor imageDescriptor = imageInfo.getDescriptor(ImageDescriptor.class);
|
||||
checkSvgImageDescriptor(imageDescriptor);
|
||||
@ -109,17 +107,17 @@ public class ImageControllerTest extends AbstractControllerTest {
|
||||
@Test
|
||||
public void testUploadImageWithSameFilename() throws Exception {
|
||||
String filename = "my_jpeg_image.jpg";
|
||||
TbResourceInfo imageInfo1 = uploadImage(HttpMethod.POST, "/api/image", filename, "image/jpeg", JPEG_IMAGE, false);
|
||||
TbResourceInfo imageInfo1 = uploadImage(HttpMethod.POST, "/api/image", filename, "image/jpeg", JPEG_IMAGE);
|
||||
assertThat(imageInfo1.getTitle()).isEqualTo(filename);
|
||||
assertThat(imageInfo1.getFileName()).isEqualTo(filename);
|
||||
assertThat(imageInfo1.getResourceKey()).isEqualTo(filename);
|
||||
|
||||
TbResourceInfo imageInfo2 = uploadImage(HttpMethod.POST, "/api/image", filename, "image/jpeg", JPEG_IMAGE, false);
|
||||
TbResourceInfo imageInfo2 = uploadImage(HttpMethod.POST, "/api/image", filename, "image/jpeg", JPEG_IMAGE);
|
||||
assertThat(imageInfo2.getTitle()).isEqualTo(filename);
|
||||
assertThat(imageInfo2.getFileName()).isEqualTo(filename);
|
||||
assertThat(imageInfo2.getResourceKey()).isEqualTo("my_jpeg_image_(1).jpg");
|
||||
|
||||
TbResourceInfo imageInfo3 = uploadImage(HttpMethod.POST, "/api/image", filename, "image/jpeg", JPEG_IMAGE, false);
|
||||
TbResourceInfo imageInfo3 = uploadImage(HttpMethod.POST, "/api/image", filename, "image/jpeg", JPEG_IMAGE);
|
||||
assertThat(imageInfo3.getTitle()).isEqualTo(filename);
|
||||
assertThat(imageInfo3.getFileName()).isEqualTo(filename);
|
||||
assertThat(imageInfo3.getResourceKey()).isEqualTo("my_jpeg_image_(2).jpg");
|
||||
@ -128,11 +126,11 @@ public class ImageControllerTest extends AbstractControllerTest {
|
||||
@Test
|
||||
public void testUpdateImage() throws Exception {
|
||||
String filename = "my_png_image.png";
|
||||
TbResourceInfo imageInfo = uploadImage(HttpMethod.POST, "/api/image", filename, "image/png", PNG_IMAGE, false);
|
||||
TbResourceInfo imageInfo = uploadImage(HttpMethod.POST, "/api/image", filename, "image/png", PNG_IMAGE);
|
||||
checkPngImageDescriptor(imageInfo.getDescriptor(ImageDescriptor.class));
|
||||
|
||||
String newFilename = "my_jpeg_image.png";
|
||||
imageInfo = uploadImage(HttpMethod.PUT, "/api/images/tenant/" + filename, newFilename, "image/jpeg", JPEG_IMAGE, false);
|
||||
imageInfo = uploadImage(HttpMethod.PUT, "/api/images/tenant/" + filename, newFilename, "image/jpeg", JPEG_IMAGE);
|
||||
|
||||
assertThat(imageInfo.getTitle()).isEqualTo(filename);
|
||||
assertThat(imageInfo.getResourceKey()).isEqualTo(filename);
|
||||
@ -148,7 +146,7 @@ public class ImageControllerTest extends AbstractControllerTest {
|
||||
@Test
|
||||
public void testUpdateImageInfo() throws Exception {
|
||||
String filename = "my_png_image.png";
|
||||
TbResourceInfo imageInfo = uploadImage(HttpMethod.POST, "/api/image", filename, "image/png", PNG_IMAGE, false);
|
||||
TbResourceInfo imageInfo = uploadImage(HttpMethod.POST, "/api/image", filename, "image/png", PNG_IMAGE);
|
||||
ImageDescriptor imageDescriptor = imageInfo.getDescriptor(ImageDescriptor.class);
|
||||
|
||||
assertThat(imageInfo.getTitle()).isEqualTo(filename);
|
||||
@ -167,7 +165,7 @@ public class ImageControllerTest extends AbstractControllerTest {
|
||||
@Test
|
||||
public void testExportImportImage() throws Exception {
|
||||
String filename = "my_png_image.png";
|
||||
uploadImage(HttpMethod.POST, "/api/image", filename, "image/png", PNG_IMAGE, false);
|
||||
uploadImage(HttpMethod.POST, "/api/image", filename, "image/png", PNG_IMAGE);
|
||||
|
||||
ImageExportData exportData = doGet("/api/images/tenant/" + filename + "/export", ImageExportData.class);
|
||||
assertThat(exportData.getMediaType()).isEqualTo("image/png");
|
||||
@ -175,6 +173,8 @@ public class ImageControllerTest extends AbstractControllerTest {
|
||||
assertThat(exportData.getTitle()).isEqualTo(filename);
|
||||
assertThat(exportData.getResourceKey()).isEqualTo(filename);
|
||||
assertThat(exportData.getData()).isEqualTo(Base64.getEncoder().encodeToString(PNG_IMAGE));
|
||||
assertThat(exportData.isPublic()).isTrue();
|
||||
assertThat(exportData.getPublicResourceKey()).isNotEmpty();
|
||||
|
||||
doDelete("/api/images/tenant/" + filename).andExpect(status().isOk());
|
||||
|
||||
@ -182,6 +182,8 @@ public class ImageControllerTest extends AbstractControllerTest {
|
||||
assertThat(importedImageInfo.getTitle()).isEqualTo(filename);
|
||||
assertThat(importedImageInfo.getResourceKey()).isEqualTo(filename);
|
||||
assertThat(importedImageInfo.getFileName()).isEqualTo(filename);
|
||||
assertThat(importedImageInfo.isPublic()).isTrue();
|
||||
assertThat(importedImageInfo.getPublicResourceKey()).isEqualTo(exportData.getPublicResourceKey());
|
||||
checkPngImageDescriptor(importedImageInfo.getDescriptor(ImageDescriptor.class));
|
||||
assertThat(downloadImage("tenant", filename)).containsExactly(PNG_IMAGE);
|
||||
}
|
||||
@ -190,11 +192,11 @@ public class ImageControllerTest extends AbstractControllerTest {
|
||||
public void testGetImages() throws Exception {
|
||||
loginSysAdmin();
|
||||
String systemImageName = "my_system_png_image.png";
|
||||
TbResourceInfo systemImage = uploadImage(HttpMethod.POST, "/api/image", systemImageName, "image/png", PNG_IMAGE, false);
|
||||
TbResourceInfo systemImage = uploadImage(HttpMethod.POST, "/api/image", systemImageName, "image/png", PNG_IMAGE);
|
||||
|
||||
loginTenantAdmin();
|
||||
String tenantImageName = "my_jpeg_image.jpg";
|
||||
TbResourceInfo tenantImage = uploadImage(HttpMethod.POST, "/api/image", tenantImageName, "image/jpeg", JPEG_IMAGE, false);
|
||||
TbResourceInfo tenantImage = uploadImage(HttpMethod.POST, "/api/image", tenantImageName, "image/jpeg", JPEG_IMAGE);
|
||||
|
||||
List<TbResourceInfo> tenantImages = getImages(null, false, 10);
|
||||
assertThat(tenantImages).containsOnly(tenantImage);
|
||||
@ -211,7 +213,7 @@ public class ImageControllerTest extends AbstractControllerTest {
|
||||
@Test
|
||||
public void testUploadPublicImage() throws Exception {
|
||||
String filename = "my_public_image.png";
|
||||
TbResourceInfo imageInfo = uploadImage(HttpMethod.POST, "/api/image", filename, "image/png", PNG_IMAGE, true);
|
||||
TbResourceInfo imageInfo = uploadImage(HttpMethod.POST, "/api/image", filename, "image/png", PNG_IMAGE);
|
||||
|
||||
assertThat(imageInfo.isPublic()).isTrue();
|
||||
assertThat(imageInfo.getPublicResourceKey()).hasSize(32);
|
||||
@ -225,7 +227,7 @@ public class ImageControllerTest extends AbstractControllerTest {
|
||||
@Test
|
||||
public void testMakeImagePublic() throws Exception {
|
||||
String filename = "my_public_image.png";
|
||||
TbResourceInfo imageInfo = uploadImage(HttpMethod.POST, "/api/image", filename, "image/png", PNG_IMAGE, false);
|
||||
TbResourceInfo imageInfo = uploadImage(HttpMethod.POST, "/api/image", filename, "image/png", PNG_IMAGE);
|
||||
String publicKey = imageInfo.getPublicResourceKey();
|
||||
assertThat(publicKey).hasSize(32);
|
||||
|
||||
@ -325,13 +327,9 @@ public class ImageControllerTest extends AbstractControllerTest {
|
||||
.andReturn().getResponse().getContentAsByteArray();
|
||||
}
|
||||
|
||||
private <R> TbResourceInfo uploadImage(HttpMethod httpMethod, String url, String filename, String mediaType, byte[] content, boolean isPublic) throws Exception {
|
||||
private <R> TbResourceInfo uploadImage(HttpMethod httpMethod, String url, String filename, String mediaType, byte[] content) throws Exception {
|
||||
MockMultipartFile file = new MockMultipartFile("file", filename, mediaType, content);
|
||||
MockPart publicPart = new MockPart("isPublic", String.valueOf(isPublic).getBytes());
|
||||
publicPart.getHeaders().setContentType(MediaType.APPLICATION_JSON);
|
||||
var request = MockMvcRequestBuilders.multipart(httpMethod, url)
|
||||
.file(file)
|
||||
.part(publicPart);
|
||||
var request = MockMvcRequestBuilders.multipart(httpMethod, url).file(file);
|
||||
setJwtToken(request);
|
||||
return readResponse(mockMvc.perform(request).andExpect(status().isOk()), TbResourceInfo.class);
|
||||
}
|
||||
|
||||
@ -100,6 +100,7 @@ public interface RuleChainService extends EntityDaoService {
|
||||
|
||||
PageData<RuleNode> findAllRuleNodesByType(String type, PageLink pageLink);
|
||||
|
||||
@Deprecated(forRemoval = true, since = "3.6.3")
|
||||
PageData<RuleNode> findAllRuleNodesByTypeAndVersionLessThan(String type, int version, PageLink pageLink);
|
||||
|
||||
PageData<RuleNodeId> findAllRuleNodeIdsByTypeAndVersionLessThan(String type, int version, PageLink pageLink);
|
||||
|
||||
@ -16,22 +16,26 @@
|
||||
package org.thingsboard.server.common.data;
|
||||
|
||||
import io.swagger.annotations.ApiModel;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@ApiModel
|
||||
@Slf4j
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Builder
|
||||
public class ImageExportData {
|
||||
|
||||
private final String mediaType;
|
||||
private final String fileName;
|
||||
private final String title;
|
||||
private final String resourceKey;
|
||||
private final boolean isPublic;
|
||||
private final String publicResourceKey;
|
||||
private final String data;
|
||||
private String mediaType;
|
||||
private String fileName;
|
||||
private String title;
|
||||
private String resourceKey;
|
||||
private boolean isPublic;
|
||||
private String publicResourceKey;
|
||||
private String data;
|
||||
|
||||
}
|
||||
|
||||
@ -24,11 +24,17 @@ import com.amazonaws.services.sqs.AmazonSQS;
|
||||
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
|
||||
import com.amazonaws.services.sqs.model.CreateQueueRequest;
|
||||
import com.amazonaws.services.sqs.model.GetQueueUrlResult;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.common.util.ExecutorProvider;
|
||||
import org.thingsboard.common.util.ThingsBoardExecutors;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.queue.TbQueueAdmin;
|
||||
import org.thingsboard.server.queue.util.PropertyUtils;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -38,6 +44,8 @@ public class TbAwsSqsAdmin implements TbQueueAdmin {
|
||||
private final Map<String, String> attributes;
|
||||
private final AmazonSQS sqsClient;
|
||||
private final Map<String, String> queues;
|
||||
@Getter
|
||||
private final ExecutorService producerExecutor;
|
||||
|
||||
public TbAwsSqsAdmin(TbAwsSqsSettings sqsSettings, Map<String, String> attributes) {
|
||||
this.attributes = attributes;
|
||||
@ -49,6 +57,7 @@ public class TbAwsSqsAdmin implements TbQueueAdmin {
|
||||
AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey());
|
||||
credentialsProvider = new AWSStaticCredentialsProvider(awsCredentials);
|
||||
}
|
||||
producerExecutor = ThingsBoardExecutors.newWorkStealingPool(sqsSettings.getThreadPoolSize(), "aws-sqs-queue-executor");
|
||||
|
||||
sqsClient = AmazonSQSClientBuilder.standard()
|
||||
.withCredentials(credentialsProvider)
|
||||
@ -104,5 +113,8 @@ public class TbAwsSqsAdmin implements TbQueueAdmin {
|
||||
if (sqsClient != null) {
|
||||
sqsClient.shutdown();
|
||||
}
|
||||
if (producerExecutor != null) {
|
||||
producerExecutor.shutdownNow();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -20,15 +20,11 @@ import com.amazonaws.auth.AWSCredentialsProvider;
|
||||
import com.amazonaws.auth.AWSStaticCredentialsProvider;
|
||||
import com.amazonaws.auth.BasicAWSCredentials;
|
||||
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
|
||||
import com.amazonaws.services.sqs.AmazonSQS;
|
||||
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
|
||||
import com.amazonaws.handlers.AsyncHandler;
|
||||
import com.amazonaws.services.sqs.AmazonSQSAsync;
|
||||
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
|
||||
import com.amazonaws.services.sqs.model.SendMessageRequest;
|
||||
import com.amazonaws.services.sqs.model.SendMessageResult;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.gson.Gson;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
@ -41,19 +37,17 @@ import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
@Slf4j
|
||||
public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueueProducer<T> {
|
||||
private final String defaultTopic;
|
||||
private final AmazonSQS sqsClient;
|
||||
private final AmazonSQSAsync sqsClient;
|
||||
private final Gson gson = new Gson();
|
||||
private final Map<String, String> queueUrlMap = new ConcurrentHashMap<>();
|
||||
private final TbQueueAdmin admin;
|
||||
private ListeningExecutorService producerExecutor;
|
||||
private final TbAwsSqsAdmin admin;
|
||||
|
||||
public TbAwsSqsProducerTemplate(TbQueueAdmin admin, TbAwsSqsSettings sqsSettings, String defaultTopic) {
|
||||
this.admin = admin;
|
||||
this.admin = (TbAwsSqsAdmin) admin;
|
||||
this.defaultTopic = defaultTopic;
|
||||
|
||||
AWSCredentialsProvider credentialsProvider;
|
||||
@ -64,11 +58,11 @@ public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
|
||||
credentialsProvider = new AWSStaticCredentialsProvider(awsCredentials);
|
||||
}
|
||||
|
||||
sqsClient = AmazonSQSClientBuilder.standard()
|
||||
sqsClient = AmazonSQSAsyncClientBuilder.standard()
|
||||
.withCredentials(credentialsProvider)
|
||||
.withRegion(sqsSettings.getRegion())
|
||||
.withExecutorFactory(this.admin::getProducerExecutor)
|
||||
.build();
|
||||
producerExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -91,30 +85,24 @@ public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
|
||||
sendMsgRequest.withMessageGroupId(sqsMsgId);
|
||||
sendMsgRequest.withMessageDeduplicationId(sqsMsgId);
|
||||
|
||||
ListenableFuture<SendMessageResult> future = producerExecutor.submit(() -> sqsClient.sendMessage(sendMsgRequest));
|
||||
|
||||
Futures.addCallback(future, new FutureCallback<SendMessageResult>() {
|
||||
@Override
|
||||
public void onSuccess(SendMessageResult result) {
|
||||
sqsClient.sendMessageAsync(sendMsgRequest, new AsyncHandler<SendMessageRequest, SendMessageResult>() {
|
||||
@Override public void onError(Exception e) {
|
||||
if (callback != null) {
|
||||
callback.onSuccess(new AwsSqsTbQueueMsgMetadata(result.getSdkHttpMetadata()));
|
||||
callback.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
@Override public void onSuccess(SendMessageRequest request,
|
||||
SendMessageResult sendMessageResult) {
|
||||
if (callback != null) {
|
||||
callback.onFailure(t);
|
||||
callback.onSuccess(new AwsSqsTbQueueMsgMetadata(sendMessageResult.getSdkHttpMetadata()));
|
||||
}
|
||||
}
|
||||
}, producerExecutor);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
if (producerExecutor != null) {
|
||||
producerExecutor.shutdownNow();
|
||||
}
|
||||
if (sqsClient != null) {
|
||||
sqsClient.shutdown();
|
||||
}
|
||||
|
||||
@ -42,4 +42,7 @@ public class TbAwsSqsSettings {
|
||||
@Value("${queue.aws_sqs.threads_per_topic}")
|
||||
private int threadsPerTopic;
|
||||
|
||||
@Value("${queue.aws_sqs.producer_thread_pool_size:50}")
|
||||
private int threadPoolSize;
|
||||
|
||||
}
|
||||
|
||||
@ -61,7 +61,7 @@ public class Validator {
|
||||
|
||||
|
||||
/**
|
||||
* This method validate <code>long</code> value. If value isn't possitive than throw
|
||||
* This method validate <code>long</code> value. If value isn't positive than throw
|
||||
* <code>IncorrectParameterException</code> exception
|
||||
*
|
||||
* @param val the val
|
||||
|
||||
@ -43,7 +43,7 @@ public interface RuleNodeRepository extends JpaRepository<RuleNodeEntity, UUID>
|
||||
Pageable pageable);
|
||||
|
||||
@Query(nativeQuery = true, value = "SELECT * FROM rule_node r WHERE r.type = :ruleType " +
|
||||
" AND configuration_version < :version " +
|
||||
" AND r.configuration_version < :version " +
|
||||
" AND (:searchText IS NULL OR r.configuration ILIKE CONCAT('%', :searchText, '%'))")
|
||||
Page<RuleNodeEntity> findAllRuleNodesByTypeAndVersionLessThan(@Param("ruleType") String ruleType,
|
||||
@Param("version") int version,
|
||||
|
||||
@ -91,9 +91,7 @@ CREATE INDEX IF NOT EXISTS idx_widgets_bundle_external_id ON widgets_bundle(tena
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_rule_node_external_id ON rule_node(rule_chain_id, external_id);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_rule_node_type ON rule_node(type);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_rule_node_type_configuration_version ON rule_node(type, configuration_version);
|
||||
CREATE INDEX IF NOT EXISTS idx_rule_node_type_id_configuration_version ON rule_node(type, id, configuration_version);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_api_usage_state_entity_id ON api_usage_state(entity_id);
|
||||
|
||||
|
||||
@ -155,6 +155,8 @@ queue:
|
||||
region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}"
|
||||
# Number of threads per each AWS SQS queue in consumer
|
||||
threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}"
|
||||
# Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE
|
||||
producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}"
|
||||
queue-properties:
|
||||
# AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds
|
||||
core: "${TB_QUEUE_AWS_SQS_CORE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}"
|
||||
|
||||
File diff suppressed because one or more lines are too long
@ -281,6 +281,8 @@ queue:
|
||||
region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}"
|
||||
# Number of threads per each AWS SQS queue in consumer
|
||||
threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}"
|
||||
# Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE
|
||||
producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}"
|
||||
queue-properties:
|
||||
# AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds
|
||||
rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}"
|
||||
|
||||
@ -265,6 +265,8 @@ queue:
|
||||
region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}"
|
||||
# Number of threads per each AWS SQS queue in consumer
|
||||
threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}"
|
||||
# Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE
|
||||
producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}"
|
||||
queue-properties:
|
||||
# AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds
|
||||
rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}"
|
||||
|
||||
@ -360,6 +360,8 @@ queue:
|
||||
region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}"
|
||||
# Number of threads per each AWS SQS queue in consumer
|
||||
threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}"
|
||||
# Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE
|
||||
producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}"
|
||||
queue-properties:
|
||||
# AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds
|
||||
rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}"
|
||||
|
||||
@ -297,6 +297,8 @@ queue:
|
||||
region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}"
|
||||
# Number of threads per each AWS SQS queue in consumer
|
||||
threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}"
|
||||
# Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE
|
||||
producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}"
|
||||
queue-properties:
|
||||
# AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds
|
||||
rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}"
|
||||
|
||||
@ -250,6 +250,8 @@ queue:
|
||||
region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}"
|
||||
# Number of threads per each AWS SQS queue in consumer
|
||||
threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}"
|
||||
# Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE
|
||||
producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}"
|
||||
queue-properties:
|
||||
# AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds
|
||||
rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user