reduced usage of reflection

This commit is contained in:
IrynaMatveieva 2024-08-02 15:11:28 +03:00
parent da78d93468
commit 9bcfc75a77
2 changed files with 179 additions and 115 deletions

View File

@ -85,7 +85,7 @@ public class TbKafkaNode extends TbAbstractExternalNode {
addMetadataKeyValuesAsKafkaHeaders = BooleanUtils.toBooleanDefaultIfNull(config.isAddMetadataKeyValuesAsKafkaHeaders(), false); addMetadataKeyValuesAsKafkaHeaders = BooleanUtils.toBooleanDefaultIfNull(config.isAddMetadataKeyValuesAsKafkaHeaders(), false);
toBytesCharset = config.getKafkaHeadersCharset() != null ? Charset.forName(config.getKafkaHeadersCharset()) : StandardCharsets.UTF_8; toBytesCharset = config.getKafkaHeadersCharset() != null ? Charset.forName(config.getKafkaHeadersCharset()) : StandardCharsets.UTF_8;
try { try {
this.producer = new KafkaProducer<>(properties); this.producer = getKafkaProducer(properties);
Thread ioThread = (Thread) ReflectionUtils.getField(IO_THREAD_FIELD, producer); Thread ioThread = (Thread) ReflectionUtils.getField(IO_THREAD_FIELD, producer);
ioThread.setUncaughtExceptionHandler((thread, throwable) -> { ioThread.setUncaughtExceptionHandler((thread, throwable) -> {
if (throwable instanceof ThingsboardKafkaClientError) { if (throwable instanceof ThingsboardKafkaClientError) {
@ -98,6 +98,10 @@ public class TbKafkaNode extends TbAbstractExternalNode {
} }
} }
protected KafkaProducer<String, String> getKafkaProducer(Properties properties) {
return new KafkaProducer<>(properties);
}
@Override @Override
public void onMsg(TbContext ctx, TbMsg msg) { public void onMsg(TbContext ctx, TbMsg msg) {
String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg); String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg);

View File

@ -16,7 +16,7 @@
package org.thingsboard.rule.engine.kafka; package org.thingsboard.rule.engine.kafka;
import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.RecordMetadata;
@ -24,6 +24,7 @@ import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.KafkaThread;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
@ -31,6 +32,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.NullAndEmptySource; import org.junit.jupiter.params.provider.NullAndEmptySource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
@ -40,14 +42,16 @@ import org.thingsboard.common.util.ListeningExecutor;
import org.thingsboard.rule.engine.TestDbCallbackExecutor; import org.thingsboard.rule.engine.TestDbCallbackExecutor;
import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.exception.ThingsboardKafkaClientError;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.TbMsgMetaData;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
@ -56,40 +60,54 @@ import java.util.concurrent.Callable;
import java.util.stream.Stream; import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.mock;
import static org.mockito.BDDMockito.never;
import static org.mockito.BDDMockito.spy;
import static org.mockito.BDDMockito.then; import static org.mockito.BDDMockito.then;
import static org.mockito.BDDMockito.times;
import static org.mockito.BDDMockito.willAnswer; import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.BDDMockito.willThrow; import static org.mockito.BDDMockito.willThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
public class TbKafkaNodeTest { public class TbKafkaNodeTest {
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("5f2eac08-bd1f-4635-a6c2-437369f996cf")); private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("5f2eac08-bd1f-4635-a6c2-437369f996cf"));
private final RuleNodeId RULE_NODE_ID = new RuleNodeId(UUID.fromString("d46bb666-ecab-4d89-a28f-5abdca23ac29"));
private final ListeningExecutor executor = new TestDbCallbackExecutor(); private final ListeningExecutor executor = new TestDbCallbackExecutor();
private final long OFFSET = 1;
private final int PARTITION = 0;
private final String TEST_TOPIC = "test-topic";
private final String TEST_KEY = "test-key";
private TbKafkaNode node; private TbKafkaNode node;
private TbKafkaNodeConfiguration config; private TbKafkaNodeConfiguration config;
@Mock @Mock
private TbContext ctxMock; private TbContext ctxMock;
@Mock @Mock
private Producer<String, String> producerMock; private KafkaProducer<String, String> producerMock;
@Mock
private KafkaThread ioThreadMock;
@Mock @Mock
private RecordMetadata recordMetadataMock; private RecordMetadata recordMetadataMock;
@BeforeEach @BeforeEach
void setUp() { public void setUp() {
node = new TbKafkaNode(); node = spy(new TbKafkaNode());
config = new TbKafkaNodeConfiguration().defaultConfiguration(); config = new TbKafkaNodeConfiguration().defaultConfiguration();
config.setTopicPattern(TEST_TOPIC);
config.setKeyPattern(TEST_KEY);
} }
@Test @Test
public void verifyDefaultConfig() { public void verifyDefaultConfig() {
config = new TbKafkaNodeConfiguration().defaultConfiguration();
assertThat(config.getTopicPattern()).isEqualTo("my-topic"); assertThat(config.getTopicPattern()).isEqualTo("my-topic");
assertThat(config.getKeyPattern()).isNull(); assertThat(config.getKeyPattern()).isNull();
assertThat(config.getBootstrapServers()).isEqualTo("localhost:9092"); assertThat(config.getBootstrapServers()).isEqualTo("localhost:9092");
@ -106,27 +124,27 @@ public class TbKafkaNodeTest {
} }
@Test @Test
public void givenAddMetadataKeyValuesAsKafkaHeadersIsTrueAndKafkaHeadersCharsetIsSet_whenInit_thenOk() { public void givenExceptionDuringKafkaInitialization_whenInit_thenDestroy() throws TbNodeException {
config.setAddMetadataKeyValuesAsKafkaHeaders(true); // GIVEN
config.setKafkaHeadersCharset("UTF-16"); given(ctxMock.getSelfId()).willReturn(RULE_NODE_ID);
ReflectionTestUtils.setField(producerMock, "ioThread", ioThreadMock);
willAnswer(invocationOnMock -> {
Thread.UncaughtExceptionHandler exceptionHandler = invocationOnMock.getArgument(0);
exceptionHandler.uncaughtException(ioThreadMock, new ThingsboardKafkaClientError("Error during init"));
return null;
}).given(ioThreadMock).setUncaughtExceptionHandler(any());
willReturn(producerMock).given(node).getKafkaProducer(any());
String ruleNodeIdStr = "0d35733c-7661-4797-819e-d9188974e3b2"; // WHEN
String serviceIdStr = "test-service"; node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
given(ctxMock.getSelfId()).willReturn(new RuleNodeId(UUID.fromString(ruleNodeIdStr))); // THEN
given(ctxMock.getServiceId()).willReturn(serviceIdStr); then(producerMock).should().close();
then(producerMock).shouldHaveNoMoreInteractions();
assertThatNoException().isThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))));
Boolean addMetadataKeyValuesAsKafkaHeaders = (Boolean) ReflectionTestUtils.getField(node, "addMetadataKeyValuesAsKafkaHeaders");
Charset toBytesCharset = (Charset) ReflectionTestUtils.getField(node, "toBytesCharset");
assertThat(addMetadataKeyValuesAsKafkaHeaders).isTrue();
assertThat(toBytesCharset).isEqualTo(StandardCharsets.UTF_16);
} }
@Test @Test
public void verifyGetKafkaPropertiesMethod() { public void verifyGetKafkaPropertiesMethod() throws TbNodeException {
String sslKeyStoreCertificateChain = "cbdvch\\nfwrg\nvgwg\\n"; String sslKeyStoreCertificateChain = "cbdvch\\nfwrg\nvgwg\\n";
String sslKeyStoreKey = "nghmh\\nhmmnh\\\\ngreg\nvgwg\\n"; String sslKeyStoreKey = "nghmh\\nhmmnh\\\\ngreg\nvgwg\\n";
String sslTruststoreCertificates = "grthrt\fd\\nfwrg\nvgwg\\n"; String sslTruststoreCertificates = "grthrt\fd\\nfwrg\nvgwg\\n";
@ -136,15 +154,17 @@ public class TbKafkaNodeTest {
"ssl.truststore.certificates", sslTruststoreCertificates, "ssl.truststore.certificates", sslTruststoreCertificates,
"ssl.protocol", "TLSv1.2" "ssl.protocol", "TLSv1.2"
)); ));
ReflectionTestUtils.setField(node, "config", config);
String ruleNodeIdStr = "e646b885-8004-45b4-8bfb-78db21870e0f"; ReflectionTestUtils.setField(producerMock, "ioThread", ioThreadMock);
given(ctxMock.getSelfId()).willReturn(RULE_NODE_ID);
String serviceIdStr = "test-service"; String serviceIdStr = "test-service";
given(ctxMock.getSelfId()).willReturn(new RuleNodeId(UUID.fromString(ruleNodeIdStr)));
given(ctxMock.getServiceId()).willReturn(serviceIdStr); given(ctxMock.getServiceId()).willReturn(serviceIdStr);
willReturn(producerMock).given(node).getKafkaProducer(any());
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
Properties expectedProperties = new Properties(); Properties expectedProperties = new Properties();
expectedProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-tb-kafka-node-" + ruleNodeIdStr + "-" + serviceIdStr); expectedProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-tb-kafka-node-" + RULE_NODE_ID.getId() + "-" + serviceIdStr);
expectedProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); expectedProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
expectedProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getValueSerializer()); expectedProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getValueSerializer());
expectedProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getKeySerializer()); expectedProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getKeySerializer());
@ -163,14 +183,18 @@ public class TbKafkaNodeTest {
} }
@Test @Test
public void givenInitErrorIsNotNull_whenOnMsg_thenTellFailure() { public void givenInitErrorIsNotNull_whenOnMsg_thenTellFailure() throws TbNodeException {
init(); // GIVEN
String errorMsg = "Error during init!"; mockSuccessfulInit();
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
String errorMsg = "Error during kafka initialization!";
ReflectionTestUtils.setField(node, "initError", new RuntimeException(errorMsg)); ReflectionTestUtils.setField(node, "initError", new RuntimeException(errorMsg));
// WHEN
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
node.onMsg(ctxMock, msg); node.onMsg(ctxMock, msg);
// THEN
ArgumentCaptor<Throwable> actualError = ArgumentCaptor.forClass(Throwable.class); ArgumentCaptor<Throwable> actualError = ArgumentCaptor.forClass(Throwable.class);
then(ctxMock).should().tellFailure(eq(msg), actualError.capture()); then(ctxMock).should().tellFailure(eq(msg), actualError.capture());
assertThat(actualError.getValue()) assertThat(actualError.getValue())
@ -178,20 +202,24 @@ public class TbKafkaNodeTest {
.hasMessage("Failed to initialize Kafka rule node producer: " + errorMsg); .hasMessage("Failed to initialize Kafka rule node producer: " + errorMsg);
} }
@Test @ParameterizedTest
public void givenForceAckIsTrueAndExceptionWasThrown_whenOnMsg_thenTellFailure() { @ValueSource(booleans = {true, false})
init(); public void givenForceAckAndExceptionWasThrown_whenOnMsg_thenTellFailure(boolean forceAck) throws TbNodeException {
ReflectionTestUtils.setField(node, "forceAck", true); // GIVEN
given(ctxMock.isExternalNodeForceAck()).willReturn(forceAck);
mockSuccessfulInit();
ListeningExecutor executorMock = mock(ListeningExecutor.class); ListeningExecutor executorMock = mock(ListeningExecutor.class);
given(ctxMock.getExternalCallExecutor()).willReturn(executorMock); given(ctxMock.getExternalCallExecutor()).willReturn(executorMock);
String errorMsg = "Something went wrong!"; String errorMsg = "Something went wrong!";
willThrow(new RuntimeException(errorMsg)).given(executorMock).executeAsync(any(Callable.class)); willThrow(new RuntimeException(errorMsg)).given(executorMock).executeAsync(any(Callable.class));
// WHEN
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
node.onMsg(ctxMock, msg); node.onMsg(ctxMock, msg);
then(ctxMock).should().ack(msg); // THEN
then(ctxMock).should(forceAck ? times(1) : never()).ack(msg);
ArgumentCaptor<TbMsg> actualMsg = ArgumentCaptor.forClass(TbMsg.class); ArgumentCaptor<TbMsg> actualMsg = ArgumentCaptor.forClass(TbMsg.class);
ArgumentCaptor<Throwable> actualError = ArgumentCaptor.forClass(Throwable.class); ArgumentCaptor<Throwable> actualError = ArgumentCaptor.forClass(Throwable.class);
then(ctxMock).should().tellFailure(actualMsg.capture(), actualError.capture()); then(ctxMock).should().tellFailure(actualMsg.capture(), actualError.capture());
@ -201,26 +229,33 @@ public class TbKafkaNodeTest {
@ParameterizedTest @ParameterizedTest
@MethodSource @MethodSource
public void givenTopicAndKeyPatternsAndAddMetadataKeyValuesAsKafkaHeadersIsFalse_whenOnMsg_thenTellSuccess public void givenForceAckIsTrueTopicAndKeyPatternsAndAddMetadataKeyValuesAsKafkaHeadersIsFalse_whenOnMsg_thenEnqueueForTellNext(
(String topicPattern, String keyPattern, TbMsgMetaData metaData, String data) { String topicPattern, String keyPattern, TbMsgMetaData metaData, String data
) throws TbNodeException {
// GIVEN
config.setTopicPattern(topicPattern); config.setTopicPattern(topicPattern);
config.setKeyPattern(keyPattern); config.setKeyPattern(keyPattern);
init();
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data); TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data);
String topic = TbNodeUtils.processPattern(topicPattern, msg); String topic = TbNodeUtils.processPattern(topicPattern, msg);
String key = TbNodeUtils.processPattern(keyPattern, msg); String key = TbNodeUtils.processPattern(keyPattern, msg);
long offset = 1;
int partition = 0;
mockSuccessfulPublishingRequest(topic, offset, partition);
given(ctxMock.isExternalNodeForceAck()).willReturn(true);
mockSuccessfulInit();
mockSuccessfulPublishingRequest(topic);
// WHEN
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
node.onMsg(ctxMock, msg); node.onMsg(ctxMock, msg);
// THEN
then(ctxMock).should().ack(msg);
verifyProducerRecord(topic, key, msg.getData()); verifyProducerRecord(topic, key, msg.getData());
verifyOutboundMsg(offset, partition, topic, msg); ArgumentCaptor<TbMsg> actualMsg = ArgumentCaptor.forClass(TbMsg.class);
then(ctxMock).should().enqueueForTellNext(actualMsg.capture(), eq(TbNodeConnectionType.SUCCESS));
verifyOutgoingSuccessMsg(topic, actualMsg.getValue(), msg);
} }
private static Stream<Arguments> givenTopicAndKeyPatternsAndAddMetadataKeyValuesAsKafkaHeadersIsFalse_whenOnMsg_thenTellSuccess() { private static Stream<Arguments> givenForceAckIsTrueTopicAndKeyPatternsAndAddMetadataKeyValuesAsKafkaHeadersIsFalse_whenOnMsg_thenEnqueueForTellNext() {
return Stream.of( return Stream.of(
Arguments.of("test-topic", "test-key", new TbMsgMetaData(), TbMsg.EMPTY_JSON_OBJECT), Arguments.of("test-topic", "test-key", new TbMsgMetaData(), TbMsg.EMPTY_JSON_OBJECT),
Arguments.of("${mdTopicPattern}", "${mdKeyPattern}", new TbMsgMetaData( Arguments.of("${mdTopicPattern}", "${mdKeyPattern}", new TbMsgMetaData(
@ -233,79 +268,89 @@ public class TbKafkaNodeTest {
); );
} }
@Test
public void givenForceAckIsFalseAndAddMetadataKeyValuesAsKafkaHeadersIsTrueAndToBytesCharsetIsSet_whenOnMsg_thenAckAndTellSuccess() {
String topic = "test-topic";
String key = "test-key";
config.setTopicPattern(topic);
config.setKeyPattern(key);
config.setAddMetadataKeyValuesAsKafkaHeaders(true);
config.setKafkaHeadersCharset("UTF-16");
init();
ReflectionTestUtils.setField(node, "forceAck", false);
ReflectionTestUtils.setField(node, "addMetadataKeyValuesAsKafkaHeaders", true);
ReflectionTestUtils.setField(node, "toBytesCharset", StandardCharsets.UTF_16);
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("key", "value");
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, TbMsg.EMPTY_JSON_OBJECT);
long offset = 1;
int partition = 0;
mockSuccessfulPublishingRequest(topic, offset, partition);
node.onMsg(ctxMock, msg);
then(ctxMock).should(never()).ack(msg);
Headers expectedHeaders = new RecordHeaders();
msg.getMetaData().values().forEach((k, v) -> expectedHeaders.add(new RecordHeader("tb_msg_md_" + k, v.getBytes(StandardCharsets.UTF_16))));
verifyProducerRecord(topic, key, msg.getData(), expectedHeaders);
verifyOutboundMsg(offset, partition, topic, msg);
}
@ParameterizedTest @ParameterizedTest
@NullAndEmptySource @NullAndEmptySource
public void givenKeyIsNullOrEmptyAndErrorOccursDuringPublishing_whenOnMsg_thenTellFailure(String key) { public void givenForceAckIsFalseAndKeyIsNullOrEmptyAndErrorOccursDuringPublishing_whenOnMsg_thenTellFailure(String key) throws TbNodeException {
String topic = "test-topic"; // GIVEN
config.setTopicPattern(topic);
config.setKeyPattern(key); config.setKeyPattern(key);
config.setAddMetadataKeyValuesAsKafkaHeaders(false);
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
given(ctxMock.isExternalNodeForceAck()).willReturn(false);
mockSuccessfulInit();
String errorMsg = "Something went wrong!"; String errorMsg = "Something went wrong!";
mockFailedPublishingRequest(new RuntimeException(errorMsg));
given(ctxMock.getExternalCallExecutor()).willReturn(executor); // WHEN
willAnswer(invocation -> { node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
Callback callback = invocation.getArgument(1); TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
callback.onCompletion(recordMetadataMock, new RuntimeException(errorMsg));
return null;
}).given(producerMock).send(any(), any(Callback.class));
init();
node.onMsg(ctxMock, msg); node.onMsg(ctxMock, msg);
verifyProducerRecord(topic, null, msg.getData()); // THEN
verifyProducerRecord(TEST_TOPIC, null, msg.getData());
then(ctxMock).should(never()).ack(msg);
ArgumentCaptor<TbMsg> actualMsg = ArgumentCaptor.forClass(TbMsg.class); ArgumentCaptor<TbMsg> actualMsg = ArgumentCaptor.forClass(TbMsg.class);
ArgumentCaptor<Throwable> actualError = ArgumentCaptor.forClass(Throwable.class); ArgumentCaptor<Throwable> actualError = ArgumentCaptor.forClass(Throwable.class);
then(ctxMock).should().tellFailure(actualMsg.capture(), actualError.capture()); then(ctxMock).should().tellFailure(actualMsg.capture(), actualError.capture());
verifyOutgoingFailureMsg(errorMsg, actualMsg.getValue(), msg);
}
@Test
public void givenForceAckIsTrueAndAddKafkaHeadersIsTrueAndToBytesCharsetIsNullAndErrorOccursDuringPublishing_whenOnMsg_thenEnqueueForTellFailure() throws TbNodeException {
// GIVEN
config.setAddMetadataKeyValuesAsKafkaHeaders(true);
config.setKafkaHeadersCharset(null);
given(ctxMock.isExternalNodeForceAck()).willReturn(true);
mockSuccessfulInit();
String errorMsg = "Something went wrong!";
mockFailedPublishingRequest(new RuntimeException(errorMsg));
// WHEN
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
node.onMsg(ctxMock, msg);
// THEN
then(ctxMock).should().ack(msg);
Headers expectedHeaders = new RecordHeaders();
msg.getMetaData().values().forEach((k, v) -> expectedHeaders.add(new RecordHeader("tb_msg_md_" + k, v.getBytes(StandardCharsets.UTF_8))));
verifyProducerRecord(TEST_TOPIC, TEST_KEY, msg.getData(), expectedHeaders);
ArgumentCaptor<TbMsg> actualMsg = ArgumentCaptor.forClass(TbMsg.class);
ArgumentCaptor<Throwable> actualError = ArgumentCaptor.forClass(Throwable.class);
then(ctxMock).should().enqueueForTellFailure(actualMsg.capture(), actualError.capture());
verifyOutgoingFailureMsg(errorMsg, actualMsg.getValue(), msg);
}
@Test
public void givenForceAckIsFalseAndAddMetadataKeyValuesAsKafkaHeadersIsTrueAndToBytesCharsetIsSet_whenOnMsg_thenTellSuccess() throws TbNodeException {
// GIVEN
config.setAddMetadataKeyValuesAsKafkaHeaders(true);
config.setKafkaHeadersCharset("UTF-16");
given(ctxMock.isExternalNodeForceAck()).willReturn(false);
mockSuccessfulInit();
mockSuccessfulPublishingRequest(TEST_TOPIC);
// WHEN
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
TbMsgMetaData metaData = new TbMsgMetaData(); TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("error", RuntimeException.class + ": " + errorMsg); metaData.putValue("key", "value");
TbMsg expectedMsg = TbMsg.transformMsgMetadata(msg, metaData); TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, TbMsg.EMPTY_JSON_OBJECT);
assertThat(actualMsg.getValue()) node.onMsg(ctxMock, msg);
.usingRecursiveComparison()
.ignoringFields("ctx") // THEN
.isEqualTo(expectedMsg); then(ctxMock).should(never()).ack(msg);
Headers expectedHeaders = new RecordHeaders();
msg.getMetaData().values().forEach((k, v) -> expectedHeaders.add(new RecordHeader("tb_msg_md_" + k, v.getBytes(StandardCharsets.UTF_16))));
verifyProducerRecord(TEST_TOPIC, TEST_KEY, msg.getData(), expectedHeaders);
ArgumentCaptor<TbMsg> actualMsg = ArgumentCaptor.forClass(TbMsg.class);
then(ctxMock).should().tellSuccess(actualMsg.capture());
verifyOutgoingSuccessMsg(TEST_TOPIC, actualMsg.getValue(), msg);
} }
@Test @Test
public void givenProducerIsNotNull_whenDestroy_thenShouldClose() { public void givenProducerIsNotNull_whenDestroy_thenShouldClose() {
ReflectionTestUtils.setField(node, "producer", producerMock); ReflectionTestUtils.setField(node, "producer", producerMock);
node.destroy(); node.destroy();
then(producerMock).should().close(); then(producerMock).should().close();
} }
@ -315,22 +360,31 @@ public class TbKafkaNodeTest {
then(producerMock).shouldHaveNoInteractions(); then(producerMock).shouldHaveNoInteractions();
} }
private void mockSuccessfulPublishingRequest(String topic, long offset, int partition) { private void mockSuccessfulInit() {
ReflectionTestUtils.setField(producerMock, "ioThread", ioThreadMock);
willReturn(mock(Properties.class)).given(node).getKafkaProperties(ctxMock);
willReturn(producerMock).given(node).getKafkaProducer(any());
}
private void mockSuccessfulPublishingRequest(String topic) {
given(ctxMock.getExternalCallExecutor()).willReturn(executor); given(ctxMock.getExternalCallExecutor()).willReturn(executor);
willAnswer(invocation -> { willAnswer(invocation -> {
Callback callback = invocation.getArgument(1); Callback callback = invocation.getArgument(1);
callback.onCompletion(recordMetadataMock, null); callback.onCompletion(recordMetadataMock, null);
return null; return null;
}).given(producerMock).send(any(), any(Callback.class)); }).given(producerMock).send(any(), any(Callback.class));
given(recordMetadataMock.offset()).willReturn(offset); given(recordMetadataMock.offset()).willReturn(OFFSET);
given(recordMetadataMock.partition()).willReturn(partition); given(recordMetadataMock.partition()).willReturn(PARTITION);
given(recordMetadataMock.topic()).willReturn(topic); given(recordMetadataMock.topic()).willReturn(topic);
} }
private void init() { private void mockFailedPublishingRequest(Exception exception) {
ReflectionTestUtils.setField(node, "config", config); given(ctxMock.getExternalCallExecutor()).willReturn(executor);
ReflectionTestUtils.setField(node, "producer", producerMock); willAnswer(invocation -> {
ReflectionTestUtils.setField(node, "addMetadataKeyValuesAsKafkaHeaders", false); Callback callback = invocation.getArgument(1);
callback.onCompletion(recordMetadataMock, exception);
return null;
}).given(producerMock).send(any(), any(Callback.class));
} }
private void verifyProducerRecord(String expectedTopic, String expectedKey, String expectedValue) { private void verifyProducerRecord(String expectedTopic, String expectedKey, String expectedValue) {
@ -349,17 +403,23 @@ public class TbKafkaNodeTest {
} }
} }
private void verifyOutboundMsg(long expectedOffset, long expectedPartition, String expectedTopic, TbMsg originalMsg) { private void verifyOutgoingSuccessMsg(String expectedTopic, TbMsg actualMsg, TbMsg originalMsg) {
ArgumentCaptor<TbMsg> actualMsg = ArgumentCaptor.forClass(TbMsg.class);
then(ctxMock).should().tellSuccess(actualMsg.capture());
TbMsgMetaData metaData = originalMsg.getMetaData().copy(); TbMsgMetaData metaData = originalMsg.getMetaData().copy();
metaData.putValue("offset", String.valueOf(expectedOffset)); metaData.putValue("offset", String.valueOf(OFFSET));
metaData.putValue("partition", String.valueOf(expectedPartition)); metaData.putValue("partition", String.valueOf(PARTITION));
metaData.putValue("topic", expectedTopic); metaData.putValue("topic", expectedTopic);
TbMsg expectedMsg = TbMsg.transformMsgMetadata(originalMsg, metaData); TbMsg expectedMsg = TbMsg.transformMsgMetadata(originalMsg, metaData);
assertThat(actualMsg.getValue()) assertThat(actualMsg)
.usingRecursiveComparison() .usingRecursiveComparison()
.ignoringFields("ctx") .ignoringFields("ctx")
.isEqualTo(expectedMsg); .isEqualTo(expectedMsg);
} }
private void verifyOutgoingFailureMsg(String errorMsg, TbMsg actualMsg, TbMsg originalMsg) {
TbMsgMetaData metaData = originalMsg.getMetaData();
metaData.putValue("error", RuntimeException.class + ": " + errorMsg);
TbMsg expectedMsg = TbMsg.transformMsgMetadata(originalMsg, metaData);
assertThat(actualMsg).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg);
}
} }