diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index 6e80a1577b..7d8935f25e 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -15,11 +15,14 @@ */ package org.thingsboard.rule.engine.mqtt; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import io.netty.buffer.Unpooled; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.ssl.SslContext; import io.netty.util.concurrent.Promise; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClientConfig; import org.thingsboard.mqtt.MqttConnectResult; @@ -35,6 +38,7 @@ import org.thingsboard.rule.engine.external.TbAbstractExternalNode; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.plugin.ComponentClusteringMode; import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -81,7 +85,8 @@ public class TbMqttNode extends TbAbstractExternalNode { public void onMsg(TbContext ctx, TbMsg msg) { String topic = TbNodeUtils.processPattern(this.mqttNodeConfiguration.getTopicPattern(), msg); var tbMsg = ackIfNeeded(ctx, msg); - this.mqttClient.publish(topic, Unpooled.wrappedBuffer(tbMsg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, mqttNodeConfiguration.isRetainedMessage()) + this.mqttClient.publish(topic, Unpooled.wrappedBuffer(getData(tbMsg, mqttNodeConfiguration.isParseToPlainText()).getBytes(UTF8)), + MqttQoS.AT_LEAST_ONCE, mqttNodeConfiguration.isRetainedMessage()) .addListener(future -> { if (future.isSuccess()) { tellSuccess(ctx, tbMsg); @@ -153,4 +158,39 @@ public class TbMqttNode extends TbAbstractExternalNode { return this.mqttNodeConfiguration.isSsl() ? this.mqttNodeConfiguration.getCredentials().initSslContext() : null; } + private String getData(TbMsg tbMsg, boolean parseToPlainText) { + if (parseToPlainText) { + return parseJsonStringToPlainText(tbMsg.getData()); + } + return tbMsg.getData(); + } + + protected String parseJsonStringToPlainText(String data) { + if (data.startsWith("\"") && data.endsWith("\"") && data.length() >= 2) { + final String dataBefore = data; + try { + data = JacksonUtil.fromString(data, String.class); + } catch (Exception ignored) {} + log.trace("Trimming double quotes. Before trim: [{}], after trim: [{}]", dataBefore, data); + } + + return data; + } + + @Override + public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { + boolean hasChanges = false; + switch (fromVersion) { + case 0: + String parseToPlainText = "parseToPlainText"; + if (!oldConfiguration.has(parseToPlainText)) { + hasChanges = true; + ((ObjectNode) oldConfiguration).put(parseToPlainText, false); + } + break; + default: + break; + } + return new TbPair<>(hasChanges, oldConfiguration); + } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeConfiguration.java index 5f13b0e677..edf3618631 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeConfiguration.java @@ -33,6 +33,7 @@ public class TbMqttNodeConfiguration implements NodeConfiguration givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { + return Stream.of( + // default config for version 0 + Arguments.of(0, + "{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"}}", + true, + "{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false}"), + // default config for version 1 with upgrade from version 0 + Arguments.of(1, + "{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false}", + false, + "{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false}") + ); + + } + + @Override + protected TbNode getTestNode() { + return node; + } +} \ No newline at end of file