added ability to send string without quotes
This commit is contained in:
		
							parent
							
								
									439c59e289
								
							
						
					
					
						commit
						3e8f9e3242
					
				@ -15,11 +15,14 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.mqtt;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import com.fasterxml.jackson.databind.node.ObjectNode;
 | 
			
		||||
import io.netty.buffer.Unpooled;
 | 
			
		||||
import io.netty.handler.codec.mqtt.MqttQoS;
 | 
			
		||||
import io.netty.handler.ssl.SslContext;
 | 
			
		||||
import io.netty.util.concurrent.Promise;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.mqtt.MqttClient;
 | 
			
		||||
import org.thingsboard.mqtt.MqttClientConfig;
 | 
			
		||||
import org.thingsboard.mqtt.MqttConnectResult;
 | 
			
		||||
@ -35,6 +38,7 @@ import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
 | 
			
		||||
import org.thingsboard.server.common.data.StringUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.ComponentClusteringMode;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.ComponentType;
 | 
			
		||||
import org.thingsboard.server.common.data.util.TbPair;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
 | 
			
		||||
 | 
			
		||||
@ -81,7 +85,8 @@ public class TbMqttNode extends TbAbstractExternalNode {
 | 
			
		||||
    public void onMsg(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        String topic = TbNodeUtils.processPattern(this.mqttNodeConfiguration.getTopicPattern(), msg);
 | 
			
		||||
        var tbMsg = ackIfNeeded(ctx, msg);
 | 
			
		||||
        this.mqttClient.publish(topic, Unpooled.wrappedBuffer(tbMsg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, mqttNodeConfiguration.isRetainedMessage())
 | 
			
		||||
        this.mqttClient.publish(topic, Unpooled.wrappedBuffer(getData(tbMsg, mqttNodeConfiguration.isParseToPlainText()).getBytes(UTF8)),
 | 
			
		||||
                        MqttQoS.AT_LEAST_ONCE, mqttNodeConfiguration.isRetainedMessage())
 | 
			
		||||
                .addListener(future -> {
 | 
			
		||||
                            if (future.isSuccess()) {
 | 
			
		||||
                                tellSuccess(ctx, tbMsg);
 | 
			
		||||
@ -153,4 +158,39 @@ public class TbMqttNode extends TbAbstractExternalNode {
 | 
			
		||||
        return this.mqttNodeConfiguration.isSsl() ? this.mqttNodeConfiguration.getCredentials().initSslContext() : null;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private String getData(TbMsg tbMsg, boolean parseToPlainText) {
 | 
			
		||||
        if (parseToPlainText) {
 | 
			
		||||
            return parseJsonStringToPlainText(tbMsg.getData());
 | 
			
		||||
        }
 | 
			
		||||
        return tbMsg.getData();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected String parseJsonStringToPlainText(String data) {
 | 
			
		||||
        if (data.startsWith("\"") && data.endsWith("\"") && data.length() >= 2) {
 | 
			
		||||
            final String dataBefore = data;
 | 
			
		||||
            try {
 | 
			
		||||
                data = JacksonUtil.fromString(data, String.class);
 | 
			
		||||
            } catch (Exception ignored) {}
 | 
			
		||||
            log.trace("Trimming double quotes. Before trim: [{}], after trim: [{}]", dataBefore, data);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        return data;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
 | 
			
		||||
        boolean hasChanges = false;
 | 
			
		||||
        switch (fromVersion) {
 | 
			
		||||
            case 0:
 | 
			
		||||
                String parseToPlainText = "parseToPlainText";
 | 
			
		||||
                if (!oldConfiguration.has(parseToPlainText)) {
 | 
			
		||||
                    hasChanges = true;
 | 
			
		||||
                    ((ObjectNode) oldConfiguration).put(parseToPlainText, false);
 | 
			
		||||
                }
 | 
			
		||||
                break;
 | 
			
		||||
            default:
 | 
			
		||||
                break;
 | 
			
		||||
        }
 | 
			
		||||
        return new TbPair<>(hasChanges, oldConfiguration);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -33,6 +33,7 @@ public class TbMqttNodeConfiguration implements NodeConfiguration<TbMqttNodeConf
 | 
			
		||||
 | 
			
		||||
    private boolean cleanSession;
 | 
			
		||||
    private boolean ssl;
 | 
			
		||||
    private boolean parseToPlainText;
 | 
			
		||||
    private ClientCredentials credentials;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
@ -44,6 +45,7 @@ public class TbMqttNodeConfiguration implements NodeConfiguration<TbMqttNodeConf
 | 
			
		||||
        configuration.setCleanSession(true);
 | 
			
		||||
        configuration.setSsl(false);
 | 
			
		||||
        configuration.setRetainedMessage(false);
 | 
			
		||||
        configuration.setParseToPlainText(false);
 | 
			
		||||
        configuration.setCredentials(new AnonymousCredentials());
 | 
			
		||||
        return configuration;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,79 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2024 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.mqtt;
 | 
			
		||||
 | 
			
		||||
import org.junit.jupiter.api.Assertions;
 | 
			
		||||
import org.junit.jupiter.api.BeforeEach;
 | 
			
		||||
import org.junit.jupiter.api.extension.ExtendWith;
 | 
			
		||||
import org.junit.jupiter.params.ParameterizedTest;
 | 
			
		||||
import org.junit.jupiter.params.provider.Arguments;
 | 
			
		||||
import org.junit.jupiter.params.provider.ValueSource;
 | 
			
		||||
import org.mockito.Mockito;
 | 
			
		||||
import org.mockito.Spy;
 | 
			
		||||
import org.mockito.junit.jupiter.MockitoExtension;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNode;
 | 
			
		||||
 | 
			
		||||
import java.util.stream.Stream;
 | 
			
		||||
 | 
			
		||||
import static org.mockito.ArgumentMatchers.anyString;
 | 
			
		||||
import static org.mockito.Mockito.mock;
 | 
			
		||||
 | 
			
		||||
@ExtendWith(MockitoExtension.class)
 | 
			
		||||
class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
 | 
			
		||||
    @Spy
 | 
			
		||||
    TbMqttNode node;
 | 
			
		||||
 | 
			
		||||
    @BeforeEach
 | 
			
		||||
    public void setUp() throws Exception {
 | 
			
		||||
        node = mock(TbMqttNode.class);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @ParameterizedTest
 | 
			
		||||
    @ValueSource(strings = { "false", "\"", "\"\"", "\"This is a string with double quotes\"", "Path: /home/developer/test.txt",
 | 
			
		||||
            "First line\nSecond line\n\nFourth line", "Before\rAfter", "Tab\tSeparated\tValues", "Test\bbackspace", "[]",
 | 
			
		||||
            "[1, 2, 3]", "{\"key\": \"value\"}", "{\n\"temperature\": 25.5,\n\"humidity\": 50.2\n\"}", "Expression: (a + b) * c",
 | 
			
		||||
            "世界", "Україна", "\u1F1FA\u1F1E6", "🇺🇦"})
 | 
			
		||||
    public void testParseJsonStringToPlainText(String original) {
 | 
			
		||||
        Mockito.when(node.parseJsonStringToPlainText(anyString())).thenCallRealMethod();
 | 
			
		||||
 | 
			
		||||
        String serialized = JacksonUtil.toString(original);
 | 
			
		||||
        Assertions.assertNotNull(serialized);
 | 
			
		||||
        Assertions.assertEquals(original, node.parseJsonStringToPlainText(serialized));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
 | 
			
		||||
        return Stream.of(
 | 
			
		||||
                // default config for version 0
 | 
			
		||||
                Arguments.of(0,
 | 
			
		||||
                        "{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"}}",
 | 
			
		||||
                        true,
 | 
			
		||||
                        "{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false}"),
 | 
			
		||||
                // default config for version 1 with upgrade from version 0
 | 
			
		||||
                Arguments.of(1,
 | 
			
		||||
                        "{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false}",
 | 
			
		||||
                        false,
 | 
			
		||||
                        "{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false}")
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected TbNode getTestNode() {
 | 
			
		||||
        return node;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user