Merge branch 'develop/3.5.2' into feature/basic-widget-config

This commit is contained in:
Igor Kulikov 2023-06-24 13:35:59 +03:00
commit 97831351a0
30 changed files with 785 additions and 107 deletions

View File

@ -536,6 +536,10 @@ public class ActorSystemContext {
@Getter
private int maxRpcRetries;
@Value("${actors.rule.external.force_ack:false}")
@Getter
private boolean externalNodeForceAck;
@Getter
@Setter
private TbActorSystem actorSystem;

View File

@ -34,6 +34,7 @@ import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.ScriptEngine;
import org.thingsboard.rule.engine.api.SmsService;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TbRelationTypes;
import org.thingsboard.rule.engine.api.slack.SlackService;
import org.thingsboard.rule.engine.api.sms.SmsSenderFactory;
@ -214,6 +215,12 @@ class DefaultTbContext implements TbContext {
enqueueForTellNext(tpi, tbMsg, Collections.singleton(TbRelationTypes.FAILURE), failureMessage, null, null);
}
@Override
public void enqueueForTellFailure(TbMsg tbMsg, Throwable th) {
TopicPartitionInfo tpi = resolvePartition(tbMsg);
enqueueForTellNext(tpi, tbMsg, Collections.singleton(TbRelationTypes.FAILURE), getFailureMessage(th), null, null);
}
@Override
public void enqueueForTellNext(TbMsg tbMsg, String relationType) {
TopicPartitionInfo tpi = resolvePartition(tbMsg);
@ -311,16 +318,7 @@ class DefaultTbContext implements TbContext {
if (nodeCtx.getSelf().isDebugMode()) {
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, TbRelationTypes.FAILURE, th);
}
String failureMessage;
if (th != null) {
if (!StringUtils.isEmpty(th.getMessage())) {
failureMessage = th.getMessage();
} else {
failureMessage = th.getClass().getSimpleName();
}
} else {
failureMessage = null;
}
String failureMessage = getFailureMessage(th);
nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getRuleChainId(),
nodeCtx.getSelf().getId(), Collections.singleton(TbRelationTypes.FAILURE),
msg, failureMessage));
@ -724,6 +722,11 @@ class DefaultTbContext implements TbContext {
return mainCtx.getSlackService();
}
@Override
public boolean isExternalNodeForceAck() {
return mainCtx.isExternalNodeForceAck();
}
@Override
public RuleEngineRpcService getRpcService() {
return mainCtx.getTbRuleEngineDeviceRpcService();
@ -840,12 +843,26 @@ class DefaultTbContext implements TbContext {
}
@Override
public void checkTenantEntity(EntityId entityId) {
public void checkTenantEntity(EntityId entityId) throws TbNodeException {
if (!this.getTenantId().equals(TenantIdLoader.findTenantId(this, entityId))) {
throw new RuntimeException("Entity with id: '" + entityId + "' specified in the configuration doesn't belong to the current tenant.");
throw new TbNodeException("Entity with id: '" + entityId + "' specified in the configuration doesn't belong to the current tenant.", true);
}
}
private static String getFailureMessage(Throwable th) {
String failureMessage;
if (th != null) {
if (!StringUtils.isEmpty(th.getMessage())) {
failureMessage = th.getMessage();
} else {
failureMessage = th.getClass().getSimpleName();
}
} else {
failureMessage = null;
}
return failureMessage;
}
private class SimpleTbQueueCallback implements TbQueueCallback {
private final Runnable onSuccess;
private final Consumer<Throwable> onFailure;

View File

@ -23,6 +23,7 @@ import org.thingsboard.server.actors.TbEntityActorId;
import org.thingsboard.server.actors.TbEntityTypeActorIdPredicate;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.actors.shared.RuleChainErrorActor;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
@ -31,6 +32,7 @@ import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.dao.rule.RuleChainService;
import java.util.function.Function;
@ -86,7 +88,12 @@ public abstract class RuleChainManagerActor extends ContextAwareActor {
() -> DefaultActorService.RULE_DISPATCHER_NAME,
() -> {
RuleChain ruleChain = provider.apply(ruleChainId);
return new RuleChainActor.ActorCreator(systemContext, tenantId, ruleChain);
if (ruleChain == null) {
return new RuleChainErrorActor.ActorCreator(systemContext, tenantId,
new RuleEngineException("Rule Chain with id: " + ruleChainId + " not found!"));
} else {
return new RuleChainActor.ActorCreator(systemContext, tenantId, ruleChain);
}
});
}

View File

@ -0,0 +1,78 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.shared;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActor;
import org.thingsboard.server.actors.TbActorId;
import org.thingsboard.server.actors.TbStringActorId;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import java.util.UUID;
@Slf4j
public class RuleChainErrorActor extends ContextAwareActor {
private final TenantId tenantId;
private final RuleEngineException error;
private RuleChainErrorActor(ActorSystemContext systemContext, TenantId tenantId, RuleEngineException error) {
super(systemContext);
this.tenantId = tenantId;
this.error = error;
}
@Override
protected boolean doProcess(TbActorMsg msg) {
if (msg instanceof RuleChainAwareMsg) {
log.debug("[{}] Reply with {} for message {}", tenantId, error.getMessage(), msg);
var rcMsg = (RuleChainAwareMsg) msg;
rcMsg.getMsg().getCallback().onFailure(error);
return true;
} else {
return false;
}
}
public static class ActorCreator extends ContextBasedCreator {
private final TenantId tenantId;
private final RuleEngineException error;
public ActorCreator(ActorSystemContext context, TenantId tenantId, RuleEngineException error) {
super(context);
this.tenantId = tenantId;
this.error = error;
}
@Override
public TbActorId createActorId() {
return new TbStringActorId(UUID.randomUUID().toString());
}
@Override
public TbActor createActor() {
return new RuleChainErrorActor(context, tenantId, error);
}
}
}

View File

@ -393,6 +393,10 @@ actors:
queue_size: "${ACTORS_RULE_TRANSACTION_QUEUE_SIZE:15000}"
# Time in milliseconds for transaction to complete
duration: "${ACTORS_RULE_TRANSACTION_DURATION:60000}"
external:
# Force acknowledgement of the incoming message for external rule nodes to decrease processing latency.
# Enqueue the result of external node processing as a separate message to the rule engine.
force_ack: "${ACTORS_RULE_EXTERNAL_NODE_FORCE_ACK:false}"
rpc:
max_retries: "${ACTORS_RPC_MAX_RETRIES:5}"
sequential: "${ACTORS_RPC_SEQUENTIAL:false}"

View File

@ -19,6 +19,7 @@ import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorError;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbActorStopReason;
@ -69,9 +70,14 @@ public final class TbActorMailbox implements TbActorCtx {
}
}
} catch (Throwable t) {
log.debug("[{}] Failed to init actor, attempt: {}", selfId, attempt, t);
InitFailureStrategy strategy;
int attemptIdx = attempt + 1;
InitFailureStrategy strategy = actor.onInitFailure(attempt, t);
if (isUnrecoverable(t)) {
strategy = InitFailureStrategy.stop();
} else {
log.debug("[{}] Failed to init actor, attempt: {}", selfId, attempt, t);
strategy = actor.onInitFailure(attempt, t);
}
if (strategy.isStop() || (settings.getMaxActorInitAttempts() > 0 && attemptIdx > settings.getMaxActorInitAttempts())) {
log.info("[{}] Failed to init actor, attempt {}, going to stop attempts.", selfId, attempt, t);
stopReason = TbActorStopReason.INIT_FAILED;
@ -88,6 +94,13 @@ public final class TbActorMailbox implements TbActorCtx {
}
}
private static boolean isUnrecoverable(Throwable t) {
if (t instanceof TbActorException && t.getCause() != null) {
t = t.getCause();
}
return t instanceof TbActorError && ((TbActorError) t).isUnrecoverable();
}
private void enqueue(TbActorMsg msg, boolean highPriority) {
if (!destroyInProgress.get()) {
if (highPriority) {

View File

@ -0,0 +1,22 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.msg;
public interface TbActorError {
boolean isUnrecoverable();
}

View File

@ -277,6 +277,11 @@ public final class TbMsg implements Serializable {
this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, this.ctx, callback);
}
public TbMsg copyWithNewCtx() {
return new TbMsg(this.queueName, this.id, this.ts, this.type, this.originator, this.customerId,
this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, this.ctx.copy(), TbMsgCallback.EMPTY);
}
public TbMsgCallback getCallback() {
// May be null in case of deserialization;
if (callback != null) {

View File

@ -17,9 +17,12 @@ package org.thingsboard.server.common.msg.aware;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbMsg;
public interface RuleChainAwareMsg extends TbActorMsg {
RuleChainId getRuleChainId();
TbMsg getMsg();
}

View File

@ -15,6 +15,8 @@
*/
package org.thingsboard.script.api.tbel;
import com.google.common.primitives.Bytes;
import org.apache.commons.lang3.ArrayUtils;
import org.mvel2.ExecutionContext;
import org.mvel2.ParserConfiguration;
import org.mvel2.execution.ExecutionArrayList;
@ -25,11 +27,13 @@ import org.thingsboard.server.common.data.StringUtils;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.List;
@ -61,6 +65,10 @@ public class TbUtils {
String.class)));
parserConfig.addImport("parseInt", new MethodStub(TbUtils.class.getMethod("parseInt",
String.class, int.class)));
parserConfig.addImport("parseLong", new MethodStub(TbUtils.class.getMethod("parseLong",
String.class)));
parserConfig.addImport("parseLong", new MethodStub(TbUtils.class.getMethod("parseLong",
String.class, int.class)));
parserConfig.addImport("parseFloat", new MethodStub(TbUtils.class.getMethod("parseFloat",
String.class)));
parserConfig.addImport("parseDouble", new MethodStub(TbUtils.class.getMethod("parseDouble",
@ -81,8 +89,58 @@ public class TbUtils {
byte[].class, int.class, int.class)));
parserConfig.addImport("parseBytesToInt", new MethodStub(TbUtils.class.getMethod("parseBytesToInt",
byte[].class, int.class, int.class, boolean.class)));
parserConfig.addImport("parseLittleEndianHexToLong", new MethodStub(TbUtils.class.getMethod("parseLittleEndianHexToLong",
String.class)));
parserConfig.addImport("parseBigEndianHexToLong", new MethodStub(TbUtils.class.getMethod("parseBigEndianHexToLong",
String.class)));
parserConfig.addImport("parseHexToLong", new MethodStub(TbUtils.class.getMethod("parseHexToLong",
String.class)));
parserConfig.addImport("parseHexToLong", new MethodStub(TbUtils.class.getMethod("parseHexToLong",
String.class, boolean.class)));
parserConfig.addImport("parseBytesToLong", new MethodStub(TbUtils.class.getMethod("parseBytesToLong",
List.class, int.class, int.class)));
parserConfig.addImport("parseBytesToLong", new MethodStub(TbUtils.class.getMethod("parseBytesToLong",
List.class, int.class, int.class, boolean.class)));
parserConfig.addImport("parseBytesToLong", new MethodStub(TbUtils.class.getMethod("parseBytesToLong",
byte[].class, int.class, int.class)));
parserConfig.addImport("parseBytesToLong", new MethodStub(TbUtils.class.getMethod("parseBytesToLong",
byte[].class, int.class, int.class, boolean.class)));
parserConfig.addImport("parseLittleEndianHexToFloat", new MethodStub(TbUtils.class.getMethod("parseLittleEndianHexToFloat",
String.class)));
parserConfig.addImport("parseBigEndianHexToFloat", new MethodStub(TbUtils.class.getMethod("parseBigEndianHexToFloat",
String.class)));
parserConfig.addImport("parseHexToFloat", new MethodStub(TbUtils.class.getMethod("parseHexToFloat",
String.class)));
parserConfig.addImport("parseHexToFloat", new MethodStub(TbUtils.class.getMethod("parseHexToFloat",
String.class, boolean.class)));
parserConfig.addImport("parseBytesToFloat", new MethodStub(TbUtils.class.getMethod("parseBytesToFloat",
byte[].class, int.class, boolean.class)));
parserConfig.addImport("parseBytesToFloat", new MethodStub(TbUtils.class.getMethod("parseBytesToFloat",
byte[].class, int.class)));
parserConfig.addImport("parseBytesToFloat", new MethodStub(TbUtils.class.getMethod("parseBytesToFloat",
List.class, int.class, boolean.class)));
parserConfig.addImport("parseBytesToFloat", new MethodStub(TbUtils.class.getMethod("parseBytesToFloat",
List.class, int.class)));
parserConfig.addImport("parseLittleEndianHexToDouble", new MethodStub(TbUtils.class.getMethod("parseLittleEndianHexToDouble",
String.class)));
parserConfig.addImport("parseBigEndianHexToDouble", new MethodStub(TbUtils.class.getMethod("parseBigEndianHexToDouble",
String.class)));
parserConfig.addImport("parseHexToDouble", new MethodStub(TbUtils.class.getMethod("parseHexToDouble",
String.class)));
parserConfig.addImport("parseHexToDouble", new MethodStub(TbUtils.class.getMethod("parseHexToDouble",
String.class, boolean.class)));
parserConfig.addImport("parseBytesToDouble", new MethodStub(TbUtils.class.getMethod("parseBytesToDouble",
byte[].class, int.class)));
parserConfig.addImport("parseBytesToDouble", new MethodStub(TbUtils.class.getMethod("parseBytesToDouble",
byte[].class, int.class, boolean.class)));
parserConfig.addImport("parseBytesToDouble", new MethodStub(TbUtils.class.getMethod("parseBytesToDouble",
List.class, int.class)));
parserConfig.addImport("parseBytesToDouble", new MethodStub(TbUtils.class.getMethod("parseBytesToDouble",
List.class, int.class, boolean.class)));
parserConfig.addImport("toFixed", new MethodStub(TbUtils.class.getMethod("toFixed",
double.class, int.class)));
parserConfig.addImport("toFixed", new MethodStub(TbUtils.class.getMethod("toFixed",
float.class, int.class)));
parserConfig.addImport("hexToBytes", new MethodStub(TbUtils.class.getMethod("hexToBytes",
ExecutionContext.class, String.class)));
parserConfig.addImport("base64ToHex", new MethodStub(TbUtils.class.getMethod("base64ToHex",
@ -154,37 +212,73 @@ public class TbUtils {
}
public static Integer parseInt(String value) {
if (value != null) {
int radix = getRadix(value);
return parseInt(value, radix);
}
public static Integer parseInt(String value, int radix) {
if (StringUtils.isNotBlank(value)) {
try {
int radix = 10;
if (isHexadecimal(value)) {
radix = 16;
String valueP = prepareNumberString(value);
isValidRadix(valueP, radix);
try {
return Integer.parseInt(valueP, radix);
} catch (NumberFormatException e) {
BigInteger bi = new BigInteger(valueP, radix);
if (bi.compareTo(BigInteger.valueOf(Integer.MAX_VALUE)) > 0)
throw new NumberFormatException("Value \"" + value + "\" is greater than the maximum Integer value " + Integer.MAX_VALUE + " !");
if (bi.compareTo(BigInteger.valueOf(Integer.MIN_VALUE)) < 0)
throw new NumberFormatException("Value \"" + value + "\" is less than the minimum Integer value " + Integer.MIN_VALUE + " !");
Float f = parseFloat(valueP);
if (f != null) {
return f.intValue();
} else {
throw new NumberFormatException(e.getMessage());
}
}
return Integer.parseInt(prepareNumberString(value), radix);
} catch (NumberFormatException e) {
Float f = parseFloat(value);
if (f != null) {
return f.intValue();
}
throw new NumberFormatException(e.getMessage());
}
}
return null;
}
public static Integer parseInt(String value, int radix) {
if (value != null) {
public static Long parseLong(String value) {
int radix = getRadix(value);
return parseLong(value, radix);
}
public static Long parseLong(String value, int radix) {
if (StringUtils.isNotBlank(value)) {
try {
return Integer.parseInt(prepareNumberString(value), radix);
} catch (NumberFormatException e) {
Float f = parseFloat(value);
if (f != null) {
return f.intValue();
String valueP = prepareNumberString(value);
isValidRadix(valueP, radix);
try {
return Long.parseLong(valueP, radix);
} catch (NumberFormatException e) {
BigInteger bi = new BigInteger(valueP, radix);
if (bi.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) > 0)
throw new NumberFormatException("Value \"" + value + "\"is greater than the maximum Long value " + Long.MAX_VALUE + " !");
if (bi.compareTo(BigInteger.valueOf(Long.MIN_VALUE)) < 0)
throw new NumberFormatException("Value \"" + value + "\" is less than the minimum Long value " + Long.MIN_VALUE + " !");
Double dd = parseDouble(valueP);
if (dd != null) {
return dd.longValue();
} else {
throw new NumberFormatException(e.getMessage());
}
}
} catch (NumberFormatException e) {
throw new NumberFormatException(e.getMessage());
}
}
return null;
}
private static int getRadix(String value, int... radixS) {
return radixS.length > 0 ? radixS[0] : isHexadecimal(value) ? 16 : 10;
}
public static Float parseFloat(String value) {
if (value != null) {
try {
@ -218,8 +312,64 @@ public class TbUtils {
}
public static int parseHexToInt(String hex, boolean bigEndian) {
byte[] data = prepareHexToBytesNumber(hex, 8);
return parseBytesToInt(data, 0, data.length, bigEndian);
}
public static long parseLittleEndianHexToLong(String hex) {
return parseHexToLong(hex, false);
}
public static long parseBigEndianHexToLong(String hex) {
return parseHexToLong(hex, true);
}
public static long parseHexToLong(String hex) {
return parseHexToLong(hex, true);
}
public static long parseHexToLong(String hex, boolean bigEndian) {
byte[] data = prepareHexToBytesNumber(hex, 16);
return parseBytesToLong(data, 0, data.length, bigEndian);
}
public static float parseLittleEndianHexToFloat(String hex) {
return parseHexToFloat(hex, false);
}
public static float parseBigEndianHexToFloat(String hex) {
return parseHexToFloat(hex, true);
}
public static float parseHexToFloat(String hex) {
return parseHexToFloat(hex, true);
}
public static float parseHexToFloat(String hex, boolean bigEndian) {
byte[] data = prepareHexToBytesNumber(hex, 8);
return parseBytesToFloat(data, 0, bigEndian);
}
public static double parseLittleEndianHexToDouble(String hex) {
return parseHexToDouble(hex, false);
}
public static double parseBigEndianHexToDouble(String hex) {
return parseHexToDouble(hex, true);
}
public static double parseHexToDouble(String hex) {
return parseHexToDouble(hex, true);
}
public static double parseHexToDouble(String hex, boolean bigEndian) {
byte[] data = prepareHexToBytesNumber(hex, 16);
return parseBytesToDouble(data, 0, bigEndian);
}
private static byte[] prepareHexToBytesNumber(String hex, int len) {
int length = hex.length();
if (length > 8) {
if (length > len) {
throw new IllegalArgumentException("Hex string is too large. Maximum 8 symbols allowed.");
}
if (length % 2 > 0) {
@ -229,7 +379,7 @@ public class TbUtils {
for (int i = 0; i < length; i += 2) {
data[i / 2] = (byte) ((Character.digit(hex.charAt(i), 16) << 4) + Character.digit(hex.charAt(i + 1), 16));
}
return parseBytesToInt(data, 0, data.length, bigEndian);
return data;
}
public static ExecutionArrayList<Byte> hexToBytes(ExecutionContext ctx, String hex) {
@ -293,6 +443,91 @@ public class TbUtils {
return bb.getInt();
}
public static long parseBytesToLong(List<Byte> data, int offset, int length) {
return parseBytesToLong(data, offset, length, true);
}
public static long parseBytesToLong(List<Byte> data, int offset, int length, boolean bigEndian) {
final byte[] bytes = new byte[data.size()];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = data.get(i);
}
return parseBytesToLong(bytes, offset, length, bigEndian);
}
public static long parseBytesToLong(byte[] data, int offset, int length) {
return parseBytesToLong(data, offset, length, true);
}
public static long parseBytesToLong(byte[] data, int offset, int length, boolean bigEndian) {
if (offset > data.length) {
throw new IllegalArgumentException("Offset: " + offset + " is out of bounds for array with length: " + data.length + "!");
}
if (length > 8) {
throw new IllegalArgumentException("Length: " + length + " is too large. Maximum 4 bytes is allowed!");
}
if (offset + length > data.length) {
throw new IllegalArgumentException("Offset: " + offset + " and Length: " + length + " is out of bounds for array with length: " + data.length + "!");
}
var bb = ByteBuffer.allocate(8);
if (!bigEndian) {
bb.order(ByteOrder.LITTLE_ENDIAN);
}
bb.position(bigEndian ? 8 - length : 0);
bb.put(data, offset, length);
bb.position(0);
return bb.getLong();
}
public static float parseBytesToFloat(byte[] data, int offset) {
return parseBytesToFloat(data, offset, true);
}
public static float parseBytesToFloat(List data, int offset) {
return parseBytesToFloat(data, offset, true);
}
public static float parseBytesToFloat(List data, int offset, boolean bigEndian) {
return parseBytesToFloat(Bytes.toArray(data), offset, bigEndian);
}
public static float parseBytesToFloat(byte[] data, int offset, boolean bigEndian) {
byte[] bytesToNumber = prepareBytesToNumber(data, offset, 4, bigEndian);
return ByteBuffer.wrap(bytesToNumber).getFloat();
}
public static double parseBytesToDouble(byte[] data, int offset) {
return parseBytesToDouble(data, offset, true);
}
public static double parseBytesToDouble(List data, int offset) {
return parseBytesToDouble(data, offset, true);
}
public static double parseBytesToDouble(List data, int offset, boolean bigEndian) {
return parseBytesToDouble(Bytes.toArray(data), offset, bigEndian);
}
public static double parseBytesToDouble(byte[] data, int offset, boolean bigEndian) {
byte[] bytesToNumber = prepareBytesToNumber(data, offset, 8, bigEndian);
return ByteBuffer.wrap(bytesToNumber).getDouble();
}
private static byte[] prepareBytesToNumber(byte[] data, int offset, int length, boolean bigEndian) {
if (offset > data.length) {
throw new IllegalArgumentException("Offset: " + offset + " is out of bounds for array with length: " + data.length + "!");
}
if ((offset + length) > data.length) {
throw new IllegalArgumentException("Default length is always " + length + " bytes. Offset: " + offset + " and Length: " + length + " is out of bounds for array with length: " + data.length + "!");
}
byte[] dataBytesArray = Arrays.copyOfRange(data, offset, (offset + length));
if (!bigEndian) {
ArrayUtils.reverse(dataBytesArray);
}
return dataBytesArray;
}
public static String bytesToHex(ExecutionArrayList<?> bytesList) {
byte[] bytes = new byte[bytesList.size()];
for (int i = 0; i < bytesList.size(); i++) {
@ -315,6 +550,10 @@ public class TbUtils {
return BigDecimal.valueOf(value).setScale(precision, RoundingMode.HALF_UP).doubleValue();
}
public static float toFixed(float value, int precision) {
return BigDecimal.valueOf(value).setScale(precision, RoundingMode.HALF_UP).floatValue();
}
private static boolean isHexadecimal(String value) {
return value != null && (value.contains("0x") || value.contains("0X"));
}
@ -388,4 +627,19 @@ public class TbUtils {
}
}
}
public static boolean isValidRadix(String value, int radix) {
for (int i = 0; i < value.length(); i++) {
if (i == 0 && value.charAt(i) == '-') {
if (value.length() == 1)
throw new NumberFormatException("Failed radix [" + radix + "] for value: \"" + value + "\"!");
else
continue;
}
if (Character.digit(value.charAt(i), radix) < 0)
throw new NumberFormatException("Failed radix: [" + radix + "] for value: \"" + value + "\"!");
}
return true;
}
}

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.script.api.tbel;
import com.google.common.primitives.Bytes;
import lombok.extern.slf4j.Slf4j;
import org.junit.After;
import org.junit.Assert;
@ -26,6 +27,7 @@ import org.mvel2.SandboxedParserConfiguration;
import org.mvel2.execution.ExecutionArrayList;
import org.mvel2.execution.ExecutionHashMap;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Calendar;
@ -38,6 +40,23 @@ public class TbUtilsTest {
private ExecutionContext ctx;
private final String intValHex = "41EA62CC";
private final float floatVal = 29.29824f;
private final String floatValStr = "29.29824";
private final String floatValHexRev = "CC62EA41";
private final float floatValRev = -5.948442E7f;
private final long longVal = 0x409B04B10CB295EAL;
private final String longValHex = "409B04B10CB295EA";
private final long longValRev = 0xEA95B20CB1049B40L;
private final String longValHexRev = "EA95B20CB1049B40";
private final String doubleValStr = "1729.1729";
private final double doubleVal = 1729.1729;
private final double doubleValRev = -2.7208640774822924E205;
@Before
public void before() {
SandboxedParserConfiguration parserConfig = ParserContext.enableSandboxedMode();
@ -62,6 +81,7 @@ public class TbUtilsTest {
@Test
public void parseHexToInt() {
Assert.assertEquals(0xAB, TbUtils.parseHexToInt("AB"));
Assert.assertEquals(0xABBA, TbUtils.parseHexToInt("ABBA", true));
Assert.assertEquals(0xBAAB, TbUtils.parseHexToInt("ABBA", false));
Assert.assertEquals(0xAABBCC, TbUtils.parseHexToInt("AABBCC", true));
@ -182,9 +202,150 @@ public class TbUtilsTest {
Assert.assertEquals(expectedMapWithoutPaths, actualMapWithoutPaths);
}
@Test
public void parseInt() {
Assert.assertNull(TbUtils.parseInt(null));
Assert.assertNull(TbUtils.parseInt(""));
Assert.assertNull(TbUtils.parseInt(" "));
private static String keyToValue(String key, String extraSymbol) {
return key + "Value" + (extraSymbol == null ? "" : extraSymbol);
Assert.assertEquals(java.util.Optional.of(0).get(), TbUtils.parseInt("0"));
Assert.assertEquals(java.util.Optional.of(0).get(), TbUtils.parseInt("-0"));
Assert.assertEquals(java.util.Optional.of(473).get(), TbUtils.parseInt("473"));
Assert.assertEquals(java.util.Optional.of(-255).get(), TbUtils.parseInt("-0xFF"));
Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseInt("FF"));
Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseInt("0xFG"));
Assert.assertEquals(java.util.Optional.of(102).get(), TbUtils.parseInt("1100110", 2));
Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseInt("1100210", 2));
Assert.assertEquals(java.util.Optional.of(63).get(), TbUtils.parseInt("77", 8));
Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseInt("18", 8));
Assert.assertEquals(java.util.Optional.of(-255).get(), TbUtils.parseInt("-FF", 16));
Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseInt("FG", 16));
Assert.assertEquals(java.util.Optional.of(Integer.MAX_VALUE).get(), TbUtils.parseInt(Integer.toString(Integer.MAX_VALUE), 10));
Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseInt(BigInteger.valueOf(Integer.MAX_VALUE).add(BigInteger.valueOf(1)).toString(10), 10));
Assert.assertEquals(java.util.Optional.of(Integer.MIN_VALUE).get(), TbUtils.parseInt(Integer.toString(Integer.MIN_VALUE), 10));
Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseInt(BigInteger.valueOf(Integer.MIN_VALUE).subtract(BigInteger.valueOf(1)).toString(10), 10));
Assert.assertEquals(java.util.Optional.of(506070563).get(), TbUtils.parseInt("KonaIn", 30));
Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseInt("KonaIn", 10));
}
@Test
public void parseFloat() {
Assert.assertEquals(java.util.Optional.of(floatVal).get(), TbUtils.parseFloat(floatValStr));
}
@Test
public void toFixedFloat() {
float actualF = TbUtils.toFixed(floatVal, 3);
Assert.assertEquals(1, Float.compare(floatVal, actualF));
Assert.assertEquals(0, Float.compare(29.298f, actualF));
}
@Test
public void parseHexToFloat() {
Assert.assertEquals(0, Float.compare(floatVal, TbUtils.parseHexToFloat(intValHex)));
Assert.assertEquals(0, Float.compare(floatValRev, TbUtils.parseHexToFloat(intValHex, false)));
Assert.assertEquals(0, Float.compare(floatVal, TbUtils.parseBigEndianHexToFloat(intValHex)));
Assert.assertEquals(0, Float.compare(floatVal, TbUtils.parseLittleEndianHexToFloat(floatValHexRev)));
}
@Test
public void arseBytesToFloat() {
byte[] floatValByte = {65, -22, 98, -52};
Assert.assertEquals(0, Float.compare(floatVal, TbUtils.parseBytesToFloat(floatValByte, 0)));
Assert.assertEquals(0, Float.compare(floatValRev, TbUtils.parseBytesToFloat(floatValByte, 0, false)));
List <Byte> floatVaList = Bytes.asList(floatValByte);
Assert.assertEquals(0, Float.compare(floatVal, TbUtils.parseBytesToFloat(floatVaList, 0)));
Assert.assertEquals(0, Float.compare(floatValRev, TbUtils.parseBytesToFloat(floatVaList, 0, false)));
}
@Test
public void parseLong() {
Assert.assertNull(TbUtils.parseLong(null));
Assert.assertNull(TbUtils.parseLong(""));
Assert.assertNull(TbUtils.parseLong(" "));
Assert.assertEquals(java.util.Optional.of(0L).get(), TbUtils.parseLong("0"));
Assert.assertEquals(java.util.Optional.of(0L).get(), TbUtils.parseLong("-0"));
Assert.assertEquals(java.util.Optional.of(473L).get(), TbUtils.parseLong("473"));
Assert.assertEquals(java.util.Optional.of(-65535L).get(), TbUtils.parseLong("-0xFFFF"));
Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseLong("FFFFFFFF"));
Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseLong("0xFGFFFFFF"));
Assert.assertEquals(java.util.Optional.of(13158L).get(), TbUtils.parseLong("11001101100110", 2));
Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseLong("11001101100210", 2));
Assert.assertEquals(java.util.Optional.of(9223372036854775807L).get(), TbUtils.parseLong("777777777777777777777", 8));
Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseLong("1787", 8));
Assert.assertEquals(java.util.Optional.of(-255L).get(), TbUtils.parseLong("-FF", 16));
Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseLong("FG", 16));
Assert.assertEquals(java.util.Optional.of(Long.MAX_VALUE).get(), TbUtils.parseLong(Long.toString(Long.MAX_VALUE), 10));
Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseLong(BigInteger.valueOf(Long.MAX_VALUE).add(BigInteger.valueOf(1)).toString(10), 10));
Assert.assertEquals(java.util.Optional.of(Long.MIN_VALUE).get(), TbUtils.parseLong(Long.toString(Long.MIN_VALUE), 10));
Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseLong(BigInteger.valueOf(Long.MIN_VALUE).subtract(BigInteger.valueOf(1)).toString(10), 10));
Assert.assertEquals(java.util.Optional.of(218840926543L).get(), TbUtils.parseLong("KonaLong", 27));
Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseLong("KonaLong", 10));
}
@Test
public void parseHexToLong() {
Assert.assertEquals(longVal, TbUtils.parseHexToLong(longValHex));
Assert.assertEquals(longVal, TbUtils.parseHexToLong(longValHexRev, false));
Assert.assertEquals(longVal, TbUtils.parseBigEndianHexToLong(longValHex));
Assert.assertEquals(longVal, TbUtils.parseLittleEndianHexToLong(longValHexRev));
}
@Test
public void parseBytesToLong() {
byte[] longValByte = {64, -101, 4, -79, 12, -78, -107, -22};
Assert.assertEquals(longVal, TbUtils.parseBytesToLong(longValByte, 0, 8));
Bytes.reverse(longValByte);
Assert.assertEquals(longVal, TbUtils.parseBytesToLong(longValByte, 0, 8, false));
List <Byte> longVaList = Bytes.asList(longValByte);
Assert.assertEquals(longVal, TbUtils.parseBytesToLong(longVaList, 0, 8, false));
Assert.assertEquals(longValRev, TbUtils.parseBytesToLong(longVaList, 0, 8));
}
@Test
public void parsDouble() {
Assert.assertEquals(java.util.Optional.of(doubleVal).get(), TbUtils.parseDouble(doubleValStr));
}
@Test
public void toFixedDouble() {
double actualD = TbUtils.toFixed(doubleVal, 3);
Assert.assertEquals(-1, Double.compare(doubleVal, actualD));
Assert.assertEquals(0, Double.compare(1729.173, actualD));
}
@Test
public void parseHexToDouble() {
Assert.assertEquals(0, Double.compare(doubleVal, TbUtils.parseHexToDouble(longValHex)));
Assert.assertEquals(0, Double.compare(doubleValRev, TbUtils.parseHexToDouble(longValHex, false)));
Assert.assertEquals(0, Double.compare(doubleVal, TbUtils.parseBigEndianHexToDouble(longValHex)));
Assert.assertEquals(0, Double.compare(doubleVal, TbUtils.parseLittleEndianHexToDouble(longValHexRev)));
}
@Test
public void parseBytesToDouble() {
byte[] doubleValByte = {64, -101, 4, -79, 12, -78, -107, -22};
Assert.assertEquals(0, Double.compare(doubleVal, TbUtils.parseBytesToDouble(doubleValByte, 0)));
Assert.assertEquals(0, Double.compare(doubleValRev, TbUtils.parseBytesToDouble(doubleValByte, 0, false)));
List <Byte> doubleVaList = Bytes.asList(doubleValByte);
Assert.assertEquals(0, Double.compare(doubleVal, TbUtils.parseBytesToDouble(doubleVaList, 0)));
Assert.assertEquals(0, Double.compare(doubleValRev, TbUtils.parseBytesToDouble(doubleVaList, 0, false)));
}
private static List<Byte> toList(byte[] data) {
@ -194,5 +355,5 @@ public class TbUtilsTest {
}
return result;
}
}

View File

@ -167,6 +167,8 @@ public interface TbContext {
void enqueueForTellFailure(TbMsg msg, String failureMessage);
void enqueueForTellFailure(TbMsg tbMsg, Throwable t);
void enqueueForTellNext(TbMsg msg, String relationType);
void enqueueForTellNext(TbMsg msg, Set<String> relationTypes);
@ -210,7 +212,7 @@ public interface TbContext {
void schedule(Runnable runnable, long delay, TimeUnit timeUnit);
void checkTenantEntity(EntityId entityId);
void checkTenantEntity(EntityId entityId) throws TbNodeException;
boolean isLocalEntity(EntityId entityId);
@ -302,6 +304,8 @@ public interface TbContext {
SlackService getSlackService();
boolean isExternalNodeForceAck();
/**
* Creates JS Script Engine
* @deprecated

View File

@ -15,17 +15,33 @@
*/
package org.thingsboard.rule.engine.api;
import lombok.Getter;
import org.thingsboard.server.common.msg.TbActorError;
/**
* Created by ashvayka on 19.01.18.
*/
public class TbNodeException extends Exception {
public class TbNodeException extends Exception implements TbActorError {
@Getter
private final boolean unrecoverable;
public TbNodeException(String message) {
this(message, false);
}
public TbNodeException(String message, boolean unrecoverable) {
super(message);
this.unrecoverable = unrecoverable;
}
public TbNodeException(Exception e) {
this(e, false);
}
public TbNodeException(Exception e, boolean unrecoverable) {
super(e);
this.unrecoverable = unrecoverable;
}
}

View File

@ -44,7 +44,7 @@ public class TbNodeUtils {
try {
return JacksonUtil.treeToValue(configuration.getData(), clazz);
} catch (IllegalArgumentException e) {
throw new TbNodeException(e);
throw new TbNodeException(e, true);
}
}

View File

@ -66,7 +66,7 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
if (!this.config.isDynamicSeverity()) {
this.notDynamicAlarmSeverity = EnumUtils.getEnum(AlarmSeverity.class, this.config.getSeverity());
if (this.notDynamicAlarmSeverity == null) {
throw new TbNodeException("Incorrect Alarm Severity value: " + this.config.getSeverity());
throw new TbNodeException("Incorrect Alarm Severity value: " + this.config.getSeverity(), true);
}
}
}

View File

@ -26,10 +26,11 @@ import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TbRelationTypes;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
@ -51,7 +52,7 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback;
configDirective = "tbExternalNodeSnsConfig",
iconUrl = ""
)
public class TbSnsNode implements TbNode {
public class TbSnsNode extends TbAbstractExternalNode {
private static final String MESSAGE_ID = "messageId";
private static final String REQUEST_ID = "requestId";
@ -62,6 +63,7 @@ public class TbSnsNode implements TbNode {
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
super.init(ctx);
this.config = TbNodeUtils.convert(configuration, TbSnsNodeConfiguration.class);
AWSCredentials awsCredentials = new BasicAWSCredentials(this.config.getAccessKeyId(), this.config.getSecretAccessKey());
AWSStaticCredentialsProvider credProvider = new AWSStaticCredentialsProvider(awsCredentials);
@ -78,8 +80,9 @@ public class TbSnsNode implements TbNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
withCallback(publishMessageAsync(ctx, msg),
ctx::tellSuccess,
t -> ctx.tellFailure(processException(ctx, msg, t), t));
m -> tellSuccess(ctx, m),
t -> tellFailure(ctx, processException(ctx, msg, t), t));
ackIfNeeded(ctx, msg);
}
private ListenableFuture<TbMsg> publishMessageAsync(TbContext ctx, TbMsg msg) {

View File

@ -31,6 +31,7 @@ import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
@ -55,7 +56,7 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback;
configDirective = "tbExternalNodeSqsConfig",
iconUrl = ""
)
public class TbSqsNode implements TbNode {
public class TbSqsNode extends TbAbstractExternalNode {
private static final String MESSAGE_ID = "messageId";
private static final String REQUEST_ID = "requestId";
@ -69,6 +70,7 @@ public class TbSqsNode implements TbNode {
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
super.init(ctx);
this.config = TbNodeUtils.convert(configuration, TbSqsNodeConfiguration.class);
AWSCredentials awsCredentials = new BasicAWSCredentials(this.config.getAccessKeyId(), this.config.getSecretAccessKey());
AWSStaticCredentialsProvider credProvider = new AWSStaticCredentialsProvider(awsCredentials);
@ -85,8 +87,9 @@ public class TbSqsNode implements TbNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
withCallback(publishMessageAsync(ctx, msg),
ctx::tellSuccess,
t -> ctx.tellFailure(processException(ctx, msg, t), t));
m -> tellSuccess(ctx, m),
t -> tellFailure(ctx, processException(ctx, msg, t), t));
ackIfNeeded(ctx, msg);
}
private ListenableFuture<TbMsg> publishMessageAsync(TbContext ctx, TbMsg msg) {

View File

@ -0,0 +1,61 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.external;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbRelationTypes;
import org.thingsboard.server.common.msg.TbMsg;
public abstract class TbAbstractExternalNode implements TbNode {
private boolean forceAck;
public void init(TbContext ctx) {
this.forceAck = ctx.isExternalNodeForceAck();
}
protected void tellSuccess(TbContext ctx, TbMsg tbMsg) {
if (forceAck) {
ctx.enqueueForTellNext(tbMsg.copyWithNewCtx(), TbRelationTypes.SUCCESS);
} else {
ctx.tellSuccess(tbMsg);
}
}
protected void tellFailure(TbContext ctx, TbMsg tbMsg, Throwable t) {
if (forceAck) {
if (t == null) {
ctx.enqueueForTellNext(tbMsg.copyWithNewCtx(), TbRelationTypes.FAILURE);
} else {
ctx.enqueueForTellFailure(tbMsg.copyWithNewCtx(), t);
}
} else {
if (t == null) {
ctx.tellNext(tbMsg, TbRelationTypes.FAILURE);
} else {
ctx.tellFailure(tbMsg, t);
}
}
}
protected void ackIfNeeded(TbContext ctx, TbMsg msg) {
if (forceAck) {
ctx.ack(msg);
}
}
}

View File

@ -32,6 +32,7 @@ import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
@ -53,7 +54,7 @@ import java.util.concurrent.TimeUnit;
configDirective = "tbExternalNodePubSubConfig",
iconUrl = ""
)
public class TbPubSubNode implements TbNode {
public class TbPubSubNode extends TbAbstractExternalNode {
private static final String MESSAGE_ID = "messageId";
private static final String ERROR = "error";
@ -63,8 +64,9 @@ public class TbPubSubNode implements TbNode {
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
super.init(ctx);
this.config = TbNodeUtils.convert(configuration, TbPubSubNodeConfiguration.class);
try {
this.config = TbNodeUtils.convert(configuration, TbPubSubNodeConfiguration.class);
this.pubSubClient = initPubSubClient();
} catch (Exception e) {
throw new TbNodeException(e);
@ -74,6 +76,7 @@ public class TbPubSubNode implements TbNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
publishMessage(ctx, msg);
ackIfNeeded(ctx, msg);
}
@Override
@ -101,12 +104,12 @@ public class TbPubSubNode implements TbNode {
ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<String>() {
public void onSuccess(String messageId) {
TbMsg next = processPublishResult(ctx, msg, messageId);
ctx.tellSuccess(next);
tellSuccess(ctx, next);
}
public void onFailure(Throwable t) {
TbMsg next = processException(ctx, msg, t);
ctx.tellFailure(next, t);
tellFailure(ctx, next, t);
}
},
ctx.getExternalCallExecutor());

View File

@ -33,6 +33,7 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TbRelationTypes;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
import org.thingsboard.server.common.data.exception.ThingsboardKafkaClientError;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
@ -56,7 +57,7 @@ import java.util.Properties;
configDirective = "tbExternalNodeKafkaConfig",
iconUrl = ""
)
public class TbKafkaNode implements TbNode {
public class TbKafkaNode extends TbAbstractExternalNode {
private static final String OFFSET = "offset";
private static final String PARTITION = "partition";
@ -78,6 +79,7 @@ public class TbKafkaNode implements TbNode {
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
super.init(ctx);
this.config = TbNodeUtils.convert(configuration, TbKafkaNodeConfiguration.class);
this.initError = null;
Properties properties = new Properties();
@ -129,6 +131,7 @@ public class TbKafkaNode implements TbNode {
return null;
});
}
ackIfNeeded(ctx, msg);
} catch (Exception e) {
ctx.tellFailure(msg, e);
}
@ -164,11 +167,9 @@ public class TbKafkaNode implements TbNode {
private void processRecord(TbContext ctx, TbMsg msg, RecordMetadata metadata, Exception e) {
if (e == null) {
TbMsg next = processResponse(ctx, msg, metadata);
ctx.tellNext(next, TbRelationTypes.SUCCESS);
tellSuccess(ctx, processResponse(ctx, msg, metadata));
} else {
TbMsg next = processException(ctx, msg, e);
ctx.tellFailure(next, e);
tellFailure(ctx, processException(ctx, msg, e), e);
}
}

View File

@ -22,10 +22,10 @@ import org.springframework.mail.javamail.JavaMailSenderImpl;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbEmail;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
@ -47,7 +47,7 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback;
configDirective = "tbExternalNodeSendEmailConfig",
icon = "send"
)
public class TbSendEmailNode implements TbNode {
public class TbSendEmailNode extends TbAbstractExternalNode {
private static final String MAIL_PROP = "mail.";
static final String SEND_EMAIL_TYPE = "SEND_EMAIL";
@ -56,8 +56,9 @@ public class TbSendEmailNode implements TbNode {
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
super.init(ctx);
this.config = TbNodeUtils.convert(configuration, TbSendEmailNodeConfiguration.class);
try {
this.config = TbNodeUtils.convert(configuration, TbSendEmailNodeConfiguration.class);
if (!this.config.isUseSystemSmtpSettings()) {
mailSender = createMailSender();
}
@ -75,8 +76,9 @@ public class TbSendEmailNode implements TbNode {
sendEmail(ctx, msg, email);
return null;
}),
ok -> ctx.tellSuccess(msg),
fail -> ctx.tellFailure(msg, fail));
ok -> tellSuccess(ctx, msg),
fail -> tellFailure(ctx, msg, fail));
ackIfNeeded(ctx, msg);
} catch (Exception ex) {
ctx.tellFailure(msg, ex);
}

View File

@ -32,6 +32,7 @@ import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.credentials.BasicCredentials;
import org.thingsboard.rule.engine.credentials.ClientCredentials;
import org.thingsboard.rule.engine.credentials.CredentialsType;
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.plugin.ComponentClusteringMode;
import org.thingsboard.server.common.data.plugin.ComponentType;
@ -55,7 +56,7 @@ import java.util.concurrent.TimeoutException;
configDirective = "tbExternalNodeMqttConfig",
icon = "call_split"
)
public class TbMqttNode implements TbNode {
public class TbMqttNode extends TbAbstractExternalNode {
private static final Charset UTF8 = Charset.forName("UTF-8");
@ -67,8 +68,9 @@ public class TbMqttNode implements TbNode {
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
super.init(ctx);
this.mqttNodeConfiguration = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class);
try {
this.mqttNodeConfiguration = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class);
this.mqttClient = initClient(ctx);
} catch (Exception e) {
throw new TbNodeException(e);
@ -81,13 +83,13 @@ public class TbMqttNode implements TbNode {
this.mqttClient.publish(topic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, mqttNodeConfiguration.isRetainedMessage())
.addListener(future -> {
if (future.isSuccess()) {
ctx.tellSuccess(msg);
tellSuccess(ctx, msg);
} else {
TbMsg next = processException(ctx, msg, future.cause());
ctx.tellFailure(next, future.cause());
tellFailure(ctx, processException(ctx, msg, future.cause()), future.cause());
}
}
);
ackIfNeeded(ctx, msg);
}
private TbMsg processException(TbContext ctx, TbMsg origMsg, Throwable e) {

View File

@ -48,8 +48,9 @@ import javax.net.ssl.SSLException;
public class TbAzureIotHubNode extends TbMqttNode {
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
super.init(ctx);
this.mqttNodeConfiguration = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class);
try {
this.mqttNodeConfiguration = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class);
mqttNodeConfiguration.setPort(8883);
mqttNodeConfiguration.setCleanSession(true);
ClientCredentials credentials = mqttNodeConfiguration.getCredentials();

View File

@ -23,6 +23,7 @@ import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
import org.thingsboard.server.common.data.notification.NotificationRequest;
import org.thingsboard.server.common.data.notification.NotificationRequestConfig;
import org.thingsboard.server.common.data.notification.info.RuleEngineOriginatedNotificationInfo;
@ -42,12 +43,13 @@ import java.util.concurrent.ExecutionException;
configDirective = "tbExternalNodeNotificationConfig",
icon = "notifications"
)
public class TbNotificationNode implements TbNode {
public class TbNotificationNode extends TbAbstractExternalNode {
private TbNotificationNodeConfiguration config;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
super.init(ctx);
this.config = TbNodeUtils.convert(configuration, TbNotificationNodeConfiguration.class);
}
@ -69,15 +71,16 @@ public class TbNotificationNode implements TbNode {
.originatorEntityId(ctx.getSelf().getRuleChainId())
.build();
DonAsynchron.withCallback(ctx.getNotificationExecutor().executeAsync(() -> {
return ctx.getNotificationCenter().processNotificationRequest(ctx.getTenantId(), notificationRequest, stats -> {
TbMsgMetaData metaData = msg.getMetaData().copy();
metaData.putValue("notificationRequestResult", JacksonUtil.toString(stats));
ctx.tellSuccess(TbMsg.transformMsg(msg, metaData));
});
}),
r -> {},
e -> ctx.tellFailure(msg, e));
DonAsynchron.withCallback(ctx.getNotificationExecutor().executeAsync(() ->
ctx.getNotificationCenter().processNotificationRequest(ctx.getTenantId(), notificationRequest, stats -> {
TbMsgMetaData metaData = msg.getMetaData().copy();
metaData.putValue("notificationRequestResult", JacksonUtil.toString(stats));
tellSuccess(ctx, TbMsg.transformMsg(msg, metaData));
})),
r -> {
},
e -> tellFailure(ctx, msg, e));
ackIfNeeded(ctx, msg);
}
}

View File

@ -22,6 +22,7 @@ import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
@ -37,12 +38,13 @@ import java.util.concurrent.ExecutionException;
configDirective = "tbExternalNodeSlackConfig",
iconUrl = ""
)
public class TbSlackNode implements TbNode {
public class TbSlackNode extends TbAbstractExternalNode {
private TbSlackNodeConfiguration config;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
super.init(ctx);
this.config = TbNodeUtils.convert(configuration, TbSlackNodeConfiguration.class);
}
@ -62,8 +64,9 @@ public class TbSlackNode implements TbNode {
DonAsynchron.withCallback(ctx.getExternalCallExecutor().executeAsync(() -> {
ctx.getSlackService().sendMessage(ctx.getTenantId(), token, config.getConversation().getId(), message);
}),
r -> ctx.tellSuccess(msg),
e -> ctx.tellFailure(msg, e));
r -> tellSuccess(ctx, msg),
e -> tellFailure(ctx, msg, e));
ackIfNeeded(ctx, msg);
}
}

View File

@ -28,6 +28,7 @@ import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
@ -48,7 +49,7 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback;
configDirective = "tbExternalNodeRabbitMqConfig",
iconUrl = ""
)
public class TbRabbitMqNode implements TbNode {
public class TbRabbitMqNode extends TbAbstractExternalNode {
private static final Charset UTF8 = Charset.forName("UTF-8");
@ -61,6 +62,7 @@ public class TbRabbitMqNode implements TbNode {
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
super.init(ctx);
this.config = TbNodeUtils.convert(configuration, TbRabbitMqNodeConfiguration.class);
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(this.config.getHost());
@ -83,11 +85,9 @@ public class TbRabbitMqNode implements TbNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
withCallback(publishMessageAsync(ctx, msg),
ctx::tellSuccess,
t -> {
TbMsg next = processException(ctx, msg, t);
ctx.tellFailure(next, t);
});
m -> tellSuccess(ctx, m),
t -> tellFailure(ctx, processException(ctx, msg, t), t));
ackIfNeeded(ctx, msg);
}
private ListenableFuture<TbMsg> publishMessageAsync(TbContext ctx, TbMsg msg) {

View File

@ -65,6 +65,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@Data
@Slf4j
@ -182,14 +183,16 @@ public class TbHttpClient {
}
}
public void processMessage(TbContext ctx, TbMsg msg) {
public void processMessage(TbContext ctx, TbMsg msg,
Consumer<TbMsg> onSuccess,
BiConsumer<TbMsg, Throwable> onFailure) {
String endpointUrl = TbNodeUtils.processPattern(config.getRestEndpointUrlPattern(), msg);
HttpHeaders headers = prepareHeaders(msg);
HttpMethod method = HttpMethod.valueOf(config.getRequestMethod());
HttpEntity<String> entity;
if(HttpMethod.GET.equals(method) || HttpMethod.HEAD.equals(method) ||
HttpMethod.OPTIONS.equals(method) || HttpMethod.TRACE.equals(method) ||
config.isIgnoreRequestBody()) {
if (HttpMethod.GET.equals(method) || HttpMethod.HEAD.equals(method) ||
HttpMethod.OPTIONS.equals(method) || HttpMethod.TRACE.equals(method) ||
config.isIgnoreRequestBody()) {
entity = new HttpEntity<>(headers);
} else {
entity = new HttpEntity<>(getData(msg), headers);
@ -198,21 +201,18 @@ public class TbHttpClient {
URI uri = buildEncodedUri(endpointUrl);
ListenableFuture<ResponseEntity<String>> future = httpClient.exchange(
uri, method, entity, String.class);
future.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
future.addCallback(new ListenableFutureCallback<>() {
@Override
public void onFailure(Throwable throwable) {
TbMsg next = processException(ctx, msg, throwable);
ctx.tellFailure(next, throwable);
onFailure.accept(processException(ctx, msg, throwable), throwable);
}
@Override
public void onSuccess(ResponseEntity<String> responseEntity) {
if (responseEntity.getStatusCode().is2xxSuccessful()) {
TbMsg next = processResponse(ctx, msg, responseEntity);
ctx.tellSuccess(next);
onSuccess.accept(processResponse(ctx, msg, responseEntity));
} else {
TbMsg next = processFailureResponse(ctx, msg, responseEntity);
ctx.tellNext(next, TbRelationTypes.FAILURE);
onFailure.accept(processFailureResponse(ctx, msg, responseEntity), null);
}
}
});
@ -248,7 +248,7 @@ public class TbHttpClient {
if (config.isTrimDoubleQuotes()) {
final String dataBefore = data;
data = data.replaceAll("^\"|\"$", "");;
data = data.replaceAll("^\"|\"$", "");
log.trace("Trimming double quotes. Before trim: [{}], after trim: [{}]", dataBefore, data);
}

View File

@ -22,6 +22,7 @@ import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
@ -43,24 +44,26 @@ import org.thingsboard.server.common.msg.TbMsg;
configDirective = "tbExternalNodeRestApiCallConfig",
iconUrl = ""
)
public class TbRestApiCallNode implements TbNode {
public class TbRestApiCallNode extends TbAbstractExternalNode {
private boolean useRedisQueueForMsgPersistence;
protected TbHttpClient httpClient;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
super.init(ctx);
TbRestApiCallNodeConfiguration config = TbNodeUtils.convert(configuration, TbRestApiCallNodeConfiguration.class);
httpClient = new TbHttpClient(config, ctx.getSharedEventLoop());
useRedisQueueForMsgPersistence = config.isUseRedisQueueForMsgPersistence();
if (useRedisQueueForMsgPersistence) {
if (config.isUseRedisQueueForMsgPersistence()) {
log.warn("[{}][{}] Usage of Redis Template is deprecated starting 2.5 and will have no affect", ctx.getTenantId(), ctx.getSelfId());
}
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
httpClient.processMessage(ctx, msg);
httpClient.processMessage(ctx, msg,
m -> tellSuccess(ctx, m),
(m, t) -> tellFailure(ctx, m, t));
ackIfNeeded(ctx, msg);
}
@Override

View File

@ -23,6 +23,7 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.sms.SmsSender;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
@ -39,13 +40,14 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback;
configDirective = "tbExternalNodeSendSmsConfig",
icon = "sms"
)
public class TbSendSmsNode implements TbNode {
public class TbSendSmsNode extends TbAbstractExternalNode {
private TbSendSmsNodeConfiguration config;
private SmsSender smsSender;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
super.init(ctx);
try {
this.config = TbNodeUtils.convert(configuration, TbSendSmsNodeConfiguration.class);
if (!this.config.isUseSystemSmsSettings()) {
@ -63,8 +65,9 @@ public class TbSendSmsNode implements TbNode {
sendSms(ctx, msg);
return null;
}),
ok -> ctx.tellSuccess(msg),
fail -> ctx.tellFailure(msg, fail));
ok -> tellSuccess(ctx, msg),
fail -> tellFailure(ctx, msg, fail));
ackIfNeeded(ctx, msg);
} catch (Exception ex) {
ctx.tellFailure(msg, ex);
}

View File

@ -167,7 +167,9 @@ public class TbHttpClientTest {
capturedData.capture()
)).thenReturn(successMsg);
httpClient.processMessage(ctx, msg);
httpClient.processMessage(ctx, msg,
m -> ctx.tellSuccess(msg),
(m, t) -> ctx.tellFailure(m, t));
Awaitility.await()
.atMost(30, TimeUnit.SECONDS)