From 53eb09bcb939ae28cbefefbbab5cb2dfe3d05434 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Tue, 21 Apr 2020 22:31:25 +0300 Subject: [PATCH] Improvements to tests due to new architecture --- .../RuleChainActorMessageProcessor.java | 5 ++- .../server/actors/tenant/TenantActor.java | 40 ++++++++++++------- .../controller/TelemetryController.java | 8 ++-- .../queue/DefaultTbClusterService.java | 6 +++ .../DefaultTbRuleEngineConsumerService.java | 2 +- ...TbRuleEngineProcessingStrategyFactory.java | 16 ++++++-- .../rpc/DefaultTbCoreDeviceRpcService.java | 10 ++--- .../src/main/resources/thingsboard.yml | 2 +- ...tractMqttServerSideRpcIntegrationTest.java | 15 ++++--- .../AbstractMqttTelemetryIntegrationTest.java | 4 +- ...AbstractRuleEngineFlowIntegrationTest.java | 4 +- application/src/test/resources/logback.xml | 2 +- .../server/queue/memory/InMemoryStorage.java | 4 +- .../queue/memory/InMemoryTbQueueConsumer.java | 2 +- .../TbRuleEngineQueueConfiguration.java | 2 +- dao/src/test/resources/sql-test.properties | 10 ++++- 16 files changed, 87 insertions(+), 45 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index a374e590b5..4e3d69c2b7 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -249,7 +249,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor isolatedTenantId = systemContext.getServiceInfoProvider().getIsolatedTenant(); + if (tenant == null) { + cantFindTenant = true; + log.info("[{}] Started tenant actor for missing tenant.", tenantId); + } else { + // This Service may be started for specific tenant only. + Optional isolatedTenantId = systemContext.getServiceInfoProvider().getIsolatedTenant(); - isRuleEngineForCurrentTenant = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE); - isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE); + isRuleEngineForCurrentTenant = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE); + isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE); - if (isRuleEngineForCurrentTenant) { - try { - if (isolatedTenantId.map(id -> id.equals(tenantId)).orElseGet(() -> !tenant.isIsolatedTbRuleEngine())) { - initRuleChains(); - } else { - isRuleEngineForCurrentTenant = false; + if (isRuleEngineForCurrentTenant) { + try { + if (isolatedTenantId.map(id -> id.equals(tenantId)).orElseGet(() -> !tenant.isIsolatedTbRuleEngine())) { + log.info("[{}] Going to init rule chains", tenantId); + initRuleChains(); + } else { + isRuleEngineForCurrentTenant = false; + } + } catch (Exception e) { + cantFindTenant = true; } - } catch (Exception e) { - cantFindTenant = true; } + log.info("[{}] Tenant actor started.", tenantId); } - log.info("[{}] Tenant actor started.", tenantId); } catch (Exception e) { log.warn("[{}] Unknown failure", tenantId, e); } @@ -104,7 +111,12 @@ public class TenantActor extends RuleChainManagerActor { @Override protected boolean process(TbActorMsg msg) { if (cantFindTenant) { - log.info("Missing Tenant msg: {}", msg); + log.info("[{}] Processing missing Tenant msg: {}", tenantId, msg); + if (msg.getMsgType().equals(MsgType.QUEUE_TO_RULE_ENGINE_MSG)) { + QueueToRuleEngineMsg queueMsg = (QueueToRuleEngineMsg) msg; + queueMsg.getTbMsg().getCallback().onSuccess(); + } + return true; } switch (msg.getMsgType()) { case PARTITION_CHANGE_MSG: diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index 8352e3f457..aee56652c4 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -344,7 +344,7 @@ public class TelemetryController extends BaseController { return deleteAttributes(entityId, scope, keysStr); } - private DeferredResult deleteAttributes(EntityId entityIdStr, String scope, String keysStr) throws ThingsboardException { + private DeferredResult deleteAttributes(EntityId entityIdSrc, String scope, String keysStr) throws ThingsboardException { List keys = toKeysList(keysStr); if (keys.isEmpty()) { return getImmediateDeferredResult("Empty keys: " + keysStr, HttpStatus.BAD_REQUEST); @@ -354,13 +354,13 @@ public class TelemetryController extends BaseController { if (DataConstants.SERVER_SCOPE.equals(scope) || DataConstants.SHARED_SCOPE.equals(scope) || DataConstants.CLIENT_SCOPE.equals(scope)) { - return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.WRITE_ATTRIBUTES, entityIdStr, (result, tenantId, entityId) -> { + return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.WRITE_ATTRIBUTES, entityIdSrc, (result, tenantId, entityId) -> { ListenableFuture> future = attributesService.removeAll(user.getTenantId(), entityId, scope, keys); Futures.addCallback(future, new FutureCallback>() { @Override public void onSuccess(@Nullable List tmp) { logAttributesDeleted(user, entityId, scope, keys, null); - if (entityId.getEntityType() == EntityType.DEVICE) { + if (entityIdSrc.getEntityType().equals(EntityType.DEVICE)) { DeviceId deviceId = new DeviceId(entityId.getId()); Set keysToNotify = new HashSet<>(); keys.forEach(key -> keysToNotify.add(new AttributeKey(scope, key))); @@ -397,7 +397,7 @@ public class TelemetryController extends BaseController { @Override public void onSuccess(@Nullable Void tmp) { logAttributesUpdated(user, entityId, scope, attributes, null); - if (entityId.getEntityType() == EntityType.DEVICE) { + if (entityIdSrc.getEntityType().equals(EntityType.DEVICE)) { DeviceId deviceId = new DeviceId(entityId.getId()); tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onUpdate( user.getTenantId(), deviceId, scope, attributes), null); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index f0054771d9..926b08f38b 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -87,6 +87,7 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void pushMsgToCore(ToDeviceActorNotificationMsg msg, TbQueueCallback callback) { TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, msg.getTenantId(), msg.getDeviceId()); + log.trace("PUSHING msg: {} to:{}", msg, tpi); byte[] msgBytes = encodingService.encode(msg); ToCoreMsg toCoreMsg = ToCoreMsg.newBuilder().setToDeviceActorNotificationMsg(ByteString.copyFrom(msgBytes)).build(); producerProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(msg.getDeviceId().getId(), toCoreMsg), callback); @@ -96,6 +97,7 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void pushNotificationToCore(String serviceId, FromDeviceRpcResponse response, TbQueueCallback callback) { TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceId); + log.trace("PUSHING msg: {} to:{}", response, tpi); FromDeviceRPCResponseProto.Builder builder = FromDeviceRPCResponseProto.newBuilder() .setRequestIdMSB(response.getId().getMostSignificantBits()) .setRequestIdLSB(response.getId().getLeastSignificantBits()) @@ -108,6 +110,7 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void pushMsgToRuleEngine(TopicPartitionInfo tpi, UUID msgId, ToRuleEngineMsg msg, TbQueueCallback callback) { + log.trace("PUSHING msg: {} to:{}", msg, tpi); producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(msgId, msg), callback); toRuleEngineMsgs.incrementAndGet(); } @@ -123,6 +126,7 @@ public class DefaultTbClusterService implements TbClusterService { } } TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId); + log.trace("PUSHING msg: {} to:{}", tbMsg, tpi); ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder() .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) @@ -134,6 +138,7 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void pushNotificationToRuleEngine(String serviceId, FromDeviceRpcResponse response, TbQueueCallback callback) { TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceId); + log.trace("PUSHING msg: {} to:{}", response, tpi); FromDeviceRPCResponseProto.Builder builder = FromDeviceRPCResponseProto.newBuilder() .setRequestIdMSB(response.getId().getMostSignificantBits()) .setRequestIdLSB(response.getId().getLeastSignificantBits()) @@ -147,6 +152,7 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void pushNotificationToTransport(String serviceId, ToTransportMsg response, TbQueueCallback callback) { TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_TRANSPORT, serviceId); + log.trace("PUSHING msg: {} to:{}", response, tpi); producerProvider.getTransportNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), response), callback); toTransportNfs.incrementAndGet(); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index c8f126b26c..6736518bcc 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -174,7 +174,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< })); boolean timeout = false; - if (!ctx.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) { + if (!ctx.await(configuration.getPackProcessingTimeout(), TimeUnit.MILLISECONDS)) { timeout = true; } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java index b6579b8dcb..bbf283e962 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java @@ -17,6 +17,8 @@ package org.thingsboard.server.service.queue.processing; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.queue.TbMsgCallback; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.settings.TbRuleEngineQueueAckStrategyConfiguration; @@ -32,7 +34,7 @@ public class TbRuleEngineProcessingStrategyFactory { public TbRuleEngineProcessingStrategy newInstance(String name, TbRuleEngineQueueAckStrategyConfiguration configuration) { switch (configuration.getType()) { - case "SKIP_ALL": + case "SKIP_ALL_FAILURES": return new SkipStrategy(name); case "RETRY_ALL": return new RetryStrategy(name, true, true, true, configuration); @@ -98,7 +100,7 @@ public class TbRuleEngineProcessingStrategyFactory { } log.info("[{}] Going to reprocess {} messages", queueName, toReprocess.size()); if (log.isTraceEnabled()) { - toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, msg.getValue())); + toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); } if (pauseBetweenRetries > 0) { try { @@ -123,7 +125,15 @@ public class TbRuleEngineProcessingStrategyFactory { @Override public TbRuleEngineProcessingDecision analyze(TbRuleEngineProcessingResult result) { - log.info("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size()); + if (!result.isSuccess()) { + log.info("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size()); + } + if (log.isTraceEnabled()) { + result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); + } + if (log.isTraceEnabled()) { + result.getPendingMap().forEach((id, msg) -> log.trace("Timeout messages [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); + } return new TbRuleEngineProcessingDecision(true, null); } } diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java index 8a1423f97c..f6e5eb8b43 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java @@ -106,7 +106,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { @Override public void processRpcResponseFromRuleEngine(FromDeviceRpcResponse response) { - log.trace("[{}] Received response to server-side RPC request from rule engine: [{}]", response.getId()); + log.trace("[{}] Received response to server-side RPC request from rule engine: [{}]", response.getId(), response); UUID requestId = response.getId(); Consumer consumer = localToRuleEngineRpcRequests.remove(requestId); if (consumer != null) { @@ -177,9 +177,9 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { private void scheduleToRuleEngineTimeout(ToDeviceRpcRequest request, UUID requestId) { long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()); - log.trace("[{}] processing to rule engine request: [{}]", this.hashCode(), requestId); + log.trace("[{}] processing to rule engine request.", requestId); scheduler.schedule(() -> { - log.trace("[{}] timeout for to rule engine request: [{}]", this.hashCode(), requestId); + log.trace("[{}] timeout for processing to rule engine request.", requestId); Consumer consumer = localToRuleEngineRpcRequests.remove(requestId); if (consumer != null) { consumer.accept(new FromDeviceRpcResponse(requestId, null, RpcError.TIMEOUT)); @@ -189,9 +189,9 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { private void scheduleToDeviceTimeout(ToDeviceRpcRequest request, UUID requestId) { long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()); - log.trace("[{}] processing to device request: [{}]", this.hashCode(), requestId); + log.trace("[{}] processing to device request.", requestId); scheduler.schedule(() -> { - log.trace("[{}] timeout for to device request: [{}]", this.hashCode(), requestId); + log.trace("[{}] timeout for to device request.", requestId); localToDeviceRpcRequests.remove(requestId); }, timeout, TimeUnit.MILLISECONDS); } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 0b2c40acef..3df3cb9559 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -622,7 +622,7 @@ queue: # For BATCH only batch-size: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_BATCH_SIZE:1000}" # Maximum number of messages in batch processing-strategy: - type: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT + type: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_TYPE:SKIP_ALL_FAILURES}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT # For RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited failure-percentage: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; diff --git a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java index 485d618a01..9ddb2c81d9 100644 --- a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java @@ -37,6 +37,7 @@ import org.thingsboard.server.service.security.AccessValidator; import java.util.Arrays; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -55,6 +56,8 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC private User tenantAdmin; private Long asyncContextTimeoutToUseRpcPlugin; + private static final AtomicInteger atomicInteger = new AtomicInteger(2); + @Before public void beforeTest() throws Exception { @@ -70,7 +73,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC tenantAdmin = new User(); tenantAdmin.setAuthority(Authority.TENANT_ADMIN); tenantAdmin.setTenantId(savedTenant.getId()); - tenantAdmin.setEmail("tenant2@thingsboard.org"); + tenantAdmin.setEmail("tenant" + atomicInteger.getAndIncrement() + "@thingsboard.org"); tenantAdmin.setFirstName("Joe"); tenantAdmin.setLastName("Downs"); @@ -130,7 +133,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC String accessToken = deviceCredentials.getCredentialsId(); assertNotNull(accessToken); - String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1},\"timeout\": 6000}"; + String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"24\",\"value\": 1},\"timeout\": 6000}"; String deviceId = savedDevice.getId().getId().toString(); doPostAsync("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isRequestTimeout(), @@ -139,7 +142,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC @Test public void testServerMqttOneWayRpcDeviceDoesNotExist() throws Exception { - String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; + String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"25\",\"value\": 1}}"; String nonExistentDeviceId = UUIDs.timeBased().toString(); String result = doPostAsync("/api/plugins/rpc/oneway/" + nonExistentDeviceId, setGpioRequest, String.class, @@ -169,7 +172,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC Thread.sleep(2000); - String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; + String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"26\",\"value\": 1}}"; String deviceId = savedDevice.getId().getId().toString(); String result = doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); @@ -187,7 +190,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC String accessToken = deviceCredentials.getCredentialsId(); assertNotNull(accessToken); - String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1},\"timeout\": 6000}"; + String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"27\",\"value\": 1},\"timeout\": 6000}"; String deviceId = savedDevice.getId().getId().toString(); doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isRequestTimeout(), @@ -196,7 +199,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC @Test public void testServerMqttTwoWayRpcDeviceDoesNotExist() throws Exception { - String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; + String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"28\",\"value\": 1}}"; String nonExistentDeviceId = UUIDs.timeBased().toString(); String result = doPostAsync("/api/plugins/rpc/twoway/" + nonExistentDeviceId, setGpioRequest, String.class, diff --git a/application/src/test/java/org/thingsboard/server/mqtt/telemetry/AbstractMqttTelemetryIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/telemetry/AbstractMqttTelemetryIntegrationTest.java index 7e329d8b05..f96f207834 100644 --- a/application/src/test/java/org/thingsboard/server/mqtt/telemetry/AbstractMqttTelemetryIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/mqtt/telemetry/AbstractMqttTelemetryIntegrationTest.java @@ -107,13 +107,13 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr CountDownLatch latch = new CountDownLatch(1); TestMqttCallback callback = new TestMqttCallback(client, latch); client.setCallback(callback); - client.connect(options).waitForCompletion(3000); + client.connect(options).waitForCompletion(5000); client.subscribe("v1/devices/me/attributes", MqttQoS.AT_MOST_ONCE.value()); String payload = "{\"key\":\"value\"}"; // TODO 3.1: we need to acknowledge subscription only after it is processed by device actor and not when the message is pushed to queue. // MqttClient -> SUB REQUEST -> Transport -> Kafka -> Device Actor (subscribed) // MqttClient <- SUB_ACK <- Transport - Thread.sleep(1000); + Thread.sleep(5000); doPostAsync("/api/plugins/telemetry/" + savedDevice.getId() + "/SHARED_SCOPE", payload, String.class, status().isOk()); latch.await(10, TimeUnit.SECONDS); assertEquals(payload, callback.getPayload()); diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java index 3343d271fe..87a181deae 100644 --- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java @@ -152,7 +152,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule QueueToRuleEngineMsg qMsg = new QueueToRuleEngineMsg(savedTenant.getId(), tbMsg, null, null); // Pushing Message to the system actorSystem.tell(qMsg, ActorRef.noSender()); - Mockito.verify(tbMsgCallback, Mockito.timeout(3000)).onSuccess(); + Mockito.verify(tbMsgCallback, Mockito.timeout(10000)).onSuccess(); TimePageData eventsPage = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000); List events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList()); @@ -265,7 +265,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule // Pushing Message to the system actorSystem.tell(qMsg, ActorRef.noSender()); - Mockito.verify(tbMsgCallback, Mockito.timeout(3000)).onSuccess(); + Mockito.verify(tbMsgCallback, Mockito.timeout(10000)).onSuccess(); TimePageData eventsPage = getDebugEvents(savedTenant.getId(), rootRuleChain.getFirstRuleNodeId(), 1000); List events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList()); diff --git a/application/src/test/resources/logback.xml b/application/src/test/resources/logback.xml index 47dacce343..7943b9d9b1 100644 --- a/application/src/test/resources/logback.xml +++ b/application/src/test/resources/logback.xml @@ -7,7 +7,7 @@ - + diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryStorage.java b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryStorage.java index e119d1433b..f51f5b8ae0 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryStorage.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryStorage.java @@ -50,10 +50,10 @@ public final class InMemoryStorage { return storage.computeIfAbsent(topic, (t) -> new LinkedBlockingQueue<>()).add(msg); } - public List get(String topic, long durationInMillis) throws InterruptedException { + public List get(String topic) throws InterruptedException { if (storage.containsKey(topic)) { List entities; - T first = (T) storage.get(topic).poll(durationInMillis, TimeUnit.MILLISECONDS); + T first = (T) storage.get(topic).poll(); if (first != null) { entities = new ArrayList<>(); entities.add(first); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java index 3920f143f1..a619eda132 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java @@ -68,7 +68,7 @@ public class InMemoryTbQueueConsumer implements TbQueueCon .stream() .map(tpi -> { try { - return storage.get(tpi.getFullTopicName(), durationInMillis); + return storage.get(tpi.getFullTopicName()); } catch (InterruptedException e) { if (!stopped) { log.error("Queue was interrupted.", e); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueConfiguration.java b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueConfiguration.java index c5a24f20c6..019a76953b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueConfiguration.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueConfiguration.java @@ -24,7 +24,7 @@ public class TbRuleEngineQueueConfiguration { private String topic; private int pollInterval; private int partitions; - private String packProcessingTimeout; + private long packProcessingTimeout; private TbRuleEngineQueueSubmitStrategyConfiguration submitStrategy; private TbRuleEngineQueueAckStrategyConfiguration processingStrategy; diff --git a/dao/src/test/resources/sql-test.properties b/dao/src/test/resources/sql-test.properties index 13c0fbc818..7b5f82da2a 100644 --- a/dao/src/test/resources/sql-test.properties +++ b/dao/src/test/resources/sql-test.properties @@ -35,4 +35,12 @@ service.type=monolith #spring.datasource.password=postgres #spring.datasource.url=jdbc:postgresql://localhost:5432/sqltest #spring.datasource.driverClassName=org.postgresql.Driver -#spring.datasource.hikari.maximumPoolSize = 50 \ No newline at end of file +#spring.datasource.hikari.maximumPoolSize = 50 + +queue.rule-engine.queues[0].name=Main +queue.rule-engine.queues[0].topic=tb_rule_engine.main +queue.rule-engine.queues[0].poll-interval=25 +queue.rule-engine.queues[0].partitions=3 +queue.rule-engine.queues[0].pack-processing-timeout=3000 +queue.rule-engine.queues[0].processing-strategy.type=SKIP_ALL_FAILURES +queue.rule-engine.queues[0].submit-strategy.type=BURST \ No newline at end of file