Merge with master
This commit is contained in:
		
						commit
						344ffb2e0b
					
				@ -232,7 +232,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
 | 
			
		||||
        } catch (RuleNodeException rne) {
 | 
			
		||||
            msg.getCallback().onFailure(rne);
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            msg.getCallback().onFailure(new RuleEngineException(e.getMessage()));
 | 
			
		||||
            msg.getCallback().onFailure(new RuleEngineException(e.getMessage(), e));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -335,7 +335,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
 | 
			
		||||
            msg.getCallback().onFailure(rne);
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.warn("[" + tenantId + "]" + "[" + entityId + "]" + "[" + msg.getId() + "]" + " onTellNext failure", e);
 | 
			
		||||
            msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage()));
 | 
			
		||||
            msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage(), e));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -363,7 +363,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
 | 
			
		||||
                callback.onSuccess();
 | 
			
		||||
            }
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            callback.onFailure(new RuleEngineException(e.getMessage()));
 | 
			
		||||
            callback.onFailure(new RuleEngineException(e.getMessage(), e));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -17,11 +17,14 @@ package org.thingsboard.server.service.queue;
 | 
			
		||||
 | 
			
		||||
import io.micrometer.core.instrument.Timer;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.common.util.ExceptionUtil;
 | 
			
		||||
import org.thingsboard.server.common.data.exception.AbstractRateLimitException;
 | 
			
		||||
