Merge with master
This commit is contained in:
		
						commit
						0acabf65dc
					
				@ -536,6 +536,10 @@ public class ActorSystemContext {
 | 
			
		||||
    @Getter
 | 
			
		||||
    private int maxRpcRetries;
 | 
			
		||||
 | 
			
		||||
    @Value("${actors.rule.external.force_ack:false}")
 | 
			
		||||
    @Getter
 | 
			
		||||
    private boolean externalNodeForceAck;
 | 
			
		||||
 | 
			
		||||
    @Getter
 | 
			
		||||
    @Setter
 | 
			
		||||
    private TbActorSystem actorSystem;
 | 
			
		||||
 | 
			
		||||
@ -34,6 +34,7 @@ import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
 | 
			
		||||
import org.thingsboard.rule.engine.api.ScriptEngine;
 | 
			
		||||
import org.thingsboard.rule.engine.api.SmsService;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbContext;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbRelationTypes;
 | 
			
		||||
import org.thingsboard.rule.engine.api.slack.SlackService;
 | 
			
		||||
import org.thingsboard.rule.engine.api.sms.SmsSenderFactory;
 | 
			
		||||
@ -214,6 +215,12 @@ class DefaultTbContext implements TbContext {
 | 
			
		||||
        enqueueForTellNext(tpi, tbMsg, Collections.singleton(TbRelationTypes.FAILURE), failureMessage, null, null);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void enqueueForTellFailure(TbMsg tbMsg, Throwable th) {
 | 
			
		||||
        TopicPartitionInfo tpi = resolvePartition(tbMsg);
 | 
			
		||||
        enqueueForTellNext(tpi, tbMsg, Collections.singleton(TbRelationTypes.FAILURE), getFailureMessage(th), null, null);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void enqueueForTellNext(TbMsg tbMsg, String relationType) {
 | 
			
		||||
        TopicPartitionInfo tpi = resolvePartition(tbMsg);
 | 
			
		||||
@ -311,16 +318,7 @@ class DefaultTbContext implements TbContext {
 | 
			
		||||
        if (nodeCtx.getSelf().isDebugMode()) {
 | 
			
		||||
            mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, TbRelationTypes.FAILURE, th);
 | 
			
		||||
        }
 | 
			
		||||
        String failureMessage;
 | 
			
		||||
        if (th != null) {
 | 
			
		||||
            if (!StringUtils.isEmpty(th.getMessage())) {
 | 
			
		||||
                failureMessage = th.getMessage();
 | 
			
		||||
            } else {
 | 
			
		||||
                failureMessage = th.getClass().getSimpleName();
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            failureMessage = null;
 | 
			
		||||
        }
 | 
			
		||||
        String failureMessage = getFailureMessage(th);
 | 
			
		||||
        nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getRuleChainId(),
 | 
			
		||||
                nodeCtx.getSelf().getId(), Collections.singleton(TbRelationTypes.FAILURE),
 | 
			
		||||
                msg, failureMessage));
 | 
			
		||||
@ -724,6 +722,11 @@ class DefaultTbContext implements TbContext {
 | 
			
		||||
        return mainCtx.getSlackService();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean isExternalNodeForceAck() {
 | 
			
		||||
        return mainCtx.isExternalNodeForceAck();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public RuleEngineRpcService getRpcService() {
 | 
			
		||||
        return mainCtx.getTbRuleEngineDeviceRpcService();
 | 
			
		||||
@ -840,12 +843,26 @@ class DefaultTbContext implements TbContext {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void checkTenantEntity(EntityId entityId) {
 | 
			
		||||
    public void checkTenantEntity(EntityId entityId) throws TbNodeException {
 | 
			
		||||
        if (!this.getTenantId().equals(TenantIdLoader.findTenantId(this, entityId))) {
 | 
			
		||||
            throw new RuntimeException("Entity with id: '" + entityId + "' specified in the configuration doesn't belong to the current tenant.");
 | 
			
		||||
            throw new TbNodeException("Entity with id: '" + entityId + "' specified in the configuration doesn't belong to the current tenant.", true);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static String getFailureMessage(Throwable th) {
 | 
			
		||||
        String failureMessage;
 | 
			
		||||
        if (th != null) {
 | 
			
		||||
            if (!StringUtils.isEmpty(th.getMessage())) {
 | 
			
		||||
                failureMessage = th.getMessage();
 | 
			
		||||
            } else {
 | 
			
		||||
                failureMessage = th.getClass().getSimpleName();
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            failureMessage = null;
 | 
			
		||||
        }
 | 
			
		||||
        return failureMessage;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private class SimpleTbQueueCallback implements TbQueueCallback {
 | 
			
		||||
        private final Runnable onSuccess;
 | 
			
		||||
        private final Consumer<Throwable> onFailure;
 | 
			
		||||
 | 
			
		||||
@ -23,6 +23,7 @@ import org.thingsboard.server.actors.TbEntityActorId;
 | 
			
		||||
import org.thingsboard.server.actors.TbEntityTypeActorIdPredicate;
 | 
			
		||||
import org.thingsboard.server.actors.service.ContextAwareActor;
 | 
			
		||||
import org.thingsboard.server.actors.service.DefaultActorService;
 | 
			
		||||
import org.thingsboard.server.actors.shared.RuleChainErrorActor;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.RuleChainId;
 | 
			
		||||
@ -31,6 +32,7 @@ import org.thingsboard.server.common.data.page.PageDataIterable;
 | 
			
		||||
import org.thingsboard.server.common.data.rule.RuleChain;
 | 
			
		||||
import org.thingsboard.server.common.data.rule.RuleChainType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbActorMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.RuleEngineException;
 | 
			
		||||
import org.thingsboard.server.dao.rule.RuleChainService;
 | 
			
		||||
 | 
			
		||||
import java.util.function.Function;
 | 
			
		||||
@ -86,7 +88,12 @@ public abstract class RuleChainManagerActor extends ContextAwareActor {
 | 
			
		||||
                () -> DefaultActorService.RULE_DISPATCHER_NAME,
 | 
			
		||||
                () -> {
 | 
			
		||||
                    RuleChain ruleChain = provider.apply(ruleChainId);
 | 
			
		||||
                    return new RuleChainActor.ActorCreator(systemContext, tenantId, ruleChain);
 | 
			
		||||
                    if (ruleChain == null) {
 | 
			
		||||
                        return new RuleChainErrorActor.ActorCreator(systemContext, tenantId,
 | 
			
		||||
                                new RuleEngineException("Rule Chain with id: " + ruleChainId + " not found!"));
 | 
			
		||||
                    } else {
 | 
			
		||||
                        return new RuleChainActor.ActorCreator(systemContext, tenantId, ruleChain);
 | 
			
		||||
                    }
 | 
			
		||||
                });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,78 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2023 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.actors.shared;
 | 
			
		||||
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.server.actors.ActorSystemContext;
 | 
			
		||||
import org.thingsboard.server.actors.TbActor;
 | 
			
		||||
import org.thingsboard.server.actors.TbActorId;
 | 
			
		||||
import org.thingsboard.server.actors.TbStringActorId;
 | 
			
		||||
import org.thingsboard.server.actors.service.ContextAwareActor;
 | 
			
		||||
import org.thingsboard.server.actors.service.ContextBasedCreator;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbActorMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.RuleEngineException;
 | 
			
		||||
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class RuleChainErrorActor extends ContextAwareActor {
 | 
			
		||||
 | 
			
		||||
    private final TenantId tenantId;
 | 
			
		||||
    private final RuleEngineException error;
 | 
			
		||||
 | 
			
		||||
    private RuleChainErrorActor(ActorSystemContext systemContext, TenantId tenantId, RuleEngineException error) {
 | 
			
		||||
        super(systemContext);
 | 
			
		||||
        this.tenantId = tenantId;
 | 
			
		||||
        this.error = error;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected boolean doProcess(TbActorMsg msg) {
 | 
			
		||||
        if (msg instanceof RuleChainAwareMsg) {
 | 
			
		||||
            log.debug("[{}] Reply with {} for message {}", tenantId, error.getMessage(), msg);
 | 
			
		||||
            var rcMsg = (RuleChainAwareMsg) msg;
 | 
			
		||||
            rcMsg.getMsg().getCallback().onFailure(error);
 | 
			
		||||
            return true;
 | 
			
		||||
        } else {
 | 
			
		||||
            return false;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static class ActorCreator extends ContextBasedCreator {
 | 
			
		||||
 | 
			
		||||
        private final TenantId tenantId;
 | 
			
		||||
        private final RuleEngineException error;
 | 
			
		||||
 | 
			
		||||
        public ActorCreator(ActorSystemContext context, TenantId tenantId, RuleEngineException error) {
 | 
			
		||||
            super(context);
 | 
			
		||||
            this.tenantId = tenantId;
 | 
			
		||||
            this.error = error;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        @Override
 | 
			
		||||
        public TbActorId createActorId() {
 | 
			
		||||
            return new TbStringActorId(UUID.randomUUID().toString());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        @Override
 | 
			
		||||
        public TbActor createActor() {
 | 
			
		||||
            return new RuleChainErrorActor(context, tenantId, error);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -393,6 +393,10 @@ actors:
 | 
			
		||||
      queue_size: "${ACTORS_RULE_TRANSACTION_QUEUE_SIZE:15000}"
 | 
			
		||||
      # Time in milliseconds for transaction to complete
 | 
			
		||||
      duration: "${ACTORS_RULE_TRANSACTION_DURATION:60000}"
 | 
			
		||||
    external:
 | 
			
		||||
      # Force acknowledgement of the incoming message for external rule nodes to decrease processing latency.
 | 
			
		||||
      # Enqueue the result of external node processing as a separate message to the rule engine.
 | 
			
		||||
      force_ack: "${ACTORS_RULE_EXTERNAL_NODE_FORCE_ACK:false}"
 | 
			
		||||
  rpc:
 | 
			
		||||
    max_retries: "${ACTORS_RPC_MAX_RETRIES:5}"
 | 
			
		||||
    sequential: "${ACTORS_RPC_SEQUENTIAL:false}"
 | 
			
		||||
 | 
			
		||||
@ -19,6 +19,7 @@ import lombok.Data;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.msg.MsgType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbActorError;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbActorMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbActorStopReason;
 | 
			
		||||
 | 
			
		||||
@ -69,9 +70,14 @@ public final class TbActorMailbox implements TbActorCtx {
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        } catch (Throwable t) {
 | 
			
		||||
            log.debug("[{}] Failed to init actor, attempt: {}", selfId, attempt, t);
 | 
			
		||||
            InitFailureStrategy strategy;
 | 
			
		||||
            int attemptIdx = attempt + 1;
 | 
			
		||||
            InitFailureStrategy strategy = actor.onInitFailure(attempt, t);
 | 
			
		||||
            if (isUnrecoverable(t)) {
 | 
			
		||||
                strategy = InitFailureStrategy.stop();
 | 
			
		||||
            } else {
 | 
			
		||||
                log.debug("[{}] Failed to init actor, attempt: {}", selfId, attempt, t);
 | 
			
		||||
                strategy = actor.onInitFailure(attempt, t);
 | 
			
		||||
            }
 | 
			
		||||
            if (strategy.isStop() || (settings.getMaxActorInitAttempts() > 0 && attemptIdx > settings.getMaxActorInitAttempts())) {
 | 
			
		||||
                log.info("[{}] Failed to init actor, attempt {}, going to stop attempts.", selfId, attempt, t);
 | 
			
		||||
                stopReason = TbActorStopReason.INIT_FAILED;
 | 
			
		||||
@ -88,6 +94,13 @@ public final class TbActorMailbox implements TbActorCtx {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static boolean isUnrecoverable(Throwable t) {
 | 
			
		||||
        if (t instanceof TbActorException && t.getCause() != null) {
 | 
			
		||||
            t = t.getCause();
 | 
			
		||||
        }
 | 
			
		||||
        return t instanceof TbActorError && ((TbActorError) t).isUnrecoverable();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void enqueue(TbActorMsg msg, boolean highPriority) {
 | 
			
		||||
        if (!destroyInProgress.get()) {
 | 
			
		||||
            if (highPriority) {
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,22 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2023 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.common.msg;
 | 
			
		||||
 | 
			
		||||
public interface TbActorError {
 | 
			
		||||
 | 
			
		||||
    boolean isUnrecoverable();
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -277,6 +277,11 @@ public final class TbMsg implements Serializable {
 | 
			
		||||
                this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, this.ctx, callback);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public TbMsg copyWithNewCtx() {
 | 
			
		||||
        return new TbMsg(this.queueName, this.id, this.ts, this.type, this.originator, this.customerId,
 | 
			
		||||
                this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, this.ctx.copy(), TbMsgCallback.EMPTY);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public TbMsgCallback getCallback() {
 | 
			
		||||
        // May be null in case of deserialization;
 | 
			
		||||
        if (callback != null) {
 | 
			
		||||
 | 
			
		||||
@ -17,9 +17,12 @@ package org.thingsboard.server.common.msg.aware;
 | 
			
		||||
 | 
			
		||||
import org.thingsboard.server.common.data.id.RuleChainId;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbActorMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
 | 
			
		||||
public interface RuleChainAwareMsg extends TbActorMsg {
 | 
			
		||||
 | 
			
		||||
	RuleChainId getRuleChainId();
 | 
			
		||||
 | 
			
		||||
	TbMsg getMsg();
 | 
			
		||||
	
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -167,6 +167,8 @@ public interface TbContext {
 | 
			
		||||
 | 
			
		||||
    void enqueueForTellFailure(TbMsg msg, String failureMessage);
 | 
			
		||||
 | 
			
		||||
    void enqueueForTellFailure(TbMsg tbMsg, Throwable t);
 | 
			
		||||
 | 
			
		||||
    void enqueueForTellNext(TbMsg msg, String relationType);
 | 
			
		||||
 | 
			
		||||
    void enqueueForTellNext(TbMsg msg, Set<String> relationTypes);
 | 
			
		||||
@ -210,7 +212,7 @@ public interface TbContext {
 | 
			
		||||
 | 
			
		||||
    void schedule(Runnable runnable, long delay, TimeUnit timeUnit);
 | 
			
		||||
 | 
			
		||||
    void checkTenantEntity(EntityId entityId);
 | 
			
		||||
    void checkTenantEntity(EntityId entityId) throws TbNodeException;
 | 
			
		||||
 | 
			
		||||
    boolean isLocalEntity(EntityId entityId);
 | 
			
		||||
 | 
			
		||||
@ -302,6 +304,8 @@ public interface TbContext {
 | 
			
		||||
 | 
			
		||||
    SlackService getSlackService();
 | 
			
		||||
 | 
			
		||||
    boolean isExternalNodeForceAck();
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Creates JS Script Engine
 | 
			
		||||
     * @deprecated
 | 
			
		||||
 | 
			
		||||
@ -15,17 +15,33 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.api;
 | 
			
		||||
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbActorError;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * Created by ashvayka on 19.01.18.
 | 
			
		||||
 */
 | 
			
		||||
public class TbNodeException extends Exception {
 | 
			
		||||
public class TbNodeException extends Exception implements TbActorError {
 | 
			
		||||
 | 
			
		||||
    @Getter
 | 
			
		||||
    private final boolean unrecoverable;
 | 
			
		||||
 | 
			
		||||
    public TbNodeException(String message) {
 | 
			
		||||
        this(message, false);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public TbNodeException(String message, boolean unrecoverable) {
 | 
			
		||||
        super(message);
 | 
			
		||||
        this.unrecoverable = unrecoverable;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public TbNodeException(Exception e) {
 | 
			
		||||
        this(e, false);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public TbNodeException(Exception e, boolean unrecoverable) {
 | 
			
		||||
        super(e);
 | 
			
		||||
        this.unrecoverable = unrecoverable;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -44,7 +44,7 @@ public class TbNodeUtils {
 | 
			
		||||
        try {
 | 
			
		||||
            return JacksonUtil.treeToValue(configuration.getData(), clazz);
 | 
			
		||||
        } catch (IllegalArgumentException e) {
 | 
			
		||||
            throw new TbNodeException(e);
 | 
			
		||||
            throw new TbNodeException(e, true);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -66,7 +66,7 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
 | 
			
		||||
        if (!this.config.isDynamicSeverity()) {
 | 
			
		||||
            this.notDynamicAlarmSeverity = EnumUtils.getEnum(AlarmSeverity.class, this.config.getSeverity());
 | 
			
		||||
            if (this.notDynamicAlarmSeverity == null) {
 | 
			
		||||
                throw new TbNodeException("Incorrect Alarm Severity value: " + this.config.getSeverity());
 | 
			
		||||
                throw new TbNodeException("Incorrect Alarm Severity value: " + this.config.getSeverity(), true);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -26,10 +26,11 @@ import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.rule.engine.api.RuleNode;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbContext;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNode;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbRelationTypes;
 | 
			
		||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 | 
			
		||||
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.ComponentType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
 | 
			
		||||
@ -51,7 +52,7 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback;
 | 
			
		||||
        configDirective = "tbExternalNodeSnsConfig",
 | 
			
		||||
        iconUrl = ""
 | 
			
		||||
)
 | 
			
		||||
public class TbSnsNode implements TbNode {
 | 
			
		||||
public class TbSnsNode extends TbAbstractExternalNode {
 | 
			
		||||
 | 
			
		||||
    private static final String MESSAGE_ID = "messageId";
 | 
			
		||||
    private static final String REQUEST_ID = "requestId";
 | 
			
		||||
@ -62,6 +63,7 @@ public class TbSnsNode implements TbNode {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
 | 
			
		||||
        super.init(ctx);
 | 
			
		||||
        this.config = TbNodeUtils.convert(configuration, TbSnsNodeConfiguration.class);
 | 
			
		||||
        AWSCredentials awsCredentials = new BasicAWSCredentials(this.config.getAccessKeyId(), this.config.getSecretAccessKey());
 | 
			
		||||
        AWSStaticCredentialsProvider credProvider = new AWSStaticCredentialsProvider(awsCredentials);
 | 
			
		||||
@ -78,8 +80,9 @@ public class TbSnsNode implements TbNode {
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
 | 
			
		||||
        withCallback(publishMessageAsync(ctx, msg),
 | 
			
		||||
                ctx::tellSuccess,
 | 
			
		||||
                t -> ctx.tellFailure(processException(ctx, msg, t), t));
 | 
			
		||||
                m -> tellSuccess(ctx, m),
 | 
			
		||||
                t -> tellFailure(ctx, processException(ctx, msg, t), t));
 | 
			
		||||
        ackIfNeeded(ctx, msg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<TbMsg> publishMessageAsync(TbContext ctx, TbMsg msg) {
 | 
			
		||||
 | 
			
		||||
@ -31,6 +31,7 @@ import org.thingsboard.rule.engine.api.TbNode;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 | 
			
		||||
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
 | 
			
		||||
import org.thingsboard.server.common.data.StringUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.ComponentType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
@ -55,7 +56,7 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback;
 | 
			
		||||
        configDirective = "tbExternalNodeSqsConfig",
 | 
			
		||||
        iconUrl = ""
 | 
			
		||||
)
 | 
			
		||||
public class TbSqsNode implements TbNode {
 | 
			
		||||
public class TbSqsNode extends TbAbstractExternalNode {
 | 
			
		||||
 | 
			
		||||
    private static final String MESSAGE_ID = "messageId";
 | 
			
		||||
    private static final String REQUEST_ID = "requestId";
 | 
			
		||||
@ -69,6 +70,7 @@ public class TbSqsNode implements TbNode {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
 | 
			
		||||
        super.init(ctx);
 | 
			
		||||
        this.config = TbNodeUtils.convert(configuration, TbSqsNodeConfiguration.class);
 | 
			
		||||
        AWSCredentials awsCredentials = new BasicAWSCredentials(this.config.getAccessKeyId(), this.config.getSecretAccessKey());
 | 
			
		||||
        AWSStaticCredentialsProvider credProvider = new AWSStaticCredentialsProvider(awsCredentials);
 | 
			
		||||
@ -85,8 +87,9 @@ public class TbSqsNode implements TbNode {
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onMsg(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        withCallback(publishMessageAsync(ctx, msg),
 | 
			
		||||
                ctx::tellSuccess,
 | 
			
		||||
                t -> ctx.tellFailure(processException(ctx, msg, t), t));
 | 
			
		||||
                m -> tellSuccess(ctx, m),
 | 
			
		||||
                t -> tellFailure(ctx, processException(ctx, msg, t), t));
 | 
			
		||||
        ackIfNeeded(ctx, msg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<TbMsg> publishMessageAsync(TbContext ctx, TbMsg msg) {
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,61 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2023 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.external;
 | 
			
		||||
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbContext;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNode;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbRelationTypes;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
 | 
			
		||||
public abstract class TbAbstractExternalNode implements TbNode {
 | 
			
		||||
 | 
			
		||||
    private boolean forceAck;
 | 
			
		||||
 | 
			
		||||
    public void init(TbContext ctx) {
 | 
			
		||||
        this.forceAck = ctx.isExternalNodeForceAck();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected void tellSuccess(TbContext ctx, TbMsg tbMsg) {
 | 
			
		||||
        if (forceAck) {
 | 
			
		||||
            ctx.enqueueForTellNext(tbMsg.copyWithNewCtx(), TbRelationTypes.SUCCESS);
 | 
			
		||||
        } else {
 | 
			
		||||
            ctx.tellSuccess(tbMsg);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected void tellFailure(TbContext ctx, TbMsg tbMsg, Throwable t) {
 | 
			
		||||
        if (forceAck) {
 | 
			
		||||
            if (t == null) {
 | 
			
		||||
                ctx.enqueueForTellNext(tbMsg.copyWithNewCtx(), TbRelationTypes.FAILURE);
 | 
			
		||||
            } else {
 | 
			
		||||
                ctx.enqueueForTellFailure(tbMsg.copyWithNewCtx(), t);
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            if (t == null) {
 | 
			
		||||
                ctx.tellNext(tbMsg, TbRelationTypes.FAILURE);
 | 
			
		||||
            } else {
 | 
			
		||||
                ctx.tellFailure(tbMsg, t);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected void ackIfNeeded(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        if (forceAck) {
 | 
			
		||||
            ctx.ack(msg);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -32,6 +32,7 @@ import org.thingsboard.rule.engine.api.TbNode;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 | 
			
		||||
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.ComponentType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
 | 
			
		||||
@ -53,7 +54,7 @@ import java.util.concurrent.TimeUnit;
 | 
			
		||||
        configDirective = "tbExternalNodePubSubConfig",
 | 
			
		||||
        iconUrl = ""
 | 
			
		||||
)
 | 
			
		||||
public class TbPubSubNode implements TbNode {
 | 
			
		||||
public class TbPubSubNode extends TbAbstractExternalNode {
 | 
			
		||||
 | 
			
		||||
    private static final String MESSAGE_ID = "messageId";
 | 
			
		||||
    private static final String ERROR = "error";
 | 
			
		||||
@ -63,8 +64,9 @@ public class TbPubSubNode implements TbNode {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
 | 
			
		||||
        super.init(ctx);
 | 
			
		||||
        this.config = TbNodeUtils.convert(configuration, TbPubSubNodeConfiguration.class);
 | 
			
		||||
        try {
 | 
			
		||||
            this.config = TbNodeUtils.convert(configuration, TbPubSubNodeConfiguration.class);
 | 
			
		||||
            this.pubSubClient = initPubSubClient();
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            throw new TbNodeException(e);
 | 
			
		||||
@ -74,6 +76,7 @@ public class TbPubSubNode implements TbNode {
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onMsg(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        publishMessage(ctx, msg);
 | 
			
		||||
        ackIfNeeded(ctx, msg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
@ -101,12 +104,12 @@ public class TbPubSubNode implements TbNode {
 | 
			
		||||
        ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<String>() {
 | 
			
		||||
                    public void onSuccess(String messageId) {
 | 
			
		||||
                        TbMsg next = processPublishResult(ctx, msg, messageId);
 | 
			
		||||
                        ctx.tellSuccess(next);
 | 
			
		||||
                        tellSuccess(ctx, next);
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                    public void onFailure(Throwable t) {
 | 
			
		||||
                        TbMsg next = processException(ctx, msg, t);
 | 
			
		||||
                        ctx.tellFailure(next, t);
 | 
			
		||||
                        tellFailure(ctx, next, t);
 | 
			
		||||
                    }
 | 
			
		||||
                },
 | 
			
		||||
                ctx.getExternalCallExecutor());
 | 
			
		||||
 | 
			
		||||
@ -33,6 +33,7 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbRelationTypes;
 | 
			
		||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 | 
			
		||||
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
 | 
			
		||||
import org.thingsboard.server.common.data.exception.ThingsboardKafkaClientError;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.ComponentType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
@ -56,7 +57,7 @@ import java.util.Properties;
 | 
			
		||||
        configDirective = "tbExternalNodeKafkaConfig",
 | 
			
		||||
        iconUrl = ""
 | 
			
		||||
)
 | 
			
		||||
public class TbKafkaNode implements TbNode {
 | 
			
		||||
public class TbKafkaNode extends TbAbstractExternalNode {
 | 
			
		||||
 | 
			
		||||
    private static final String OFFSET = "offset";
 | 
			
		||||
    private static final String PARTITION = "partition";
 | 
			
		||||
@ -78,6 +79,7 @@ public class TbKafkaNode implements TbNode {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
 | 
			
		||||
        super.init(ctx);
 | 
			
		||||
        this.config = TbNodeUtils.convert(configuration, TbKafkaNodeConfiguration.class);
 | 
			
		||||
        this.initError = null;
 | 
			
		||||
        Properties properties = new Properties();
 | 
			
		||||
@ -129,6 +131,7 @@ public class TbKafkaNode implements TbNode {
 | 
			
		||||
                    return null;
 | 
			
		||||
                });
 | 
			
		||||
            }
 | 
			
		||||
            ackIfNeeded(ctx, msg);
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            ctx.tellFailure(msg, e);
 | 
			
		||||
        }
 | 
			
		||||
@ -164,11 +167,9 @@ public class TbKafkaNode implements TbNode {
 | 
			
		||||
 | 
			
		||||
    private void processRecord(TbContext ctx, TbMsg msg, RecordMetadata metadata, Exception e) {
 | 
			
		||||
        if (e == null) {
 | 
			
		||||
            TbMsg next = processResponse(ctx, msg, metadata);
 | 
			
		||||
            ctx.tellNext(next, TbRelationTypes.SUCCESS);
 | 
			
		||||
            tellSuccess(ctx, processResponse(ctx, msg, metadata));
 | 
			
		||||
        } else {
 | 
			
		||||
            TbMsg next = processException(ctx, msg, e);
 | 
			
		||||
            ctx.tellFailure(next, e);
 | 
			
		||||
            tellFailure(ctx, processException(ctx, msg, e), e);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -22,10 +22,10 @@ import org.springframework.mail.javamail.JavaMailSenderImpl;
 | 
			
		||||
import org.thingsboard.rule.engine.api.RuleNode;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbContext;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbEmail;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNode;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 | 
			
		||||
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.ComponentType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
 | 
			
		||||
@ -47,7 +47,7 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback;
 | 
			
		||||
        configDirective = "tbExternalNodeSendEmailConfig",
 | 
			
		||||
        icon = "send"
 | 
			
		||||
)
 | 
			
		||||
public class TbSendEmailNode implements TbNode {
 | 
			
		||||
public class TbSendEmailNode extends TbAbstractExternalNode {
 | 
			
		||||
 | 
			
		||||
    private static final String MAIL_PROP = "mail.";
 | 
			
		||||
    static final String SEND_EMAIL_TYPE = "SEND_EMAIL";
 | 
			
		||||
@ -56,8 +56,9 @@ public class TbSendEmailNode implements TbNode {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
 | 
			
		||||
        super.init(ctx);
 | 
			
		||||
        this.config = TbNodeUtils.convert(configuration, TbSendEmailNodeConfiguration.class);
 | 
			
		||||
        try {
 | 
			
		||||
            this.config = TbNodeUtils.convert(configuration, TbSendEmailNodeConfiguration.class);
 | 
			
		||||
            if (!this.config.isUseSystemSmtpSettings()) {
 | 
			
		||||
                mailSender = createMailSender();
 | 
			
		||||
            }
 | 
			
		||||
@ -75,8 +76,9 @@ public class TbSendEmailNode implements TbNode {
 | 
			
		||||
                        sendEmail(ctx, msg, email);
 | 
			
		||||
                        return null;
 | 
			
		||||
                    }),
 | 
			
		||||
                    ok -> ctx.tellSuccess(msg),
 | 
			
		||||
                    fail -> ctx.tellFailure(msg, fail));
 | 
			
		||||
                    ok -> tellSuccess(ctx, msg),
 | 
			
		||||
                    fail -> tellFailure(ctx, msg, fail));
 | 
			
		||||
            ackIfNeeded(ctx, msg);
 | 
			
		||||
        } catch (Exception ex) {
 | 
			
		||||
            ctx.tellFailure(msg, ex);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@ -32,6 +32,7 @@ import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 | 
			
		||||
import org.thingsboard.rule.engine.credentials.BasicCredentials;
 | 
			
		||||
import org.thingsboard.rule.engine.credentials.ClientCredentials;
 | 
			
		||||
import org.thingsboard.rule.engine.credentials.CredentialsType;
 | 
			
		||||
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
 | 
			
		||||
import org.thingsboard.server.common.data.StringUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.ComponentClusteringMode;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.ComponentType;
 | 
			
		||||
@ -55,7 +56,7 @@ import java.util.concurrent.TimeoutException;
 | 
			
		||||
        configDirective = "tbExternalNodeMqttConfig",
 | 
			
		||||
        icon = "call_split"
 | 
			
		||||
)
 | 
			
		||||
public class TbMqttNode implements TbNode {
 | 
			
		||||
public class TbMqttNode extends TbAbstractExternalNode {
 | 
			
		||||
 | 
			
		||||
    private static final Charset UTF8 = Charset.forName("UTF-8");
 | 
			
		||||
 | 
			
		||||
@ -67,8 +68,9 @@ public class TbMqttNode implements TbNode {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
 | 
			
		||||
        super.init(ctx);
 | 
			
		||||
        this.mqttNodeConfiguration = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class);
 | 
			
		||||
        try {
 | 
			
		||||
            this.mqttNodeConfiguration = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class);
 | 
			
		||||
            this.mqttClient = initClient(ctx);
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            throw new TbNodeException(e);
 | 
			
		||||
@ -81,13 +83,13 @@ public class TbMqttNode implements TbNode {
 | 
			
		||||
        this.mqttClient.publish(topic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, mqttNodeConfiguration.isRetainedMessage())
 | 
			
		||||
                .addListener(future -> {
 | 
			
		||||
                            if (future.isSuccess()) {
 | 
			
		||||
                                ctx.tellSuccess(msg);
 | 
			
		||||
                                tellSuccess(ctx, msg);
 | 
			
		||||
                            } else {
 | 
			
		||||
                                TbMsg next = processException(ctx, msg, future.cause());
 | 
			
		||||
                                ctx.tellFailure(next, future.cause());
 | 
			
		||||
                                tellFailure(ctx, processException(ctx, msg, future.cause()), future.cause());
 | 
			
		||||
                            }
 | 
			
		||||
                        }
 | 
			
		||||
                );
 | 
			
		||||
        ackIfNeeded(ctx, msg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private TbMsg processException(TbContext ctx, TbMsg origMsg, Throwable e) {
 | 
			
		||||
 | 
			
		||||
@ -48,8 +48,9 @@ import javax.net.ssl.SSLException;
 | 
			
		||||
public class TbAzureIotHubNode extends TbMqttNode {
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
 | 
			
		||||
        super.init(ctx);
 | 
			
		||||
        this.mqttNodeConfiguration = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class);
 | 
			
		||||
        try {
 | 
			
		||||
            this.mqttNodeConfiguration = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class);
 | 
			
		||||
            mqttNodeConfiguration.setPort(8883);
 | 
			
		||||
            mqttNodeConfiguration.setCleanSession(true);
 | 
			
		||||
            ClientCredentials credentials = mqttNodeConfiguration.getCredentials();
 | 
			
		||||
 | 
			
		||||
@ -23,6 +23,7 @@ import org.thingsboard.rule.engine.api.TbNode;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 | 
			
		||||
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequest;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequestConfig;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.info.RuleEngineOriginatedNotificationInfo;
 | 
			
		||||
@ -42,12 +43,13 @@ import java.util.concurrent.ExecutionException;
 | 
			
		||||
        configDirective = "tbExternalNodeNotificationConfig",
 | 
			
		||||
        icon = "notifications"
 | 
			
		||||
)
 | 
			
		||||
public class TbNotificationNode implements TbNode {
 | 
			
		||||
public class TbNotificationNode extends TbAbstractExternalNode {
 | 
			
		||||
 | 
			
		||||
    private TbNotificationNodeConfiguration config;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
 | 
			
		||||
        super.init(ctx);
 | 
			
		||||
        this.config = TbNodeUtils.convert(configuration, TbNotificationNodeConfiguration.class);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -69,15 +71,16 @@ public class TbNotificationNode implements TbNode {
 | 
			
		||||
                .originatorEntityId(ctx.getSelf().getRuleChainId())
 | 
			
		||||
                .build();
 | 
			
		||||
 | 
			
		||||
        DonAsynchron.withCallback(ctx.getNotificationExecutor().executeAsync(() -> {
 | 
			
		||||
                    return ctx.getNotificationCenter().processNotificationRequest(ctx.getTenantId(), notificationRequest, stats -> {
 | 
			
		||||
                        TbMsgMetaData metaData = msg.getMetaData().copy();
 | 
			
		||||
                        metaData.putValue("notificationRequestResult", JacksonUtil.toString(stats));
 | 
			
		||||
                        ctx.tellSuccess(TbMsg.transformMsg(msg, metaData));
 | 
			
		||||
                    });
 | 
			
		||||
                }),
 | 
			
		||||
                r -> {},
 | 
			
		||||
                e -> ctx.tellFailure(msg, e));
 | 
			
		||||
        DonAsynchron.withCallback(ctx.getNotificationExecutor().executeAsync(() ->
 | 
			
		||||
                        ctx.getNotificationCenter().processNotificationRequest(ctx.getTenantId(), notificationRequest, stats -> {
 | 
			
		||||
                            TbMsgMetaData metaData = msg.getMetaData().copy();
 | 
			
		||||
                            metaData.putValue("notificationRequestResult", JacksonUtil.toString(stats));
 | 
			
		||||
                            tellSuccess(ctx, TbMsg.transformMsg(msg, metaData));
 | 
			
		||||
                        })),
 | 
			
		||||
                r -> {
 | 
			
		||||
                },
 | 
			
		||||
                e -> tellFailure(ctx, msg, e));
 | 
			
		||||
        ackIfNeeded(ctx, msg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -22,6 +22,7 @@ import org.thingsboard.rule.engine.api.TbNode;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 | 
			
		||||
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.ComponentType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
 | 
			
		||||
@ -37,12 +38,13 @@ import java.util.concurrent.ExecutionException;
 | 
			
		||||
        configDirective = "tbExternalNodeSlackConfig",
 | 
			
		||||
        iconUrl = ""
 | 
			
		||||
)
 | 
			
		||||
public class TbSlackNode implements TbNode {
 | 
			
		||||
public class TbSlackNode extends TbAbstractExternalNode {
 | 
			
		||||
 | 
			
		||||
    private TbSlackNodeConfiguration config;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
 | 
			
		||||
        super.init(ctx);
 | 
			
		||||
        this.config = TbNodeUtils.convert(configuration, TbSlackNodeConfiguration.class);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -62,8 +64,9 @@ public class TbSlackNode implements TbNode {
 | 
			
		||||
        DonAsynchron.withCallback(ctx.getExternalCallExecutor().executeAsync(() -> {
 | 
			
		||||
                    ctx.getSlackService().sendMessage(ctx.getTenantId(), token, config.getConversation().getId(), message);
 | 
			
		||||
                }),
 | 
			
		||||
                r -> ctx.tellSuccess(msg),
 | 
			
		||||
                e -> ctx.tellFailure(msg, e));
 | 
			
		||||
                r -> tellSuccess(ctx, msg),
 | 
			
		||||
                e -> tellFailure(ctx, msg, e));
 | 
			
		||||
        ackIfNeeded(ctx, msg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -28,6 +28,7 @@ import org.thingsboard.rule.engine.api.TbNode;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 | 
			
		||||
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
 | 
			
		||||
import org.thingsboard.server.common.data.StringUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.ComponentType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
@ -48,7 +49,7 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback;
 | 
			
		||||
        configDirective = "tbExternalNodeRabbitMqConfig",
 | 
			
		||||
        iconUrl = ""
 | 
			
		||||
)
 | 
			
		||||
public class TbRabbitMqNode implements TbNode {
 | 
			
		||||
public class TbRabbitMqNode extends TbAbstractExternalNode {
 | 
			
		||||
 | 
			
		||||
    private static final Charset UTF8 = Charset.forName("UTF-8");
 | 
			
		||||
 | 
			
		||||
@ -61,6 +62,7 @@ public class TbRabbitMqNode implements TbNode {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
 | 
			
		||||
        super.init(ctx);
 | 
			
		||||
        this.config = TbNodeUtils.convert(configuration, TbRabbitMqNodeConfiguration.class);
 | 
			
		||||
        ConnectionFactory factory = new ConnectionFactory();
 | 
			
		||||
        factory.setHost(this.config.getHost());
 | 
			
		||||
@ -83,11 +85,9 @@ public class TbRabbitMqNode implements TbNode {
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onMsg(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        withCallback(publishMessageAsync(ctx, msg),
 | 
			
		||||
                ctx::tellSuccess,
 | 
			
		||||
                t -> {
 | 
			
		||||
                    TbMsg next = processException(ctx, msg, t);
 | 
			
		||||
                    ctx.tellFailure(next, t);
 | 
			
		||||
                });
 | 
			
		||||
                m -> tellSuccess(ctx, m),
 | 
			
		||||
                t -> tellFailure(ctx, processException(ctx, msg, t), t));
 | 
			
		||||
        ackIfNeeded(ctx, msg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<TbMsg> publishMessageAsync(TbContext ctx, TbMsg msg) {
 | 
			
		||||
 | 
			
		||||
@ -65,6 +65,7 @@ import java.util.Map;
 | 
			
		||||
import java.util.concurrent.ConcurrentLinkedDeque;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
import java.util.function.BiConsumer;
 | 
			
		||||
import java.util.function.Consumer;
 | 
			
		||||
 | 
			
		||||
@Data
 | 
			
		||||
@Slf4j
 | 
			
		||||
@ -182,14 +183,16 @@ public class TbHttpClient {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public void processMessage(TbContext ctx, TbMsg msg) {
 | 
			
		||||
    public void processMessage(TbContext ctx, TbMsg msg,
 | 
			
		||||
                               Consumer<TbMsg> onSuccess,
 | 
			
		||||
                               BiConsumer<TbMsg, Throwable> onFailure) {
 | 
			
		||||
        String endpointUrl = TbNodeUtils.processPattern(config.getRestEndpointUrlPattern(), msg);
 | 
			
		||||
        HttpHeaders headers = prepareHeaders(msg);
 | 
			
		||||
        HttpMethod method = HttpMethod.valueOf(config.getRequestMethod());
 | 
			
		||||
        HttpEntity<String> entity;
 | 
			
		||||
        if(HttpMethod.GET.equals(method) || HttpMethod.HEAD.equals(method) ||
 | 
			
		||||
            HttpMethod.OPTIONS.equals(method) || HttpMethod.TRACE.equals(method) ||
 | 
			
		||||
            config.isIgnoreRequestBody()) {
 | 
			
		||||
        if (HttpMethod.GET.equals(method) || HttpMethod.HEAD.equals(method) ||
 | 
			
		||||
                HttpMethod.OPTIONS.equals(method) || HttpMethod.TRACE.equals(method) ||
 | 
			
		||||
                config.isIgnoreRequestBody()) {
 | 
			
		||||
            entity = new HttpEntity<>(headers);
 | 
			
		||||
        } else {
 | 
			
		||||
            entity = new HttpEntity<>(getData(msg), headers);
 | 
			
		||||
@ -198,21 +201,18 @@ public class TbHttpClient {
 | 
			
		||||
        URI uri = buildEncodedUri(endpointUrl);
 | 
			
		||||
        ListenableFuture<ResponseEntity<String>> future = httpClient.exchange(
 | 
			
		||||
                uri, method, entity, String.class);
 | 
			
		||||
        future.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
 | 
			
		||||
        future.addCallback(new ListenableFutureCallback<>() {
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onFailure(Throwable throwable) {
 | 
			
		||||
                TbMsg next = processException(ctx, msg, throwable);
 | 
			
		||||
                ctx.tellFailure(next, throwable);
 | 
			
		||||
                onFailure.accept(processException(ctx, msg, throwable), throwable);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onSuccess(ResponseEntity<String> responseEntity) {
 | 
			
		||||
                if (responseEntity.getStatusCode().is2xxSuccessful()) {
 | 
			
		||||
                    TbMsg next = processResponse(ctx, msg, responseEntity);
 | 
			
		||||
                    ctx.tellSuccess(next);
 | 
			
		||||
                    onSuccess.accept(processResponse(ctx, msg, responseEntity));
 | 
			
		||||
                } else {
 | 
			
		||||
                    TbMsg next = processFailureResponse(ctx, msg, responseEntity);
 | 
			
		||||
                    ctx.tellNext(next, TbRelationTypes.FAILURE);
 | 
			
		||||
                    onFailure.accept(processFailureResponse(ctx, msg, responseEntity), null);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
@ -248,7 +248,7 @@ public class TbHttpClient {
 | 
			
		||||
 | 
			
		||||
        if (config.isTrimDoubleQuotes()) {
 | 
			
		||||
            final String dataBefore = data;
 | 
			
		||||
            data = data.replaceAll("^\"|\"$", "");;
 | 
			
		||||
            data = data.replaceAll("^\"|\"$", "");
 | 
			
		||||
            log.trace("Trimming double quotes. Before trim: [{}], after trim: [{}]", dataBefore, data);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -22,6 +22,7 @@ import org.thingsboard.rule.engine.api.TbNode;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 | 
			
		||||
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.ComponentType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
 | 
			
		||||
@ -43,24 +44,26 @@ import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
        configDirective = "tbExternalNodeRestApiCallConfig",
 | 
			
		||||
        iconUrl = ""
 | 
			
		||||
)
 | 
			
		||||
public class TbRestApiCallNode implements TbNode {
 | 
			
		||||
public class TbRestApiCallNode extends TbAbstractExternalNode {
 | 
			
		||||
 | 
			
		||||
    private boolean useRedisQueueForMsgPersistence;
 | 
			
		||||
    protected TbHttpClient httpClient;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
 | 
			
		||||
        super.init(ctx);
 | 
			
		||||
        TbRestApiCallNodeConfiguration config = TbNodeUtils.convert(configuration, TbRestApiCallNodeConfiguration.class);
 | 
			
		||||
        httpClient = new TbHttpClient(config, ctx.getSharedEventLoop());
 | 
			
		||||
        useRedisQueueForMsgPersistence = config.isUseRedisQueueForMsgPersistence();
 | 
			
		||||
        if (useRedisQueueForMsgPersistence) {
 | 
			
		||||
        if (config.isUseRedisQueueForMsgPersistence()) {
 | 
			
		||||
            log.warn("[{}][{}] Usage of Redis Template is deprecated starting 2.5 and will have no affect", ctx.getTenantId(), ctx.getSelfId());
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onMsg(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        httpClient.processMessage(ctx, msg);
 | 
			
		||||
        httpClient.processMessage(ctx, msg,
 | 
			
		||||
                m -> tellSuccess(ctx, m),
 | 
			
		||||
                (m, t) -> tellFailure(ctx, m, t));
 | 
			
		||||
        ackIfNeeded(ctx, msg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
 | 
			
		||||
@ -23,6 +23,7 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.rule.engine.api.sms.SmsSender;
 | 
			
		||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 | 
			
		||||
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.ComponentType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
 | 
			
		||||
@ -39,13 +40,14 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback;
 | 
			
		||||
        configDirective = "tbExternalNodeSendSmsConfig",
 | 
			
		||||
        icon = "sms"
 | 
			
		||||
)
 | 
			
		||||
public class TbSendSmsNode implements TbNode {
 | 
			
		||||
public class TbSendSmsNode extends TbAbstractExternalNode {
 | 
			
		||||
 | 
			
		||||
    private TbSendSmsNodeConfiguration config;
 | 
			
		||||
    private SmsSender smsSender;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
 | 
			
		||||
        super.init(ctx);
 | 
			
		||||
        try {
 | 
			
		||||
            this.config = TbNodeUtils.convert(configuration, TbSendSmsNodeConfiguration.class);
 | 
			
		||||
            if (!this.config.isUseSystemSmsSettings()) {
 | 
			
		||||
@ -63,8 +65,9 @@ public class TbSendSmsNode implements TbNode {
 | 
			
		||||
                        sendSms(ctx, msg);
 | 
			
		||||
                        return null;
 | 
			
		||||
                    }),
 | 
			
		||||
                    ok -> ctx.tellSuccess(msg),
 | 
			
		||||
                    fail -> ctx.tellFailure(msg, fail));
 | 
			
		||||
                    ok -> tellSuccess(ctx, msg),
 | 
			
		||||
                    fail -> tellFailure(ctx, msg, fail));
 | 
			
		||||
            ackIfNeeded(ctx, msg);
 | 
			
		||||
        } catch (Exception ex) {
 | 
			
		||||
            ctx.tellFailure(msg, ex);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@ -167,7 +167,9 @@ public class TbHttpClientTest {
 | 
			
		||||
                capturedData.capture()
 | 
			
		||||
        )).thenReturn(successMsg);
 | 
			
		||||
 | 
			
		||||
        httpClient.processMessage(ctx, msg);
 | 
			
		||||
        httpClient.processMessage(ctx, msg,
 | 
			
		||||
                m -> ctx.tellSuccess(msg),
 | 
			
		||||
                (m, t) -> ctx.tellFailure(m, t));
 | 
			
		||||
 | 
			
		||||
        Awaitility.await()
 | 
			
		||||
                .atMost(30, TimeUnit.SECONDS)
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user