diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNodeTest.java index e39297c878..47a3a60d94 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNodeTest.java @@ -39,19 +39,23 @@ import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import java.io.IOException; import java.util.Map; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.never; import static org.mockito.BDDMockito.spy; import static org.mockito.BDDMockito.then; import static org.mockito.BDDMockito.willReturn; @@ -103,9 +107,10 @@ class TbPubSubNodeTest { @ParameterizedTest @MethodSource - public void givenMessageAttributesPatterns_whenOnMsg_thenTellSuccess( + public void givenForceAckIsTrueAndMessageAttributesPatterns_whenOnMsg_thenEnqueueForTellNext( String attributeName, String attributeValue, TbMsgMetaData metaData, String data) { config.setMessageAttributes(Map.of(attributeName, attributeValue)); + ReflectionTestUtils.setField(node, "forceAck", true); init(); String messageId = "2070443601311540"; @@ -115,6 +120,7 @@ class TbPubSubNodeTest { TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data); node.onMsg(ctxMock, msg); + then(ctxMock).should().ack(msg); PubsubMessage.Builder pubsubMessageBuilder = PubsubMessage.newBuilder(); pubsubMessageBuilder.setData(ByteString.copyFromUtf8(msg.getData())); this.config.getMessageAttributes().forEach((k, v) -> { @@ -124,7 +130,7 @@ class TbPubSubNodeTest { }); then(pubSubClientMock).should().publish(pubsubMessageBuilder.build()); ArgumentCaptor actualMsg = ArgumentCaptor.forClass(TbMsg.class); - then(ctxMock).should().tellSuccess(actualMsg.capture()); + then(ctxMock).should().enqueueForTellNext(actualMsg.capture(), eq(TbNodeConnectionType.SUCCESS)); metaData.putValue("messageId", messageId); TbMsg expectedMsg = TbMsg.transformMsgMetadata(msg, metaData); assertThat(actualMsg.getValue()) @@ -133,7 +139,7 @@ class TbPubSubNodeTest { .isEqualTo(expectedMsg); } - private static Stream givenMessageAttributesPatterns_whenOnMsg_thenTellSuccess() { + private static Stream givenForceAckIsTrueAndMessageAttributesPatterns_whenOnMsg_thenEnqueueForTellNext() { return Stream.of( Arguments.of("attributeName", "attributeValue", new TbMsgMetaData(), TbMsg.EMPTY_JSON_OBJECT), Arguments.of("${mdAttrName}", "${mdAttrValue}", new TbMsgMetaData( @@ -147,8 +153,9 @@ class TbPubSubNodeTest { } @Test - public void givenErrorOccursOnTheGCP_whenOnMsg_thenTellFailure() { + public void givenForceAckIsFalseAndErrorOccursOnTheGCP_whenOnMsg_thenTellFailure() { init(); + ReflectionTestUtils.setField(node, "forceAck", false); String errorMsg = "Something went wrong!"; ApiFuture failedFuture = ApiFutures.immediateFailedFuture(new RuntimeException(errorMsg)); @@ -159,6 +166,7 @@ class TbPubSubNodeTest { TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, TbMsg.EMPTY_JSON_OBJECT); node.onMsg(ctxMock, msg); + then(ctxMock).should(never()).ack(any()); ArgumentCaptor actualMsg = ArgumentCaptor.forClass(TbMsg.class); ArgumentCaptor actualError = ArgumentCaptor.forClass(Throwable.class); then(ctxMock).should().tellFailure(actualMsg.capture(), actualError.capture()); @@ -171,6 +179,21 @@ class TbPubSubNodeTest { assertThat(actualError.getValue()).isInstanceOf(RuntimeException.class).hasMessage(errorMsg); } + @Test + public void givenPubSubClientIsNotNull_whenDestroy_thenShutDownAndAwaitTermination() throws InterruptedException { + ReflectionTestUtils.setField(node, "pubSubClient", pubSubClientMock); + node.destroy(); + then(pubSubClientMock).should().shutdown(); + then(pubSubClientMock).should().awaitTermination(1, TimeUnit.SECONDS); + } + + @Test + public void givenPubSubClientIsNull_whenDestroy_thenShutDownAndAwaitTermination() { + ReflectionTestUtils.setField(node, "pubSubClient", null); + node.destroy(); + then(pubSubClientMock).shouldHaveNoInteractions(); + } + private void init() { ReflectionTestUtils.setField(node, "config", config); ReflectionTestUtils.setField(node, "pubSubClient", pubSubClientMock);