added upgrade for the kafka node

parent 1d2fe8dbaa
commit 773a206079
TbKafkaNode.java
@@ -15,6 +15,8 @@
  */
 package org.thingsboard.rule.engine.kafka;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.BooleanUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -36,6 +38,7 @@ import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
 import org.thingsboard.server.common.data.exception.ThingsboardKafkaClientError;
 import org.thingsboard.server.common.data.plugin.ComponentType;
+import org.thingsboard.server.common.data.util.TbPair;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.TbMsgMetaData;
 
@@ -49,6 +52,7 @@ import java.util.Properties;
         type = ComponentType.EXTERNAL,
         name = "kafka",
         configClazz = TbKafkaNodeConfiguration.class,
+        version = 1,
         nodeDescription = "Publish messages to Kafka server",
         nodeDetails = "Will send record via Kafka producer to Kafka server. " +
                 "Outbound message will contain response fields (<code>offset</code>, <code>partition</code>, <code>topic</code>)" +
@@ -201,4 +205,22 @@ public class TbKafkaNode extends TbAbstractExternalNode {
                 .build();
     }
 
+    @Override
+    public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
+        boolean hasChanges = false;
+        switch (fromVersion) {
+            case 0 -> {
+                if (oldConfiguration.has("keySerializer") || oldConfiguration.has("valueSerializer")) {
+                    ObjectNode objectConfiguration = (ObjectNode) oldConfiguration;
+                    objectConfiguration.remove("keySerializer");
+                    objectConfiguration.remove("valueSerializer");
+                    hasChanges = true;
+                }
+            }
+            default -> {
+            }
+        }
+        return new TbPair<>(hasChanges, oldConfiguration);
+    }
+
 }
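
The upgrade hook added above only strips the now-unused keySerializer and valueSerializer fields from a stored version-0 configuration (after this commit the producer is always configured with StringSerializer, as the test changes below show). A minimal standalone sketch of that transformation using plain Jackson; the class name and the sample JSON are illustrative only, not part of the commit:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class KafkaNodeConfigMigrationSketch {

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        // A stored version-0 configuration that still carries the serializer fields.
        ObjectNode config = (ObjectNode) mapper.readTree(
                "{\"topicPattern\":\"test-topic\"," +
                        "\"keySerializer\":\"org.apache.kafka.common.serialization.StringSerializer\"," +
                        "\"valueSerializer\":\"org.apache.kafka.common.serialization.StringSerializer\"}");

        // Mirrors the case 0 branch of TbKafkaNode#upgrade: drop the fields if present.
        boolean hasChanges = false;
        if (config.has("keySerializer") || config.has("valueSerializer")) {
            config.remove("keySerializer");
            config.remove("valueSerializer");
            hasChanges = true;
        }

        // Prints: true {"topicPattern":"test-topic"}
        System.out.println(hasChanges + " " + mapper.writeValueAsString(config));
    }
}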
TbKafkaNodeTest.java
@@ -39,8 +39,10 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import org.springframework.test.util.ReflectionTestUtils;
 import org.thingsboard.common.util.JacksonUtil;
 import org.thingsboard.common.util.ListeningExecutor;
+import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
 import org.thingsboard.rule.engine.TestDbCallbackExecutor;
 import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNode;
 import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 import org.thingsboard.rule.engine.api.TbNodeException;
 import org.thingsboard.rule.engine.api.util.TbNodeUtils;
@@ -73,7 +75,7 @@ import static org.mockito.BDDMockito.willReturn;
 import static org.mockito.BDDMockito.willThrow;
 
 @ExtendWith(MockitoExtension.class)
-public class TbKafkaNodeTest {
+public class TbKafkaNodeTest extends AbstractRuleNodeUpgradeTest {
 
     private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("5f2eac08-bd1f-4635-a6c2-437369f996cf"));
     private final RuleNodeId RULE_NODE_ID = new RuleNodeId(UUID.fromString("d46bb666-ecab-4d89-a28f-5abdca23ac29"));
@@ -117,8 +119,6 @@
         assertThat(config.getLinger()).isEqualTo(0);
         assertThat(config.getBufferMemory()).isEqualTo(33554432);
         assertThat(config.getAcks()).isEqualTo("-1");
-        assertThat(config.getKeySerializer()).isEqualTo(StringSerializer.class.getName());
-        assertThat(config.getValueSerializer()).isEqualTo(StringSerializer.class.getName());
         assertThat(config.getOtherProperties()).isEmpty();
         assertThat(config.isAddMetadataKeyValuesAsKafkaHeaders()).isFalse();
         assertThat(config.getKafkaHeadersCharset()).isEqualTo("UTF-8");
@@ -163,8 +163,8 @@
         Properties expectedProperties = new Properties();
         expectedProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-tb-kafka-node-" + RULE_NODE_ID.getId() + "-" + SERVICE_ID_STR);
         expectedProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
-        expectedProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getValueSerializer());
-        expectedProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getKeySerializer());
+        expectedProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        expectedProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         expectedProperties.put(ProducerConfig.ACKS_CONFIG, config.getAcks());
         expectedProperties.put(ProducerConfig.RETRIES_CONFIG, config.getRetries());
         expectedProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getBatchSize());
@@ -454,4 +454,75 @@
         assertThat(actualMsg).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg);
     }
 
+    private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
+        return Stream.of(
+                //config for version 0
+                Arguments.of(0,
+                        "{\n" +
+                                "  \"topicPattern\": \"test-topic\",\n" +
+                                "  \"keyPattern\": \"test-key\",\n" +
+                                "  \"bootstrapServers\": \"localhost:9092\",\n" +
+                                "  \"retries\": 0,\n" +
+                                "  \"batchSize\": 16384,\n" +
+                                "  \"linger\": 0,\n" +
+                                "  \"bufferMemory\": 33554432,\n" +
+                                "  \"acks\": \"-1\",\n" +
+                                "  \"otherProperties\": {},\n" +
+                                "  \"addMetadataKeyValuesAsKafkaHeaders\": false,\n" +
+                                "  \"kafkaHeadersCharset\": \"UTF-8\",\n" +
+                                "  \"keySerializer\": \"org.apache.kafka.common.serialization.StringSerializer\",\n" +
+                                "  \"valueSerializer\": \"org.apache.kafka.common.serialization.StringSerializer\"\n" +
+                                "}",
+                        true,
+                        "{\n" +
+                                "  \"topicPattern\": \"test-topic\",\n" +
+                                "  \"keyPattern\": \"test-key\",\n" +
+                                "  \"bootstrapServers\": \"localhost:9092\",\n" +
+                                "  \"retries\": 0,\n" +
+                                "  \"batchSize\": 16384,\n" +
+                                "  \"linger\": 0,\n" +
+                                "  \"bufferMemory\": 33554432,\n" +
+                                "  \"acks\": \"-1\",\n" +
+                                "  \"otherProperties\": {},\n" +
+                                "  \"addMetadataKeyValuesAsKafkaHeaders\": false,\n" +
+                                "  \"kafkaHeadersCharset\": \"UTF-8\"\n" +
+                                "}"
+                ),
+                //config for version 1 with upgrade from version 0
+                Arguments.of(1,
+                        "{\n" +
+                                "  \"topicPattern\": \"test-topic\",\n" +
+                                "  \"keyPattern\": \"test-key\",\n" +
+                                "  \"bootstrapServers\": \"localhost:9092\",\n" +
+                                "  \"retries\": 0,\n" +
+                                "  \"batchSize\": 16384,\n" +
+                                "  \"linger\": 0,\n" +
+                                "  \"bufferMemory\": 33554432,\n" +
+                                "  \"acks\": \"-1\",\n" +
+                                "  \"otherProperties\": {},\n" +
+                                "  \"addMetadataKeyValuesAsKafkaHeaders\": false,\n" +
+                                "  \"kafkaHeadersCharset\": \"UTF-8\"\n" +
+                                "}",
+                        false,
+                        "{\n" +
+                                "  \"topicPattern\": \"test-topic\",\n" +
+                                "  \"keyPattern\": \"test-key\",\n" +
+                                "  \"bootstrapServers\": \"localhost:9092\",\n" +
+                                "  \"retries\": 0,\n" +
+                                "  \"batchSize\": 16384,\n" +
+                                "  \"linger\": 0,\n" +
+                                "  \"bufferMemory\": 33554432,\n" +
+                                "  \"acks\": \"-1\",\n" +
+                                "  \"otherProperties\": {},\n" +
+                                "  \"addMetadataKeyValuesAsKafkaHeaders\": false,\n" +
+                                "  \"kafkaHeadersCharset\": \"UTF-8\"\n" +
+                                "}"
+                )
+        );
+    }
+
+    @Override
+    protected TbNode getTestNode() {
+        return node;
+    }
 }
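
For reference, the provider above appears to feed (fromVersion, oldConfig, hasChanges, expectedConfig) tuples into the parameterized upgrade test inherited from AbstractRuleNodeUpgradeTest, which is not shown in this diff. A hedged sketch of exercising the new hook directly; the test class name is hypothetical and the TbPair accessor names (getFirst()/getSecond()) are assumed:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
import org.thingsboard.rule.engine.kafka.TbKafkaNode;
import org.thingsboard.server.common.data.util.TbPair;

import static org.assertj.core.api.Assertions.assertThat;

class KafkaNodeUpgradeSketchTest {

    private final ObjectMapper mapper = new ObjectMapper();

    @Test
    void upgradeFromVersion0DropsSerializerFields() throws Exception {
        // Version-0 config still carrying the legacy serializer fields.
        JsonNode oldConfig = mapper.readTree(
                "{\"topicPattern\":\"test-topic\"," +
                        "\"keySerializer\":\"org.apache.kafka.common.serialization.StringSerializer\"," +
                        "\"valueSerializer\":\"org.apache.kafka.common.serialization.StringSerializer\"}");

        TbPair<Boolean, JsonNode> result = new TbKafkaNode().upgrade(0, oldConfig);

        // Accessor names below are assumed (Lombok-style pair), not taken from this diff.
        assertThat(result.getFirst()).isTrue();
        assertThat(result.getSecond().has("keySerializer")).isFalse();
        assertThat(result.getSecond().has("valueSerializer")).isFalse();
    }
}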