Merge pull request #10349 from irynamatveieva/feature/mqtt-node
MQTT node: added ability to send string without quotes.
This commit is contained in:
commit
9cc0e9da7c
@ -30,6 +30,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
|
|||||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.thingsboard.server.common.data.kv.DataType;
|
import org.thingsboard.server.common.data.kv.DataType;
|
||||||
import org.thingsboard.server.common.data.kv.KvEntry;
|
import org.thingsboard.server.common.data.kv.KvEntry;
|
||||||
|
|
||||||
@ -50,6 +51,7 @@ import java.util.regex.Pattern;
|
|||||||
/**
|
/**
|
||||||
* Created by Valerii Sosliuk on 5/12/2017.
|
* Created by Valerii Sosliuk on 5/12/2017.
|
||||||
*/
|
*/
|
||||||
|
@Slf4j
|
||||||
public class JacksonUtil {
|
public class JacksonUtil {
|
||||||
|
|
||||||
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
@ -158,6 +160,20 @@ public class JacksonUtil {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static String toPlainText(String data) {
|
||||||
|
if (data == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (data.startsWith("\"") && data.endsWith("\"") && data.length() >= 2) {
|
||||||
|
final String dataBefore = data;
|
||||||
|
try {
|
||||||
|
data = JacksonUtil.fromString(data, String.class);
|
||||||
|
} catch (Exception ignored) {}
|
||||||
|
log.trace("Trimming double quotes. Before trim: [{}], after trim: [{}]", dataBefore, data);
|
||||||
|
}
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
|
||||||
public static <T> T treeToValue(JsonNode node, Class<T> clazz) {
|
public static <T> T treeToValue(JsonNode node, Class<T> clazz) {
|
||||||
try {
|
try {
|
||||||
return OBJECT_MAPPER.treeToValue(node, clazz);
|
return OBJECT_MAPPER.treeToValue(node, clazz);
|
||||||
|
|||||||
@ -19,6 +19,9 @@ import com.fasterxml.jackson.databind.JsonNode;
|
|||||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
|
import org.junit.jupiter.params.provider.ValueSource;
|
||||||
import org.thingsboard.server.common.data.asset.Asset;
|
import org.thingsboard.server.common.data.asset.Asset;
|
||||||
import org.thingsboard.server.common.data.id.AssetId;
|
import org.thingsboard.server.common.data.id.AssetId;
|
||||||
|
|
||||||
@ -55,4 +58,15 @@ public class JacksonUtilTest {
|
|||||||
Assert.assertEquals(asset.getName(), result.getName());
|
Assert.assertEquals(asset.getName(), result.getName());
|
||||||
Assert.assertEquals(asset.getType(), result.getType());
|
Assert.assertEquals(asset.getType(), result.getType());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@ValueSource(strings = { "", "false", "\"", "\"\"", "\"This is a string with double quotes\"", "Path: /home/developer/test.txt",
|
||||||
|
"First line\nSecond line\n\nFourth line", "Before\rAfter", "Tab\tSeparated\tValues", "Test\bbackspace", "[]",
|
||||||
|
"[1, 2, 3]", "{\"key\": \"value\"}", "{\n\"temperature\": 25.5,\n\"humidity\": 50.2\n\"}", "Expression: (a + b) * c",
|
||||||
|
"世界", "Україна", "\u1F1FA\u1F1E6", "🇺🇦"})
|
||||||
|
public void toPlainTextTest(String original) {
|
||||||
|
String serialized = JacksonUtil.toString(original);
|
||||||
|
Assertions.assertNotNull(serialized);
|
||||||
|
Assertions.assertEquals(original, JacksonUtil.toPlainText(serialized));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,11 +15,14 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.rule.engine.mqtt;
|
package org.thingsboard.rule.engine.mqtt;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.handler.codec.mqtt.MqttQoS;
|
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||||
import io.netty.handler.ssl.SslContext;
|
import io.netty.handler.ssl.SslContext;
|
||||||
import io.netty.util.concurrent.Promise;
|
import io.netty.util.concurrent.Promise;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
import org.thingsboard.mqtt.MqttClient;
|
import org.thingsboard.mqtt.MqttClient;
|
||||||
import org.thingsboard.mqtt.MqttClientConfig;
|
import org.thingsboard.mqtt.MqttClientConfig;
|
||||||
import org.thingsboard.mqtt.MqttConnectResult;
|
import org.thingsboard.mqtt.MqttConnectResult;
|
||||||
@ -35,6 +38,7 @@ import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
|
|||||||
import org.thingsboard.server.common.data.StringUtils;
|
import org.thingsboard.server.common.data.StringUtils;
|
||||||
import org.thingsboard.server.common.data.plugin.ComponentClusteringMode;
|
import org.thingsboard.server.common.data.plugin.ComponentClusteringMode;
|
||||||
import org.thingsboard.server.common.data.plugin.ComponentType;
|
import org.thingsboard.server.common.data.plugin.ComponentType;
|
||||||
|
import org.thingsboard.server.common.data.util.TbPair;
|
||||||
import org.thingsboard.server.common.msg.TbMsg;
|
import org.thingsboard.server.common.msg.TbMsg;
|
||||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||||
|
|
||||||
@ -81,7 +85,8 @@ public class TbMqttNode extends TbAbstractExternalNode {
|
|||||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||||
String topic = TbNodeUtils.processPattern(this.mqttNodeConfiguration.getTopicPattern(), msg);
|
String topic = TbNodeUtils.processPattern(this.mqttNodeConfiguration.getTopicPattern(), msg);
|
||||||
var tbMsg = ackIfNeeded(ctx, msg);
|
var tbMsg = ackIfNeeded(ctx, msg);
|
||||||
this.mqttClient.publish(topic, Unpooled.wrappedBuffer(tbMsg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, mqttNodeConfiguration.isRetainedMessage())
|
this.mqttClient.publish(topic, Unpooled.wrappedBuffer(getData(tbMsg, mqttNodeConfiguration.isParseToPlainText()).getBytes(UTF8)),
|
||||||
|
MqttQoS.AT_LEAST_ONCE, mqttNodeConfiguration.isRetainedMessage())
|
||||||
.addListener(future -> {
|
.addListener(future -> {
|
||||||
if (future.isSuccess()) {
|
if (future.isSuccess()) {
|
||||||
tellSuccess(ctx, tbMsg);
|
tellSuccess(ctx, tbMsg);
|
||||||
@ -153,4 +158,27 @@ public class TbMqttNode extends TbAbstractExternalNode {
|
|||||||
return this.mqttNodeConfiguration.isSsl() ? this.mqttNodeConfiguration.getCredentials().initSslContext() : null;
|
return this.mqttNodeConfiguration.isSsl() ? this.mqttNodeConfiguration.getCredentials().initSslContext() : null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String getData(TbMsg tbMsg, boolean parseToPlainText) {
|
||||||
|
if (parseToPlainText) {
|
||||||
|
return JacksonUtil.toPlainText(tbMsg.getData());
|
||||||
|
}
|
||||||
|
return tbMsg.getData();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
|
||||||
|
boolean hasChanges = false;
|
||||||
|
switch (fromVersion) {
|
||||||
|
case 0:
|
||||||
|
String parseToPlainText = "parseToPlainText";
|
||||||
|
if (!oldConfiguration.has(parseToPlainText)) {
|
||||||
|
hasChanges = true;
|
||||||
|
((ObjectNode) oldConfiguration).put(parseToPlainText, false);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return new TbPair<>(hasChanges, oldConfiguration);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -33,6 +33,7 @@ public class TbMqttNodeConfiguration implements NodeConfiguration<TbMqttNodeConf
|
|||||||
|
|
||||||
private boolean cleanSession;
|
private boolean cleanSession;
|
||||||
private boolean ssl;
|
private boolean ssl;
|
||||||
|
private boolean parseToPlainText;
|
||||||
private ClientCredentials credentials;
|
private ClientCredentials credentials;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -44,6 +45,7 @@ public class TbMqttNodeConfiguration implements NodeConfiguration<TbMqttNodeConf
|
|||||||
configuration.setCleanSession(true);
|
configuration.setCleanSession(true);
|
||||||
configuration.setSsl(false);
|
configuration.setSsl(false);
|
||||||
configuration.setRetainedMessage(false);
|
configuration.setRetainedMessage(false);
|
||||||
|
configuration.setParseToPlainText(false);
|
||||||
configuration.setCredentials(new AnonymousCredentials());
|
configuration.setCredentials(new AnonymousCredentials());
|
||||||
return configuration;
|
return configuration;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -238,23 +238,11 @@ public class TbHttpClient {
|
|||||||
|
|
||||||
private String getData(TbMsg tbMsg, boolean ignoreBody, boolean parseToPlainText) {
|
private String getData(TbMsg tbMsg, boolean ignoreBody, boolean parseToPlainText) {
|
||||||
if (!ignoreBody && parseToPlainText) {
|
if (!ignoreBody && parseToPlainText) {
|
||||||
return parseJsonStringToPlainText(tbMsg.getData());
|
return JacksonUtil.toPlainText(tbMsg.getData());
|
||||||
}
|
}
|
||||||
return tbMsg.getData();
|
return tbMsg.getData();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String parseJsonStringToPlainText(String data) {
|
|
||||||
if (data.startsWith("\"") && data.endsWith("\"") && data.length() >= 2) {
|
|
||||||
final String dataBefore = data;
|
|
||||||
try {
|
|
||||||
data = JacksonUtil.fromString(data, String.class);
|
|
||||||
} catch (Exception ignored) {}
|
|
||||||
log.trace("Trimming double quotes. Before trim: [{}], after trim: [{}]", dataBefore, data);
|
|
||||||
}
|
|
||||||
|
|
||||||
return data;
|
|
||||||
}
|
|
||||||
|
|
||||||
private TbMsg processResponse(TbContext ctx, TbMsg origMsg, ResponseEntity<String> response) {
|
private TbMsg processResponse(TbContext ctx, TbMsg origMsg, ResponseEntity<String> response) {
|
||||||
TbMsgMetaData metaData = origMsg.getMetaData();
|
TbMsgMetaData metaData = origMsg.getMetaData();
|
||||||
HttpStatus httpStatus = (HttpStatus) response.getStatusCode();
|
HttpStatus httpStatus = (HttpStatus) response.getStatusCode();
|
||||||
|
|||||||
@ -0,0 +1,60 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2024 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.rule.engine.mqtt;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.junit.jupiter.params.provider.Arguments;
|
||||||
|
import org.mockito.Spy;
|
||||||
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
|
||||||
|
import org.thingsboard.rule.engine.api.TbNode;
|
||||||
|
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
|
@ExtendWith(MockitoExtension.class)
|
||||||
|
class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
|
||||||
|
@Spy
|
||||||
|
TbMqttNode node;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
node = mock(TbMqttNode.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
|
||||||
|
return Stream.of(
|
||||||
|
// default config for version 0
|
||||||
|
Arguments.of(0,
|
||||||
|
"{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"}}",
|
||||||
|
true,
|
||||||
|
"{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false}"),
|
||||||
|
// default config for version 1 with upgrade from version 0
|
||||||
|
Arguments.of(1,
|
||||||
|
"{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false}",
|
||||||
|
false,
|
||||||
|
"{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false}")
|
||||||
|
);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected TbNode getTestNode() {
|
||||||
|
return node;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -22,13 +22,10 @@ import org.junit.jupiter.api.AfterEach;
|
|||||||
import org.junit.jupiter.api.Assertions;
|
import org.junit.jupiter.api.Assertions;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.params.ParameterizedTest;
|
|
||||||
import org.junit.jupiter.params.provider.ValueSource;
|
|
||||||
import org.mockito.ArgumentCaptor;
|
import org.mockito.ArgumentCaptor;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
import org.mockserver.integration.ClientAndServer;
|
import org.mockserver.integration.ClientAndServer;
|
||||||
import org.springframework.util.LinkedMultiValueMap;
|
import org.springframework.util.LinkedMultiValueMap;
|
||||||
import org.thingsboard.common.util.JacksonUtil;
|
|
||||||
import org.thingsboard.rule.engine.api.TbContext;
|
import org.thingsboard.rule.engine.api.TbContext;
|
||||||
import org.thingsboard.server.common.data.id.DeviceId;
|
import org.thingsboard.server.common.data.id.DeviceId;
|
||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
@ -47,7 +44,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
|
|||||||
import static org.hamcrest.Matchers.instanceOf;
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.ArgumentMatchers.anyString;
|
|
||||||
import static org.mockito.ArgumentMatchers.eq;
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
import static org.mockito.BDDMockito.willCallRealMethod;
|
import static org.mockito.BDDMockito.willCallRealMethod;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
@ -222,16 +218,4 @@ public class TbHttpClientTest {
|
|||||||
Assertions.assertEquals(data.get("Set-Cookie"), "[\"sap-context=sap-client=075; path=/\",\"sap-token=sap-client=075; path=/\"]");
|
Assertions.assertEquals(data.get("Set-Cookie"), "[\"sap-context=sap-client=075; path=/\",\"sap-token=sap-client=075; path=/\"]");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
|
||||||
@ValueSource(strings = { "false", "\"", "\"\"", "\"This is a string with double quotes\"", "Path: /home/developer/test.txt",
|
|
||||||
"First line\nSecond line\n\nFourth line", "Before\rAfter", "Tab\tSeparated\tValues", "Test\bbackspace", "[]",
|
|
||||||
"[1, 2, 3]", "{\"key\": \"value\"}", "{\n\"temperature\": 25.5,\n\"humidity\": 50.2\n\"}", "Expression: (a + b) * c",
|
|
||||||
"世界", "Україна", "\u1F1FA\u1F1E6", "🇺🇦"})
|
|
||||||
public void testParseJsonStringToPlainText(String original) {
|
|
||||||
Mockito.when(client.parseJsonStringToPlainText(anyString())).thenCallRealMethod();
|
|
||||||
|
|
||||||
String serialized = JacksonUtil.toString(original);
|
|
||||||
Assertions.assertNotNull(serialized);
|
|
||||||
Assertions.assertEquals(original, client.parseJsonStringToPlainText(serialized));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user