import org.thingsboard.server.common.data.id.RuleNodeId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.RuleEngineException;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.RuleNodeInfo;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
 | 
			
		||||
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
 | 
			
		||||
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
@ -57,8 +60,23 @@ public class TbMsgPackCallback implements TbMsgCallback {
 | 
			
		||||
        ctx.onSuccess(id);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onRateLimit(RuleEngineException e) {
 | 
			
		||||
        log.debug("[{}] ON RATE LIMIT", id, e);
 | 
			
		||||
        //TODO notify tenant on rate limit
 | 
			
		||||
        if (failedMsgTimer != null) {
 | 
			
		||||
            failedMsgTimer.record(System.currentTimeMillis() - startMsgProcessing, TimeUnit.MILLISECONDS);
 | 
			
		||||
        }
 | 
			
		||||
        ctx.onSuccess(id);
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onFailure(RuleEngineException e) {
 | 
			
		||||
        if (ExceptionUtil.lookupExceptionInCause(e, AbstractRateLimitException.class) != null) {
 | 
			
		||||
            onRateLimit(e);
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        log.trace("[{}] ON FAILURE", id, e);
 | 
			
		||||
        if (failedMsgTimer != null) {
 | 
			
		||||
            failedMsgTimer.record(System.currentTimeMillis() - startMsgProcessing, TimeUnit.MILLISECONDS);
 | 
			
		||||
 | 
			
		||||
@ -96,7 +96,10 @@ zk:
 | 
			
		||||
  session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
 | 
			
		||||
  # Name of the directory in zookeeper 'filesystem'
 | 
			
		||||
  zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
 | 
			
		||||
  recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}"
 | 
			
		||||
  # The recalculate_delay property recommended in a microservices architecture setup for rule-engine services.
 | 
			
		||||
  # This property provides a pause to ensure that when a rule-engine service is restarted, other nodes don't immediately attempt to recalculate their partitions.
 | 
			
		||||
  # The delay is recommended because the initialization of rule chain actors is time-consuming. Avoiding unnecessary recalculations during a restart can enhance system performance and stability.
 | 
			
		||||
  recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}"
 | 
			
		||||
 | 
			
		||||
cluster:
 | 
			
		||||
  stats:
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,98 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2023 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.queue;
 | 
			
		||||
 | 
			
		||||
import org.junit.jupiter.api.BeforeEach;
 | 
			
		||||
import org.junit.jupiter.params.ParameterizedTest;
 | 
			
		||||
import org.junit.jupiter.params.provider.Arguments;
 | 
			
		||||
import org.junit.jupiter.params.provider.MethodSource;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.rule.RuleNode;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.RuleEngineException;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.RuleNodeException;
 | 
			
		||||
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
 | 
			
		||||
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.stream.Stream;
 | 
			
		||||
 | 
			
		||||
import static org.mockito.ArgumentMatchers.any;
 | 
			
		||||
import static org.mockito.Mockito.mock;
 | 
			
		||||
import static org.mockito.Mockito.never;
 | 
			
		||||
import static org.mockito.Mockito.spy;
 | 
			
		||||
import static org.mockito.Mockito.verify;
 | 
			
		||||
 | 
			
		||||
class TbMsgPackCallbackTest {
 | 
			
		||||
 | 
			
		||||
    TenantId tenantId;
 | 
			
		||||
    UUID msgId;
 | 
			
		||||
    TbMsgPackProcessingContext ctx;
 | 
			
		||||
    TbMsgPackCallback callback;
 | 
			
		||||
 | 
			
		||||
    @BeforeEach
 | 
			
		||||
    void setUp() {
 | 
			
		||||
        tenantId = TenantId.fromUUID(UUID.randomUUID());
 | 
			
		||||
        msgId = UUID.randomUUID();
 | 
			
		||||
        ctx = mock(TbMsgPackProcessingContext.class);
 | 
			
		||||
        callback = spy(new TbMsgPackCallback(msgId, tenantId, ctx));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static Stream<Arguments> testOnFailure_NotRateLimitException() {
 | 
			
		||||
        return Stream.of(
 | 
			
		||||
                Arguments.of(new RuleEngineException("rule engine no cause")),
 | 
			
		||||
                Arguments.of(new RuleEngineException("rule engine caused 1 lvl", new RuntimeException())),
 | 
			
		||||
                Arguments.of(new RuleEngineException("rule engine caused 2 lvl", new RuntimeException(new Exception()))),
 | 
			
		||||
                Arguments.of(new RuleEngineException("rule engine caused 2 lvl Throwable", new RuntimeException(new Throwable()))),
 | 
			
		||||
                Arguments.of(new RuleNodeException("rule node no cause", "RuleChain", new RuleNode()))
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @ParameterizedTest
 | 
			
		||||
    @MethodSource
 | 
			
		||||
    void testOnFailure_NotRateLimitException(RuleEngineException ree) {
 | 
			
		||||
        callback.onFailure(ree);
 | 
			
		||||
 | 
			
		||||
        verify(callback, never()).onRateLimit(any());
 | 
			
		||||
        verify(callback, never()).onSuccess();
 | 
			
		||||
        verify(ctx, never()).onSuccess(any());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static Stream<Arguments> testOnFailure_RateLimitException() {
 | 
			
		||||
        return Stream.of(
 | 
			
		||||
                Arguments.of(new RuleEngineException("caused lvl 1", new TbRateLimitsException(EntityType.ASSET))),
 | 
			
		||||
                Arguments.of(new RuleEngineException("caused lvl 2", new RuntimeException(new TbRateLimitsException(EntityType.ASSET)))),
 | 
			
		||||
                Arguments.of(
 | 
			
		||||
                        new RuleEngineException("caused lvl 3",
 | 
			
		||||
                                new RuntimeException(
 | 
			
		||||
                                        new Exception(
 | 
			
		||||
                                                new TbRateLimitsException(EntityType.ASSET)))))
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @ParameterizedTest
 | 
			
		||||
    @MethodSource
 | 
			
		||||
    void testOnFailure_RateLimitException(RuleEngineException ree) {
 | 
			
		||||
        callback.onFailure(ree);
 | 
			
		||||
 | 
			
		||||
        verify(callback).onRateLimit(any());
 | 
			
		||||
        verify(callback).onFailure(any());
 | 
			
		||||
        verify(callback, never()).onSuccess();
 | 
			
		||||
        verify(ctx).onSuccess(msgId);
 | 
			
		||||
        verify(ctx).onSuccess(any());
 | 
			
		||||
        verify(ctx, never()).onFailure(any(), any(), any());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,41 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2023 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.common.data.exception;
 | 
			
		||||
 | 
			
		||||
public abstract class AbstractRateLimitException extends RuntimeException {
 | 
			
		||||
 | 
			
		||||
    public AbstractRateLimitException() {
 | 
			
		||||
        super();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public AbstractRateLimitException(String message) {
 | 
			
		||||
        super(message);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public AbstractRateLimitException(String message, Throwable cause) {
 | 
			
		||||
        super(message, cause);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public AbstractRateLimitException(Throwable cause) {
 | 
			
		||||
        super(cause);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected AbstractRateLimitException(String message, Throwable cause,
 | 
			
		||||
                                         boolean enableSuppression,
 | 
			
		||||
                                         boolean writableStackTrace) {
 | 
			
		||||
        super(message, cause, enableSuppression, writableStackTrace);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -15,7 +15,7 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.common.data.exception;
 | 
			
		||||
 | 
			
		||||
public class ApiUsageLimitsExceededException extends RuntimeException {
 | 
			
		||||
public class ApiUsageLimitsExceededException extends AbstractRateLimitException {
 | 
			
		||||
    public ApiUsageLimitsExceededException(String message) {
 | 
			
		||||
        super(message);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -32,6 +32,11 @@ public class RuleEngineException extends Exception {
 | 
			
		||||
        ts = System.currentTimeMillis();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public RuleEngineException(String message, Throwable t) {
 | 
			
		||||
        super(message != null ? message : "Unknown", t);
 | 
			
		||||
        ts = System.currentTimeMillis();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public String toJsonString() {
 | 
			
		||||
        try {
 | 
			
		||||
            return mapper.writeValueAsString(mapper.createObjectNode().put("message", getMessage()));
 | 
			
		||||
 | 
			
		||||
@ -39,6 +39,10 @@ public interface TbMsgCallback {
 | 
			
		||||
 | 
			
		||||
    void onFailure(RuleEngineException e);
 | 
			
		||||
 | 
			
		||||
    default void onRateLimit(RuleEngineException e) {
 | 
			
		||||
        onFailure(e);
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Returns 'true' if rule engine is expecting the message to be processed, 'false' otherwise.
 | 
			
		||||
     * message may no longer be valid, if the message pack is already expired/canceled/failed.
 | 
			
		||||
 | 
			
		||||
@ -17,11 +17,12 @@ package org.thingsboard.server.common.msg.tools;
 | 
			
		||||
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.exception.AbstractRateLimitException;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * Created by ashvayka on 22.10.18.
 | 
			
		||||
 */
 | 
			
		||||
public class TbRateLimitsException extends RuntimeException {
 | 
			
		||||
public class TbRateLimitsException extends AbstractRateLimitException {
 | 
			
		||||
    @Getter
 | 
			
		||||
    private final EntityType entityType;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -40,6 +40,6 @@ public class MultipleTbQueueCallbackWrapper implements TbQueueCallback {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onFailure(Throwable t) {
 | 
			
		||||
        callback.onFailure(new RuleEngineException(t.getMessage()));
 | 
			
		||||
        callback.onFailure(new RuleEngineException(t.getMessage(), t));
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -41,6 +41,6 @@ public class MultipleTbQueueTbMsgCallbackWrapper implements TbQueueCallback {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onFailure(Throwable t) {
 | 
			
		||||
        tbMsgCallback.onFailure(new RuleEngineException(t.getMessage()));
 | 
			
		||||
        tbMsgCallback.onFailure(new RuleEngineException(t.getMessage(), t));
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -35,6 +35,6 @@ public class TbQueueTbMsgCallbackWrapper implements TbQueueCallback {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onFailure(Throwable t) {
 | 
			
		||||
        tbMsgCallback.onFailure(new RuleEngineException(t.getMessage()));
 | 
			
		||||
        tbMsgCallback.onFailure(new RuleEngineException(t.getMessage(), t));
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -69,7 +69,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
 | 
			
		||||
    private Integer zkSessionTimeout;
 | 
			
		||||
    @Value("${zk.zk_dir}")
 | 
			
		||||
    private String zkDir;
 | 
			
		||||
    @Value("${zk.recalculate_delay:60000}")
 | 
			
		||||
    @Value("${zk.recalculate_delay:0}")
 | 
			
		||||
    private Long recalculateDelay;
 | 
			
		||||
 | 
			
		||||
    protected final ConcurrentHashMap<String, ScheduledFuture<?>> delayedTasks;
 | 
			
		||||
 | 
			
		||||
@ -30,6 +30,9 @@ import org.thingsboard.server.queue.TbQueueCallback;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueProducer;
 | 
			
		||||
 | 
			
		||||
import java.nio.charset.StandardCharsets;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Objects;
 | 
			
		||||
import java.util.Properties;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
@ -53,10 +56,14 @@ public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
 | 
			
		||||
 | 
			
		||||
    private final Set<TopicPartitionInfo> topics;
 | 
			
		||||
 | 
			
		||||
    @Getter
 | 
			
		||||
    private final String clientId;
 | 
			
		||||
 | 
			
		||||
    @Builder
 | 
			
		||||
    private TbKafkaProducerTemplate(TbKafkaSettings settings, String defaultTopic, String clientId, TbQueueAdmin admin) {
 | 
			
		||||
        Properties props = settings.toProducerProps();
 | 
			
		||||
 | 
			
		||||
        this.clientId = Objects.requireNonNull(clientId, "Kafka producer client.id is null");
 | 
			
		||||
        if (!StringUtils.isEmpty(clientId)) {
 | 
			
		||||
            props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
 | 
			
		||||
        }
 | 
			
		||||
@ -72,6 +79,22 @@ public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
 | 
			
		||||
    public void init() {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void addAnalyticHeaders(List<Header> headers) {
 | 
			
		||||
        headers.add(new RecordHeader("_producerId", getClientId().getBytes(StandardCharsets.UTF_8)));
 | 
			
		||||
        headers.add(new RecordHeader("_threadName", Thread.currentThread().getName().getBytes(StandardCharsets.UTF_8)));
 | 
			
		||||
        if (log.isTraceEnabled()) {
 | 
			
		||||
            try {
 | 
			
		||||
                StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
 | 
			
		||||
                int maxlevel = Math.min(stackTrace.length, 10);
 | 
			
		||||
                for (int i = 2; i < maxlevel; i++) { // ignore two levels: getStackTrace and addAnalyticHeaders
 | 
			
		||||
                    headers.add(new RecordHeader("_stackTrace" + i, stackTrace[i].toString().getBytes(StandardCharsets.UTF_8)));
 | 
			
		||||
                }
 | 
			
		||||
            } catch (Throwable t) {
 | 
			
		||||
                log.trace("Failed to add stacktrace headers in Kafka producer {}", getClientId(), t);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
 | 
			
		||||
        try {
 | 
			
		||||
@ -79,7 +102,10 @@ public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
 | 
			
		||||
            String key = msg.getKey().toString();
 | 
			
		||||
            byte[] data = msg.getData();
 | 
			
		||||
            ProducerRecord<String, byte[]> record;
 | 
			
		||||
            Iterable<Header> headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList());
 | 
			
		||||
            List<Header> headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList());
 | 
			
		||||
            if (log.isDebugEnabled()) {
 | 
			
		||||
                addAnalyticHeaders(headers);
 | 
			
		||||
            }
 | 
			
		||||
            record = new ProducerRecord<>(tpi.getFullTopicName(), null, key, data, headers);
 | 
			
		||||
            producer.send(record, (metadata, exception) -> {
 | 
			
		||||
                if (exception == null) {
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,54 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2023 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.queue.kafka;
 | 
			
		||||
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.apache.kafka.common.header.Header;
 | 
			
		||||
import org.junit.jupiter.api.BeforeEach;
 | 
			
		||||
import org.junit.jupiter.api.Test;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueMsg;
 | 
			
		||||
 | 
			
		||||
import java.nio.charset.StandardCharsets;
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
 | 
			
		||||
import static org.assertj.core.api.Assertions.assertThat;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.any;
 | 
			
		||||
import static org.mockito.BDDMockito.willCallRealMethod;
 | 
			
		||||
import static org.mockito.BDDMockito.willReturn;
 | 
			
		||||
import static org.mockito.Mockito.mock;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
class TbKafkaProducerTemplateTest {
 | 
			
		||||
 | 
			
		||||
    TbKafkaProducerTemplate<TbQueueMsg> producerTemplate;
 | 
			
		||||
 | 
			
		||||
    @BeforeEach
 | 
			
		||||
    void setUp() {
 | 
			
		||||
        producerTemplate = mock(TbKafkaProducerTemplate.class);
 | 
			
		||||
        willCallRealMethod().given(producerTemplate).addAnalyticHeaders(any());
 | 
			
		||||
        willReturn("tb-core-to-core-notifications-tb-core-3").given(producerTemplate).getClientId();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void testAddAnalyticHeaders() {
 | 
			
		||||
        List<Header> headers = new ArrayList<>();
 | 
			
		||||
        producerTemplate.addAnalyticHeaders(headers);
 | 
			
		||||
        assertThat(headers).isNotEmpty();
 | 
			
		||||
        headers.forEach(r -> log.info("RecordHeader key [{}] value [{}]", r.key(), new String(r.value(), StandardCharsets.UTF_8)));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										20
									
								
								common/queue/src/test/resources/logback-test.xml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										20
									
								
								common/queue/src/test/resources/logback-test.xml
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,20 @@
 | 
			
		||||
<?xml version="1.0" encoding="UTF-8" ?>
 | 
			
		||||
 | 
			
		||||
<configuration>
 | 
			
		||||
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
 | 
			
		||||
        <encoder>
 | 
			
		||||
            <pattern>%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
 | 
			
		||||
        </encoder>
 | 
			
		||||
    </appender>
 | 
			
		||||
 | 
			
		||||
    <!-- TbKafkaProducerTemplate will add headers for each message when log level:
 | 
			
		||||
           - DEBUG - producerId and thread name
 | 
			
		||||
           - TRACE - will add stacktrace.
 | 
			
		||||
         Kafka compression is highly recommended -->
 | 
			
		||||
    <logger name="org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate" level="TRACE"/>
 | 
			
		||||
 | 
			
		||||
    <root level="INFO">
 | 
			
		||||
        <appender-ref ref="console"/>
 | 
			
		||||
    </root>
 | 
			
		||||
 | 
			
		||||
</configuration>
 | 
			
		||||
@ -0,0 +1,67 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2023 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.common.util;
 | 
			
		||||
 | 
			
		||||
import com.google.gson.JsonParseException;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.server.common.data.StringUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
 | 
			
		||||
import javax.script.ScriptException;
 | 
			
		||||
import java.io.PrintWriter;
 | 
			
		||||
import java.io.StringWriter;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class ExceptionUtil {
 | 
			
		||||
 | 
			
		||||
    @SuppressWarnings("unchecked")
 | 
			
		||||
    public static <T extends Exception> T lookupException(Throwable source, Class<T> clazz) {
 | 
			
		||||
        Exception e = lookupExceptionInCause(source, clazz);
 | 
			
		||||
        if (e != null) {
 | 
			
		||||
            return (T) e;
 | 
			
		||||
        } else {
 | 
			
		||||
            return null;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static Exception lookupExceptionInCause(Throwable source, Class<? extends Exception>... clazzes) {
 | 
			
		||||
        while (source != null) {
 | 
			
		||||
            for (Class<? extends Exception> clazz : clazzes) {
 | 
			
		||||
                if (clazz.isAssignableFrom(source.getClass())) {
 | 
			
		||||
                    return (Exception) source;
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            source = source.getCause();
 | 
			
		||||
        }
 | 
			
		||||
        return null;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static String toString(Exception e, EntityId componentId, boolean stackTraceEnabled) {
 | 
			
		||||
        Exception exception = lookupExceptionInCause(e, ScriptException.class, JsonParseException.class);
 | 
			
		||||
        if (exception != null && StringUtils.isNotEmpty(exception.getMessage())) {
 | 
			
		||||
            return exception.getMessage();
 | 
			
		||||
        } else {
 | 
			
		||||
            if (stackTraceEnabled) {
 | 
			
		||||
                StringWriter sw = new StringWriter();
 | 
			
		||||
                e.printStackTrace(new PrintWriter(sw));
 | 
			
		||||
                return sw.toString();
 | 
			
		||||
            } else {
 | 
			
		||||
                log.debug("[{}] Unknown error during message processing", componentId, e);
 | 
			
		||||
                return "Please contact system administrator";
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,74 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2023 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.common.util;
 | 
			
		||||
 | 
			
		||||
import org.junit.jupiter.api.Test;
 | 
			
		||||
 | 
			
		||||
import java.io.IOException;
 | 
			
		||||
 | 
			
		||||
import static org.assertj.core.api.Assertions.assertThat;
 | 
			
		||||
 | 
			
		||||
class ExceptionUtilTest {
 | 
			
		||||
 | 
			
		||||
    final Exception cause = new RuntimeException();
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void givenRootCause_whenLookupExceptionInCause_thenReturnRootCauseAndNoStackOverflow() {
 | 
			
		||||
        Exception e = cause;
 | 
			
		||||
        for (int i = 0; i <= 16384; i++) {
 | 
			
		||||
            e = new Exception(e);
 | 
			
		||||
        }
 | 
			
		||||
        assertThat(ExceptionUtil.lookupExceptionInCause(e, RuntimeException.class)).isSameAs(cause);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void givenCause_whenLookupExceptionInCause_thenReturnCause() {
 | 
			
		||||
        assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(cause), RuntimeException.class)).isSameAs(cause);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void givenNoCauseAndExceptionIsWantedCauseClass_whenLookupExceptionInCause_thenReturnSelf() {
 | 
			
		||||
        assertThat(ExceptionUtil.lookupExceptionInCause(cause, RuntimeException.class)).isSameAs(cause);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void givenNoCause_whenLookupExceptionInCause_thenReturnNull() {
 | 
			
		||||
        assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(), RuntimeException.class)).isNull();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void givenNotWantedCause_whenLookupExceptionInCause_thenReturnNull() {
 | 
			
		||||
        final Exception cause = new IOException();
 | 
			
		||||
        assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(cause), RuntimeException.class)).isNull();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void givenCause_whenLookupExceptionInCauseByMany_thenReturnFirstCause() {
 | 
			
		||||
        final Exception causeIAE = new IllegalAccessException();
 | 
			
		||||
        assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE))).isNull();
 | 
			
		||||
        assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE), IOException.class, NoSuchFieldException.class)).isNull();
 | 
			
		||||
        assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE), IllegalAccessException.class, IOException.class, NoSuchFieldException.class)).isSameAs(causeIAE);
 | 
			
		||||
        assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE), IOException.class, NoSuchFieldException.class, IllegalAccessException.class)).isSameAs(causeIAE);
 | 
			
		||||
 | 
			
		||||
        final Exception causeIOE = new IOException(causeIAE);
 | 
			
		||||
        assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIOE))).isNull();
 | 
			
		||||
        assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE), ClassNotFoundException.class, NoSuchFieldException.class)).isNull();
 | 
			
		||||
        assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIOE), IOException.class, NoSuchFieldException.class)).isSameAs(causeIOE);
 | 
			
		||||
        assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIOE), IllegalAccessException.class, IOException.class, NoSuchFieldException.class)).isSameAs(causeIOE);
 | 
			
		||||
        assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIOE), IOException.class, NoSuchFieldException.class, IllegalAccessException.class)).isSameAs(causeIOE);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -16,6 +16,7 @@
 | 
			
		||||
package org.thingsboard.server.msa.connectivity;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import com.google.common.util.concurrent.ListeningExecutorService;
 | 
			
		||||
import com.google.common.util.concurrent.MoreExecutors;
 | 
			
		||||
@ -28,6 +29,7 @@ import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.testng.annotations.AfterMethod;
 | 
			
		||||
import org.testng.annotations.BeforeMethod;
 | 
			
		||||
import org.testng.annotations.Test;
 | 
			
		||||
import org.thingsboard.common.util.AbstractListeningExecutor;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
 | 
			
		||||
import org.thingsboard.mqtt.MqttClient;
 | 
			
		||||
import org.thingsboard.mqtt.MqttClientConfig;
 | 
			
		||||
@ -74,8 +76,18 @@ import static org.thingsboard.server.msa.prototypes.DevicePrototypes.defaultDevi
 | 
			
		||||
public class MqttClientTest extends AbstractContainerTest {
 | 
			
		||||
 | 
			
		||||
    private Device device;
 | 
			
		||||
    AbstractListeningExecutor handlerExecutor;
 | 
			
		||||
 | 
			
		||||
    @BeforeMethod
 | 
			
		||||
    public void setUp() throws Exception {
 | 
			
		||||
        this.handlerExecutor = new AbstractListeningExecutor() {
 | 
			
		||||
            @Override
 | 
			
		||||
            protected int getThreadPollSize() {
 | 
			
		||||
                return 4;
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
        handlerExecutor.init();
 | 
			
		||||
 | 
			
		||||
        testRestClient.login("tenant@thingsboard.org", "tenant");
 | 
			
		||||
        device = testRestClient.postDevice("", defaultDevicePrototype("http_"));
 | 
			
		||||
    }
 | 
			
		||||
@ -83,6 +95,9 @@ public class MqttClientTest extends AbstractContainerTest {
 | 
			
		||||
    @AfterMethod
 | 
			
		||||
    public void tearDown() {
 | 
			
		||||
        testRestClient.deleteDeviceIfExists(device.getId());
 | 
			
		||||
        if (handlerExecutor != null) {
 | 
			
		||||
            handlerExecutor.destroy();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
    @Test
 | 
			
		||||
    public void telemetryUpload() throws Exception {
 | 
			
		||||
@ -461,11 +476,16 @@ public class MqttClientTest extends AbstractContainerTest {
 | 
			
		||||
        return getMqttClient(deviceCredentials.getCredentialsId(), listener);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private String getOwnerId() {
 | 
			
		||||
        return "Tenant[" + device.getTenantId().getId() + "]MqttClientTestDevice[" + device.getId().getId() + "]";
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private MqttClient getMqttClient(String username, MqttMessageListener listener) throws InterruptedException, ExecutionException {
 | 
			
		||||
        MqttClientConfig clientConfig = new MqttClientConfig();
 | 
			
		||||
        clientConfig.setOwnerId(getOwnerId());
 | 
			
		||||
        clientConfig.setClientId("MQTT client from test");
 | 
			
		||||
        clientConfig.setUsername(username);
 | 
			
		||||
        MqttClient mqttClient = MqttClient.create(clientConfig, listener);
 | 
			
		||||
        MqttClient mqttClient = MqttClient.create(clientConfig, listener, handlerExecutor);
 | 
			
		||||
        mqttClient.connect("localhost", 1883).get();
 | 
			
		||||
        return mqttClient;
 | 
			
		||||
    }
 | 
			
		||||
@ -479,9 +499,10 @@ public class MqttClientTest extends AbstractContainerTest {
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        @Override
 | 
			
		||||
        public void onMessage(String topic, ByteBuf message) {
 | 
			
		||||
        public ListenableFuture<Void> onMessage(String topic, ByteBuf message) {
 | 
			
		||||
            log.info("MQTT message [{}], topic [{}]", message.toString(StandardCharsets.UTF_8), topic);
 | 
			
		||||
            events.add(new MqttEvent(topic, message.toString(StandardCharsets.UTF_8)));
 | 
			
		||||
            return Futures.immediateVoidFuture();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -16,6 +16,7 @@
 | 
			
		||||
package org.thingsboard.server.msa.connectivity;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import com.google.common.util.concurrent.ListeningExecutorService;
 | 
			
		||||
import com.google.common.util.concurrent.MoreExecutors;
 | 
			
		||||
@ -32,6 +33,7 @@ import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils;
 | 
			
		||||
import org.testng.annotations.AfterMethod;
 | 
			
		||||
import org.testng.annotations.BeforeMethod;
 | 
			
		||||
import org.testng.annotations.Test;
 | 
			
		||||
import org.thingsboard.common.util.AbstractListeningExecutor;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
 | 
			
		||||
import org.thingsboard.mqtt.MqttClient;
 | 
			
		||||
@ -76,8 +78,18 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
 | 
			
		||||
    private MqttMessageListener listener;
 | 
			
		||||
    private JsonParser jsonParser = new JsonParser();
 | 
			
		||||
 | 
			
		||||
    AbstractListeningExecutor handlerExecutor;
 | 
			
		||||
 | 
			
		||||
    @BeforeMethod
 | 
			
		||||
    public void createGateway() throws Exception {
 | 
			
		||||
        this.handlerExecutor = new AbstractListeningExecutor() {
 | 
			
		||||
            @Override
 | 
			
		||||
            protected int getThreadPollSize() {
 | 
			
		||||
                return 4;
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
        handlerExecutor.init();
 | 
			
		||||
 | 
			
		||||
        testRestClient.login("tenant@thingsboard.org", "tenant");
 | 
			
		||||
        gatewayDevice = testRestClient.postDevice("", defaultGatewayPrototype());
 | 
			
		||||
        DeviceCredentials gatewayDeviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(gatewayDevice.getId());
 | 
			
		||||
@ -94,6 +106,9 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
 | 
			
		||||
        this.listener = null;
 | 
			
		||||
        this.mqttClient = null;
 | 
			
		||||
        this.createdDevice = null;
 | 
			
		||||
        if (handlerExecutor != null) {
 | 
			
		||||
            handlerExecutor.destroy();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
@ -403,11 +418,16 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
 | 
			
		||||
        return testRestClient.getDeviceById(createdDeviceId);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private String getOwnerId() {
 | 
			
		||||
        return "Tenant[" + gatewayDevice.getTenantId().getId() + "]MqttGatewayClientTestDevice[" + gatewayDevice.getId().getId() + "]";
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException, ExecutionException {
 | 
			
		||||
        MqttClientConfig clientConfig = new MqttClientConfig();
 | 
			
		||||
        clientConfig.setOwnerId(getOwnerId());
 | 
			
		||||
        clientConfig.setClientId("MQTT client from test");
 | 
			
		||||
        clientConfig.setUsername(deviceCredentials.getCredentialsId());
 | 
			
		||||
        MqttClient mqttClient = MqttClient.create(clientConfig, listener);
 | 
			
		||||
        MqttClient mqttClient = MqttClient.create(clientConfig, listener, handlerExecutor);
 | 
			
		||||
        mqttClient.connect("localhost", 1883).get();
 | 
			
		||||
        return mqttClient;
 | 
			
		||||
    }
 | 
			
		||||
@ -421,9 +441,10 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        @Override
 | 
			
		||||
        public void onMessage(String topic, ByteBuf message) {
 | 
			
		||||
        public ListenableFuture<Void> onMessage(String topic, ByteBuf message) {
 | 
			
		||||
            log.info("MQTT message [{}], topic [{}]", message.toString(StandardCharsets.UTF_8), topic);
 | 
			
		||||
            events.add(new MqttEvent(topic, message.toString(StandardCharsets.UTF_8)));
 | 
			
		||||
            return Futures.immediateVoidFuture();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -41,7 +41,7 @@ zk:
 | 
			
		||||
  session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
 | 
			
		||||
  # Name of the directory in zookeeper 'filesystem'
 | 
			
		||||
  zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
 | 
			
		||||
  recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}"
 | 
			
		||||
  recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}"
 | 
			
		||||
 | 
			
		||||
queue:
 | 
			
		||||
  type: "${TB_QUEUE_TYPE:kafka}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
 | 
			
		||||
 | 
			
		||||
@ -35,6 +35,10 @@
 | 
			
		||||
    </properties>
 | 
			
		||||
 | 
			
		||||
    <dependencies>
 | 
			
		||||
        <dependency>
 | 
			
		||||
            <groupId>org.thingsboard.common</groupId>
 | 
			
		||||
            <artifactId>util</artifactId>
 | 
			
		||||
        </dependency>
 | 
			
		||||
        <dependency>
 | 
			
		||||
            <groupId>io.netty</groupId>
 | 
			
		||||
            <artifactId>netty-codec-mqtt</artifactId>
 | 
			
		||||
 | 
			
		||||
@ -16,6 +16,10 @@
 | 
			
		||||
package org.thingsboard.mqtt;
 | 
			
		||||
 | 
			
		||||
import com.google.common.collect.ImmutableSet;
 | 
			
		||||
import com.google.common.util.concurrent.FutureCallback;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import com.google.common.util.concurrent.MoreExecutors;
 | 
			
		||||
import io.netty.channel.Channel;
 | 
			
		||||
import io.netty.channel.ChannelHandlerContext;
 | 
			
		||||
import io.netty.channel.SimpleChannelInboundHandler;
 | 
			
		||||
@ -34,9 +38,13 @@ import io.netty.handler.codec.mqtt.MqttQoS;
 | 
			
		||||
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
 | 
			
		||||
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
 | 
			
		||||
import io.netty.util.CharsetUtil;
 | 
			
		||||
import io.netty.util.ReferenceCountUtil;
 | 
			
		||||
import io.netty.util.concurrent.Promise;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
 | 
			
		||||
import java.io.IOException;
 | 
			
		||||
import java.util.concurrent.atomic.AtomicBoolean;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage> {
 | 
			
		||||
 | 
			
		||||
@ -117,27 +125,48 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
 | 
			
		||||
        super.channelInactive(ctx);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void invokeHandlersForIncomingPublish(MqttPublishMessage message) {
 | 
			
		||||
        boolean handlerInvoked = false;
 | 
			
		||||
        for (MqttSubscription subscription : ImmutableSet.copyOf(this.client.getSubscriptions().values())) {
 | 
			
		||||
            if (subscription.matches(message.variableHeader().topicName())) {
 | 
			
		||||
                if (subscription.isOnce() && subscription.isCalled()) {
 | 
			
		||||
                    continue;
 | 
			
		||||
    ListenableFuture<Void> invokeHandlersForIncomingPublish(MqttPublishMessage message) {
 | 
			
		||||
        var future = Futures.immediateVoidFuture();
 | 
			
		||||
        var handlerInvoked = new AtomicBoolean();
 | 
			
		||||
        try {
 | 
			
		||||
            for (MqttSubscription subscription : ImmutableSet.copyOf(this.client.getSubscriptions().values())) {
 | 
			
		||||
                if (subscription.matches(message.variableHeader().topicName())) {
 | 
			
		||||
                    future = Futures.transform(future, x -> {
 | 
			
		||||
                        if (subscription.isOnce() && subscription.isCalled()) {
 | 
			
		||||
                            return null;
 | 
			
		||||
                        }
 | 
			
		||||
                        message.payload().markReaderIndex();
 | 
			
		||||
                        subscription.setCalled(true);
 | 
			
		||||
                        subscription.getHandler().onMessage(message.variableHeader().topicName(), message.payload());
 | 
			
		||||
                        if (subscription.isOnce()) {
 | 
			
		||||
                            this.client.off(subscription.getTopic(), subscription.getHandler());
 | 
			
		||||
                        }
 | 
			
		||||
                        message.payload().resetReaderIndex();
 | 
			
		||||
                        handlerInvoked.set(true);
 | 
			
		||||
                        return null;
 | 
			
		||||
                    }, client.getHandlerExecutor());
 | 
			
		||||
                }
 | 
			
		||||
                message.payload().markReaderIndex();
 | 
			
		||||
                subscription.setCalled(true);
 | 
			
		||||
                subscription.getHandler().onMessage(message.variableHeader().topicName(), message.payload());
 | 
			
		||||
                if (subscription.isOnce()) {
 | 
			
		||||
                    this.client.off(subscription.getTopic(), subscription.getHandler());
 | 
			
		||||
                }
 | 
			
		||||
                message.payload().resetReaderIndex();
 | 
			
		||||
                handlerInvoked = true;
 | 
			
		||||
            }
 | 
			
		||||
            future = Futures.transform(future, x -> {
 | 
			
		||||
                if (!handlerInvoked.get() && client.getDefaultHandler() != null) {
 | 
			
		||||
                    client.getDefaultHandler().onMessage(message.variableHeader().topicName(), message.payload());
 | 
			
		||||
                }
 | 
			
		||||
                return null;
 | 
			
		||||
            }, client.getHandlerExecutor());
 | 
			
		||||
        } finally {
 | 
			
		||||
            Futures.addCallback(future, new FutureCallback<>() {
 | 
			
		||||
                @Override
 | 
			
		||||
                public void onSuccess(Void result) {
 | 
			
		||||
                    message.payload().release();
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                @Override
 | 
			
		||||
                public void onFailure(Throwable t) {
 | 
			
		||||
                    message.payload().release();
 | 
			
		||||
                }
 | 
			
		||||
            }, MoreExecutors.directExecutor());
 | 
			
		||||
        }
 | 
			
		||||
        if (!handlerInvoked && client.getDefaultHandler() != null) {
 | 
			
		||||
            client.getDefaultHandler().onMessage(message.variableHeader().topicName(), message.payload());
 | 
			
		||||
        }
 | 
			
		||||
        message.payload().release();
 | 
			
		||||
        return future;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void handleConack(Channel channel, MqttConnAckMessage message) {
 | 
			
		||||
@ -204,11 +233,13 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
 | 
			
		||||
                break;
 | 
			
		||||
 | 
			
		||||
            case AT_LEAST_ONCE:
 | 
			
		||||
                invokeHandlersForIncomingPublish(message);
 | 
			
		||||
                var future = invokeHandlersForIncomingPublish(message);
 | 
			
		||||
                if (message.variableHeader().packetId() != -1) {
 | 
			
		||||
                    MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
 | 
			
		||||
                    MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().packetId());
 | 
			
		||||
                    channel.writeAndFlush(new MqttPubAckMessage(fixedHeader, variableHeader));
 | 
			
		||||
                    future.addListener(() -> {
 | 
			
		||||
                        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
 | 
			
		||||
                        MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().packetId());
 | 
			
		||||
                        channel.writeAndFlush(new MqttPubAckMessage(fixedHeader, variableHeader));
 | 
			
		||||
                    }, MoreExecutors.directExecutor());
 | 
			
		||||
                }
 | 
			
		||||
                break;
 | 
			
		||||
 | 
			
		||||
@ -263,14 +294,20 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void handlePubrel(Channel channel, MqttMessage message) {
 | 
			
		||||
        var future = Futures.immediateVoidFuture();
 | 
			
		||||
        if (this.client.getQos2PendingIncomingPublishes().containsKey(((MqttMessageIdVariableHeader) message.variableHeader()).messageId())) {
 | 
			
		||||
            MqttIncomingQos2Publish incomingQos2Publish = this.client.getQos2PendingIncomingPublishes().get(((MqttMessageIdVariableHeader) message.variableHeader()).messageId());
 | 
			
		||||
            this.invokeHandlersForIncomingPublish(incomingQos2Publish.getIncomingPublish());
 | 
			
		||||
            this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().packetId());
 | 
			
		||||
            future = invokeHandlersForIncomingPublish(incomingQos2Publish.getIncomingPublish());
 | 
			
		||||
            future = Futures.transform(future, x -> {
 | 
			
		||||
                this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().packetId());
 | 
			
		||||
                return null;
 | 
			
		||||
                }, MoreExecutors.directExecutor());
 | 
			
		||||
        }
 | 
			
		||||
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0);
 | 
			
		||||
        MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(((MqttMessageIdVariableHeader) message.variableHeader()).messageId());
 | 
			
		||||
        channel.writeAndFlush(new MqttMessage(fixedHeader, variableHeader));
 | 
			
		||||
        future.addListener(() -> {
 | 
			
		||||
            MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0);
 | 
			
		||||
            MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(((MqttMessageIdVariableHeader) message.variableHeader()).messageId());
 | 
			
		||||
            channel.writeAndFlush(new MqttMessage(fixedHeader, variableHeader));
 | 
			
		||||
        }, MoreExecutors.directExecutor());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void handlePubcomp(MqttMessage message) {
 | 
			
		||||
@ -281,4 +318,21 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
 | 
			
		||||
        pendingPublish.getPayload().release();
 | 
			
		||||
        pendingPublish.onPubcompReceived();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
 | 
			
		||||
        try {
 | 
			
		||||
            if (cause instanceof IOException) {
 | 
			
		||||
                if (log.isDebugEnabled()) {
 | 
			
		||||
                    log.debug("[{}] IOException: ", client.getClientConfig().getOwnerId(), cause);
 | 
			
		||||
                } else {
 | 
			
		||||
                    log.info("[{}] IOException: {}", client.getClientConfig().getOwnerId(), cause.getMessage());
 | 
			
		||||
                }
 | 
			
		||||
            } else {
 | 
			
		||||
                log.warn("[{}] exceptionCaught", client.getClientConfig().getOwnerId(), cause);
 | 
			
		||||
            }
 | 
			
		||||
        } finally {
 | 
			
		||||
            ReferenceCountUtil.release(cause);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -21,6 +21,7 @@ import io.netty.channel.EventLoopGroup;
 | 
			
		||||
import io.netty.channel.nio.NioEventLoopGroup;
 | 
			
		||||
import io.netty.handler.codec.mqtt.MqttQoS;
 | 
			
		||||
import io.netty.util.concurrent.Future;
 | 
			
		||||
import org.thingsboard.common.util.ListeningExecutor;
 | 
			
		||||
 | 
			
		||||
public interface MqttClient {
 | 
			
		||||
 | 
			
		||||
@ -71,6 +72,8 @@ public interface MqttClient {
 | 
			
		||||
     */
 | 
			
		||||
    void setEventLoop(EventLoopGroup eventLoop);
 | 
			
		||||
 | 
			
		||||
    ListeningExecutor getHandlerExecutor();
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
 | 
			
		||||
     *
 | 
			
		||||
@ -180,8 +183,8 @@ public interface MqttClient {
 | 
			
		||||
     * @param config The config object to use while looking for settings
 | 
			
		||||
     * @param defaultHandler The handler for incoming messages that do not match any topic subscriptions
 | 
			
		||||
     */
 | 
			
		||||
    static MqttClient create(MqttClientConfig config, MqttHandler defaultHandler){
 | 
			
		||||
        return new MqttClientImpl(config, defaultHandler);
 | 
			
		||||
    static MqttClient create(MqttClientConfig config, MqttHandler defaultHandler, ListeningExecutor handlerExecutor){
 | 
			
		||||
        return new MqttClientImpl(config, defaultHandler, handlerExecutor);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
 | 
			
		||||
@ -19,6 +19,8 @@ import io.netty.channel.Channel;
 | 
			
		||||
import io.netty.channel.socket.nio.NioSocketChannel;
 | 
			
		||||
import io.netty.handler.codec.mqtt.MqttVersion;
 | 
			
		||||
import io.netty.handler.ssl.SslContext;
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import lombok.Setter;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.Nonnull;
 | 
			
		||||
import javax.annotation.Nullable;
 | 
			
		||||
@ -26,10 +28,12 @@ import java.util.Random;
 | 
			
		||||
 | 
			
		||||
@SuppressWarnings({"WeakerAccess", "unused"})
 | 
			
		||||
public final class MqttClientConfig {
 | 
			
		||||
 | 
			
		||||
    private final SslContext sslContext;
 | 
			
		||||
    private final String randomClientId;
 | 
			
		||||
 | 
			
		||||
    @Getter
 | 
			
		||||
    @Setter
 | 
			
		||||
    private String ownerId; // [TenantId][IntegrationId] or [TenantId][RuleNodeId] for exceptions logging purposes
 | 
			
		||||
    private String clientId;
 | 
			
		||||
    private int timeoutSeconds = 60;
 | 
			
		||||
    private MqttVersion protocolVersion = MqttVersion.MQTT_3_1;
 | 
			
		||||
 | 
			
		||||
@ -46,6 +46,7 @@ import io.netty.util.concurrent.DefaultPromise;
 | 
			
		||||
import io.netty.util.concurrent.Future;
 | 
			
		||||
import io.netty.util.concurrent.Promise;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.common.util.ListeningExecutor;
 | 
			
		||||
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.HashSet;
 | 
			
		||||
@ -88,13 +89,13 @@ final class MqttClientImpl implements MqttClient {
 | 
			
		||||
    private int port;
 | 
			
		||||
    private MqttClientCallback callback;
 | 
			
		||||
 | 
			
		||||
    private final ListeningExecutor handlerExecutor;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Construct the MqttClientImpl with default config
 | 
			
		||||
     */
 | 
			
		||||
    public MqttClientImpl(MqttHandler defaultHandler) {
 | 
			
		||||
        this.clientConfig = new MqttClientConfig();
 | 
			
		||||
        this.defaultHandler = defaultHandler;
 | 
			
		||||
    public MqttClientImpl(MqttHandler defaultHandler, ListeningExecutor handlerExecutor) {
 | 
			
		||||
        this(new MqttClientConfig(), defaultHandler, handlerExecutor);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
@ -103,9 +104,10 @@ final class MqttClientImpl implements MqttClient {
 | 
			
		||||
     *
 | 
			
		||||
     * @param clientConfig The config object to use while looking for settings
 | 
			
		||||
     */
 | 
			
		||||
    public MqttClientImpl(MqttClientConfig clientConfig, MqttHandler defaultHandler) {
 | 
			
		||||
    public MqttClientImpl(MqttClientConfig clientConfig, MqttHandler defaultHandler, ListeningExecutor handlerExecutor) {
 | 
			
		||||
        this.clientConfig = clientConfig;
 | 
			
		||||
        this.defaultHandler = defaultHandler;
 | 
			
		||||
        this.handlerExecutor = handlerExecutor;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
@ -227,6 +229,11 @@ final class MqttClientImpl implements MqttClient {
 | 
			
		||||
        this.eventLoop = eventLoop;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListeningExecutor getHandlerExecutor() {
 | 
			
		||||
        return this.handlerExecutor;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
 | 
			
		||||
     *
 | 
			
		||||
 | 
			
		||||
@ -15,9 +15,10 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.mqtt;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import io.netty.buffer.ByteBuf;
 | 
			
		||||
 | 
			
		||||
public interface MqttHandler {
 | 
			
		||||
 | 
			
		||||
    void onMessage(String topic, ByteBuf payload);
 | 
			
		||||
    ListenableFuture<Void> onMessage(String topic, ByteBuf payload);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -25,7 +25,7 @@ final class MqttSubscription {
 | 
			
		||||
 | 
			
		||||
    private final boolean once;
 | 
			
		||||
 | 
			
		||||
    private boolean called;
 | 
			
		||||
    private volatile boolean called;
 | 
			
		||||
 | 
			
		||||
    MqttSubscription(String topic, MqttHandler handler, boolean once) {
 | 
			
		||||
        if (topic == null) {
 | 
			
		||||
 | 
			
		||||
@ -26,6 +26,7 @@ import org.junit.After;
 | 
			
		||||
import org.junit.Assert;
 | 
			
		||||
import org.junit.Before;
 | 
			
		||||
import org.junit.Test;
 | 
			
		||||
import org.thingsboard.common.util.AbstractListeningExecutor;
 | 
			
		||||
import org.thingsboard.mqtt.MqttClient;
 | 
			
		||||
import org.thingsboard.mqtt.MqttClientConfig;
 | 
			
		||||
import org.thingsboard.mqtt.MqttConnectResult;
 | 
			
		||||
@ -49,8 +50,18 @@ public class MqttIntegrationTest {
 | 
			
		||||
 | 
			
		||||
    MqttClient mqttClient;
 | 
			
		||||
 | 
			
		||||
    AbstractListeningExecutor handlerExecutor;
 | 
			
		||||
 | 
			
		||||
    @Before
 | 
			
		||||
    public void init() throws Exception {
 | 
			
		||||
        this.handlerExecutor = new AbstractListeningExecutor() {
 | 
			
		||||
            @Override
 | 
			
		||||
            protected int getThreadPollSize() {
 | 
			
		||||
                return 4;
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
        handlerExecutor.init();
 | 
			
		||||
 | 
			
		||||
        this.eventLoopGroup = new NioEventLoopGroup();
 | 
			
		||||
 | 
			
		||||
        this.mqttServer = new MqttServer();
 | 
			
		||||
@ -68,6 +79,9 @@ public class MqttIntegrationTest {
 | 
			
		||||
        if (this.eventLoopGroup != null) {
 | 
			
		||||
            this.eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
 | 
			
		||||
        }
 | 
			
		||||
        if (this.handlerExecutor != null) {
 | 
			
		||||
            this.handlerExecutor.destroy();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
@ -108,9 +122,10 @@ public class MqttIntegrationTest {
 | 
			
		||||
 | 
			
		||||
    private MqttClient initClient() throws Exception {
 | 
			
		||||
        MqttClientConfig config = new MqttClientConfig();
 | 
			
		||||
        config.setOwnerId("MqttIntegrationTest");
 | 
			
		||||
        config.setTimeoutSeconds(KEEPALIVE_TIMEOUT_SECONDS);
 | 
			
		||||
        config.setReconnectDelay(RECONNECT_DELAY_SECONDS);
 | 
			
		||||
        MqttClient client = MqttClient.create(config, null);
 | 
			
		||||
        MqttClient client = MqttClient.create(config, null, handlerExecutor);
 | 
			
		||||
        client.setEventLoop(this.eventLoopGroup);
 | 
			
		||||
        Future<MqttConnectResult> connectFuture = client.connect(MQTT_HOST, this.mqttServer.getMqttPort());
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -105,8 +105,13 @@ public class TbMqttNode extends TbAbstractExternalNode {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    String getOwnerId(TbContext ctx) {
 | 
			
		||||
        return "Tenant[" + ctx.getTenantId().getId() + "]RuleNode[" + ctx.getSelf().getId().getId() + "]";
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected MqttClient initClient(TbContext ctx) throws Exception {
 | 
			
		||||
        MqttClientConfig config = new MqttClientConfig(getSslContext());
 | 
			
		||||
        config.setOwnerId(getOwnerId(ctx));
 | 
			
		||||
        if (!StringUtils.isEmpty(this.mqttNodeConfiguration.getClientId())) {
 | 
			
		||||
            config.setClientId(this.mqttNodeConfiguration.isAppendClientIdSuffix() ?
 | 
			
		||||
                    this.mqttNodeConfiguration.getClientId() + "_" + ctx.getServiceId() : this.mqttNodeConfiguration.getClientId());
 | 
			
		||||
@ -114,7 +119,7 @@ public class TbMqttNode extends TbAbstractExternalNode {
 | 
			
		||||
        config.setCleanSession(this.mqttNodeConfiguration.isCleanSession());
 | 
			
		||||
 | 
			
		||||
        prepareMqttClientConfig(config);
 | 
			
		||||
        MqttClient client = MqttClient.create(config, null);
 | 
			
		||||
        MqttClient client = MqttClient.create(config, null, ctx.getExternalCallExecutor());
 | 
			
		||||
        client.setEventLoop(ctx.getSharedEventLoop());
 | 
			
		||||
        Future<MqttConnectResult> connectFuture = client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort());
 | 
			
		||||
        MqttConnectResult result;
 | 
			
		||||
 | 
			
		||||
@ -39,7 +39,7 @@ public class MultipleTbMsgsCallbackWrapper implements TbMsgCallbackWrapper {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onFailure(Throwable t) {
 | 
			
		||||
        callback.onFailure(new RuleEngineException(t.getMessage()));
 | 
			
		||||
        callback.onFailure(new RuleEngineException(t.getMessage(), t));
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -41,7 +41,7 @@ zk:
 | 
			
		||||
  session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
 | 
			
		||||
  # Name of the directory in zookeeper 'filesystem'
 | 
			
		||||
  zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
 | 
			
		||||
  recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}"
 | 
			
		||||
  recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}"
 | 
			
		||||
 | 
			
		||||
cache:
 | 
			
		||||
  type: "${CACHE_TYPE:redis}"
 | 
			
		||||
 | 
			
		||||
@ -68,7 +68,7 @@ zk:
 | 
			
		||||
  session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
 | 
			
		||||
  # Name of the directory in zookeeper 'filesystem'
 | 
			
		||||
  zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
 | 
			
		||||
  recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}"
 | 
			
		||||
  recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}"
 | 
			
		||||
 | 
			
		||||
cache:
 | 
			
		||||
  type: "${CACHE_TYPE:redis}"
 | 
			
		||||
 | 
			
		||||
@ -41,7 +41,7 @@ zk:
 | 
			
		||||
  session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
 | 
			
		||||
  # Name of the directory in zookeeper 'filesystem'
 | 
			
		||||
  zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
 | 
			
		||||
  recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}"
 | 
			
		||||
  recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}"
 | 
			
		||||
 | 
			
		||||
cache:
 | 
			
		||||
  type: "${CACHE_TYPE:redis}"
 | 
			
		||||
 | 
			
		||||
@ -41,7 +41,7 @@ zk:
 | 
			
		||||
  session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
 | 
			
		||||
  # Name of the directory in zookeeper 'filesystem'
 | 
			
		||||
  zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
 | 
			
		||||
  recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}"
 | 
			
		||||
  recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}"
 | 
			
		||||
 | 
			
		||||
cache:
 | 
			
		||||
  type: "${CACHE_TYPE:redis}"
 | 
			
		||||
 | 
			
		||||
@ -41,7 +41,7 @@ zk:
 | 
			
		||||
  session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
 | 
			
		||||
  # Name of the directory in zookeeper 'filesystem'
 | 
			
		||||
  zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
 | 
			
		||||
  recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}"
 | 
			
		||||
  recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}"
 | 
			
		||||
 | 
			
		||||
cache:
 | 
			
		||||
  type: "${CACHE_TYPE:redis}"
 | 
			
		||||
 | 
			
		||||
@ -40,6 +40,7 @@ import {
 | 
			
		||||
  EventContentDialogData
 | 
			
		||||
} from '@home/components/event/event-content-dialog.component';
 | 
			
		||||
import { isEqual, sortObjectKeys } from '@core/utils';
 | 
			
		||||
import { historyInterval, MINUTE } from '@shared/models/time/time.models';
 | 
			
		||||
import { ConnectedPosition, Overlay, OverlayConfig, OverlayRef } from '@angular/cdk/overlay';
 | 
			
		||||
import { ChangeDetectorRef, EventEmitter, Injector, StaticProvider, ViewContainerRef } from '@angular/core';
 | 
			
		||||
import { ComponentPortal } from '@angular/cdk/portal';
 | 
			
		||||
@ -92,6 +93,7 @@ export class EventTableConfig extends EntityTableConfig<Event, TimePageLink> {
 | 
			
		||||
    this.loadDataOnInit = false;
 | 
			
		||||
    this.tableTitle = '';
 | 
			
		||||
    this.useTimePageLink = true;
 | 
			
		||||
    this.defaultTimewindowInterval = historyInterval(MINUTE * 15);
 | 
			
		||||
    this.detailsPanelEnabled = false;
 | 
			
		||||
    this.selectionEnabled = false;
 | 
			
		||||
    this.searchEnabled = false;
 | 
			
		||||
@ -179,7 +181,7 @@ export class EventTableConfig extends EntityTableConfig<Event, TimePageLink> {
 | 
			
		||||
  updateColumns(updateTableColumns: boolean = false): void {
 | 
			
		||||
    this.columns = [];
 | 
			
		||||
    this.columns.push(
 | 
			
		||||
      new DateEntityTableColumn<Event>('createdTime', 'event.event-time', this.datePipe, '120px'),
 | 
			
		||||
      new DateEntityTableColumn<Event>('createdTime', 'event.event-time', this.datePipe, '120px', 'yyyy-MM-dd HH:mm:ss.SSS'),
 | 
			
		||||
      new EntityTableColumn<Event>('server', 'event.server', '100px',
 | 
			
		||||
        (entity) => entity.body.server, entity => ({}), false));
 | 
			
		||||
    switch (this.eventType) {
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user