Merge branch 'master' into feature/update-tenant-profile-isolation

This commit is contained in:
ViacheslavKlimov 2023-08-11 13:39:03 +03:00
commit ae76f4ebbb
43 changed files with 933 additions and 72 deletions

View File

@ -232,7 +232,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
} catch (RuleNodeException rne) {
msg.getCallback().onFailure(rne);
} catch (Exception e) {
msg.getCallback().onFailure(new RuleEngineException(e.getMessage()));
msg.getCallback().onFailure(new RuleEngineException(e.getMessage(), e));
}
}
@ -335,7 +335,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
msg.getCallback().onFailure(rne);
} catch (Exception e) {
log.warn("[" + tenantId + "]" + "[" + entityId + "]" + "[" + msg.getId() + "]" + " onTellNext failure", e);
msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage()));
msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage(), e));
}
}

View File

@ -363,7 +363,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
callback.onSuccess();
}
} catch (Exception e) {
callback.onFailure(new RuleEngineException(e.getMessage()));
callback.onFailure(new RuleEngineException(e.getMessage(), e));
}
}

View File

@ -17,11 +17,14 @@ package org.thingsboard.server.service.queue;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ExceptionUtil;
import org.thingsboard.server.common.data.exception.AbstractRateLimitException;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.common.msg.queue.RuleNodeInfo;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@ -57,8 +60,23 @@ public class TbMsgPackCallback implements TbMsgCallback {
ctx.onSuccess(id);
}
@Override
public void onRateLimit(RuleEngineException e) {
log.debug("[{}] ON RATE LIMIT", id, e);
//TODO notify tenant on rate limit
if (failedMsgTimer != null) {
failedMsgTimer.record(System.currentTimeMillis() - startMsgProcessing, TimeUnit.MILLISECONDS);
}
ctx.onSuccess(id);
}
@Override
public void onFailure(RuleEngineException e) {
if (ExceptionUtil.lookupExceptionInCause(e, AbstractRateLimitException.class) != null) {
onRateLimit(e);
return;
}
log.trace("[{}] ON FAILURE", id, e);
if (failedMsgTimer != null) {
failedMsgTimer.record(System.currentTimeMillis() - startMsgProcessing, TimeUnit.MILLISECONDS);

View File

@ -96,6 +96,10 @@ zk:
session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
# Name of the directory in zookeeper 'filesystem'
zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
# The recalculate_delay property recommended in a microservices architecture setup for rule-engine services.
# This property provides a pause to ensure that when a rule-engine service is restarted, other nodes don't immediately attempt to recalculate their partitions.
# The delay is recommended because the initialization of rule chain actors is time-consuming. Avoiding unnecessary recalculations during a restart can enhance system performance and stability.
recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}"
cluster:
stats:
@ -245,7 +249,8 @@ cassandra:
concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}"
permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}"
dispatcher_threads: "${CASSANDRA_QUERY_DISPATCHER_THREADS:2}"
callback_threads: "${CASSANDRA_QUERY_CALLBACK_THREADS:4}"
callback_threads: "${CASSANDRA_QUERY_CALLBACK_THREADS:4}" # Buffered rate executor (read, write), for managing I/O rate. See "nosql-*-callback" threads in JMX
result_processing_threads: "${CASSANDRA_QUERY_RESULT_PROCESSING_THREADS:50}" # Result set transformer and processing. See "cassandra-callback" threads in JMX
poll_ms: "${CASSANDRA_QUERY_POLL_MS:50}"
rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:10000}"
# set all data types values except target to null for the same ts on save

View File

@ -0,0 +1,98 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.queue;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.common.msg.queue.RuleNodeException;
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import java.util.UUID;
import java.util.stream.Stream;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
class TbMsgPackCallbackTest {
TenantId tenantId;
UUID msgId;
TbMsgPackProcessingContext ctx;
TbMsgPackCallback callback;
@BeforeEach
void setUp() {
tenantId = TenantId.fromUUID(UUID.randomUUID());
msgId = UUID.randomUUID();
ctx = mock(TbMsgPackProcessingContext.class);
callback = spy(new TbMsgPackCallback(msgId, tenantId, ctx));
}
private static Stream<Arguments> testOnFailure_NotRateLimitException() {
return Stream.of(
Arguments.of(new RuleEngineException("rule engine no cause")),
Arguments.of(new RuleEngineException("rule engine caused 1 lvl", new RuntimeException())),
Arguments.of(new RuleEngineException("rule engine caused 2 lvl", new RuntimeException(new Exception()))),
Arguments.of(new RuleEngineException("rule engine caused 2 lvl Throwable", new RuntimeException(new Throwable()))),
Arguments.of(new RuleNodeException("rule node no cause", "RuleChain", new RuleNode()))
);
}
@ParameterizedTest
@MethodSource
void testOnFailure_NotRateLimitException(RuleEngineException ree) {
callback.onFailure(ree);
verify(callback, never()).onRateLimit(any());
verify(callback, never()).onSuccess();
verify(ctx, never()).onSuccess(any());
}
private static Stream<Arguments> testOnFailure_RateLimitException() {
return Stream.of(
Arguments.of(new RuleEngineException("caused lvl 1", new TbRateLimitsException(EntityType.ASSET))),
Arguments.of(new RuleEngineException("caused lvl 2", new RuntimeException(new TbRateLimitsException(EntityType.ASSET)))),
Arguments.of(
new RuleEngineException("caused lvl 3",
new RuntimeException(
new Exception(
new TbRateLimitsException(EntityType.ASSET)))))
);
}
@ParameterizedTest
@MethodSource
void testOnFailure_RateLimitException(RuleEngineException ree) {
callback.onFailure(ree);
verify(callback).onRateLimit(any());
verify(callback).onFailure(any());
verify(callback, never()).onSuccess();
verify(ctx).onSuccess(msgId);
verify(ctx).onSuccess(any());
verify(ctx, never()).onFailure(any(), any(), any());
}
}

View File

@ -0,0 +1,41 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.exception;
public abstract class AbstractRateLimitException extends RuntimeException {
public AbstractRateLimitException() {
super();
}
public AbstractRateLimitException(String message) {
super(message);
}
public AbstractRateLimitException(String message, Throwable cause) {
super(message, cause);
}
public AbstractRateLimitException(Throwable cause) {
super(cause);
}
protected AbstractRateLimitException(String message, Throwable cause,
boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -15,7 +15,7 @@
*/
package org.thingsboard.server.common.data.exception;
public class ApiUsageLimitsExceededException extends RuntimeException {
public class ApiUsageLimitsExceededException extends AbstractRateLimitException {
public ApiUsageLimitsExceededException(String message) {
super(message);
}

View File

@ -32,6 +32,11 @@ public class RuleEngineException extends Exception {
ts = System.currentTimeMillis();
}
public RuleEngineException(String message, Throwable t) {
super(message != null ? message : "Unknown", t);
ts = System.currentTimeMillis();
}
public String toJsonString() {
try {
return mapper.writeValueAsString(mapper.createObjectNode().put("message", getMessage()));

View File

@ -39,6 +39,10 @@ public interface TbMsgCallback {
void onFailure(RuleEngineException e);
default void onRateLimit(RuleEngineException e) {
onFailure(e);
};
/**
* Returns 'true' if rule engine is expecting the message to be processed, 'false' otherwise.
* message may no longer be valid, if the message pack is already expired/canceled/failed.

View File

@ -17,11 +17,12 @@ package org.thingsboard.server.common.msg.tools;
import lombok.Getter;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.exception.AbstractRateLimitException;
/**
* Created by ashvayka on 22.10.18.
*/
public class TbRateLimitsException extends RuntimeException {
public class TbRateLimitsException extends AbstractRateLimitException {
@Getter
private final EntityType entityType;

View File

@ -40,6 +40,6 @@ public class MultipleTbQueueCallbackWrapper implements TbQueueCallback {
@Override
public void onFailure(Throwable t) {
callback.onFailure(new RuleEngineException(t.getMessage()));
callback.onFailure(new RuleEngineException(t.getMessage(), t));
}
}

View File

@ -41,6 +41,6 @@ public class MultipleTbQueueTbMsgCallbackWrapper implements TbQueueCallback {
@Override
public void onFailure(Throwable t) {
tbMsgCallback.onFailure(new RuleEngineException(t.getMessage()));
tbMsgCallback.onFailure(new RuleEngineException(t.getMessage(), t));
}
}

View File

@ -35,6 +35,6 @@ public class TbQueueTbMsgCallbackWrapper implements TbQueueCallback {
@Override
public void onFailure(Throwable t) {
tbMsgCallback.onFailure(new RuleEngineException(t.getMessage()));
tbMsgCallback.onFailure(new RuleEngineException(t.getMessage(), t));
}
}

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.queue.discovery;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ProtocolStringList;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
@ -44,8 +45,10 @@ import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -66,6 +69,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
private Integer zkSessionTimeout;
@Value("${zk.zk_dir}")
private String zkDir;
@Value("${zk.recalculate_delay:0}")
private Long recalculateDelay;
protected final ConcurrentHashMap<String, ScheduledFuture<?>> delayedTasks;
private final TbServiceInfoProvider serviceInfoProvider;
private final PartitionService partitionService;
@ -82,6 +89,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
PartitionService partitionService) {
this.serviceInfoProvider = serviceInfoProvider;
this.partitionService = partitionService;
delayedTasks = new ConcurrentHashMap<>();
}
@PostConstruct
@ -287,11 +295,39 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
log.error("Failed to decode server instance for node {}", data.getPath(), e);
throw e;
}
log.debug("Processing [{}] event for [{}]", pathChildrenCacheEvent.getType(), instance.getServiceId());
String serviceId = instance.getServiceId();
ProtocolStringList serviceTypesList = instance.getServiceTypesList();
log.trace("Processing [{}] event for [{}]", pathChildrenCacheEvent.getType(), serviceId);
switch (pathChildrenCacheEvent.getType()) {
case CHILD_ADDED:
ScheduledFuture<?> task = delayedTasks.remove(serviceId);
if (task != null) {
if (task.cancel(false)) {
log.debug("[{}] Recalculate partitions ignored. Service was restarted in time [{}].",
serviceId, serviceTypesList);
} else {
log.debug("[{}] Going to recalculate partitions. Service was not restarted in time [{}]!",
serviceId, serviceTypesList);
recalculatePartitions();
}
} else {
log.trace("[{}] Going to recalculate partitions due to adding new node [{}].",
serviceId, serviceTypesList);
recalculatePartitions();
}
break;
case CHILD_REMOVED:
recalculatePartitions();
ScheduledFuture<?> future = zkExecutorService.schedule(() -> {
log.debug("[{}] Going to recalculate partitions due to removed node [{}]",
serviceId, serviceTypesList);
ScheduledFuture<?> removedTask = delayedTasks.remove(serviceId);
if (removedTask != null) {
recalculatePartitions();
}
}, recalculateDelay, TimeUnit.MILLISECONDS);
delayedTasks.put(serviceId, future);
break;
default:
break;
@ -303,6 +339,8 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
* Synchronized to ensure that other servers info is up to date
* */
synchronized void recalculatePartitions() {
delayedTasks.values().forEach(future -> future.cancel(false));
delayedTasks.clear();
partitionService.recalculatePartitions(serviceInfoProvider.getServiceInfo(), getOtherServers());
}

View File

@ -30,6 +30,9 @@ import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.TbQueueProducer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -53,10 +56,14 @@ public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
private final Set<TopicPartitionInfo> topics;
@Getter
private final String clientId;
@Builder
private TbKafkaProducerTemplate(TbKafkaSettings settings, String defaultTopic, String clientId, TbQueueAdmin admin) {
Properties props = settings.toProducerProps();
this.clientId = Objects.requireNonNull(clientId, "Kafka producer client.id is null");
if (!StringUtils.isEmpty(clientId)) {
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
}
@ -72,6 +79,22 @@ public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
public void init() {
}
void addAnalyticHeaders(List<Header> headers) {
headers.add(new RecordHeader("_producerId", getClientId().getBytes(StandardCharsets.UTF_8)));
headers.add(new RecordHeader("_threadName", Thread.currentThread().getName().getBytes(StandardCharsets.UTF_8)));
if (log.isTraceEnabled()) {
try {
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
int maxlevel = Math.min(stackTrace.length, 10);
for (int i = 2; i < maxlevel; i++) { // ignore two levels: getStackTrace and addAnalyticHeaders
headers.add(new RecordHeader("_stackTrace" + i, stackTrace[i].toString().getBytes(StandardCharsets.UTF_8)));
}
} catch (Throwable t) {
log.trace("Failed to add stacktrace headers in Kafka producer {}", getClientId(), t);
}
}
}
@Override
public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
try {
@ -79,7 +102,10 @@ public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
String key = msg.getKey().toString();
byte[] data = msg.getData();
ProducerRecord<String, byte[]> record;
Iterable<Header> headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList());
List<Header> headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList());
if (log.isDebugEnabled()) {
addAnalyticHeaders(headers);
}
record = new ProducerRecord<>(tpi.getFullTopicName(), null, key, data, headers);
producer.send(record, (metadata, exception) -> {
if (exception == null) {

View File

@ -0,0 +1,189 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.discovery;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.CHILD_ADDED;
import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.CHILD_REMOVED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class ZkDiscoveryServiceTest {
@Mock
private TbServiceInfoProvider serviceInfoProvider;
@Mock
private PartitionService partitionService;
@Mock
private CuratorFramework client;
@Mock
private PathChildrenCache cache;
@Mock
private CuratorFramework curatorFramework;
private ZkDiscoveryService zkDiscoveryService;
private static final long RECALCULATE_DELAY = 100L;
final TransportProtos.ServiceInfo currentInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("tb-rule-engine-0").build();
final ChildData currentData = new ChildData("/thingsboard/nodes/0000000010", null, currentInfo.toByteArray());
final TransportProtos.ServiceInfo childInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("tb-rule-engine-1").build();
final ChildData childData = new ChildData("/thingsboard/nodes/0000000020", null, childInfo.toByteArray());
@Before
public void setup() {
zkDiscoveryService = Mockito.spy(new ZkDiscoveryService(serviceInfoProvider, partitionService));
ScheduledExecutorService zkExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("zk-discovery"));
when(client.getState()).thenReturn(CuratorFrameworkState.STARTED);
ReflectionTestUtils.setField(zkDiscoveryService, "stopped", false);
ReflectionTestUtils.setField(zkDiscoveryService, "client", client);
ReflectionTestUtils.setField(zkDiscoveryService, "cache", cache);
ReflectionTestUtils.setField(zkDiscoveryService, "nodePath", "/thingsboard/nodes/0000000010");
ReflectionTestUtils.setField(zkDiscoveryService, "zkExecutorService", zkExecutorService);
ReflectionTestUtils.setField(zkDiscoveryService, "recalculateDelay", RECALCULATE_DELAY);
ReflectionTestUtils.setField(zkDiscoveryService, "zkDir", "/thingsboard");
when(serviceInfoProvider.getServiceInfo()).thenReturn(currentInfo);
List<ChildData> dataList = new ArrayList<>();
dataList.add(currentData);
when(cache.getCurrentData()).thenReturn(dataList);
}
@Test
public void restartNodeInTimeTest() throws Exception {
startNode(childData);
verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo)));
reset(partitionService);
stopNode(childData);
assertEquals(1, zkDiscoveryService.delayedTasks.size());
verify(partitionService, never()).recalculatePartitions(any(), any());
startNode(childData);
verify(partitionService, never()).recalculatePartitions(any(), any());
Thread.sleep(RECALCULATE_DELAY * 2);
verify(partitionService, never()).recalculatePartitions(any(), any());
assertTrue(zkDiscoveryService.delayedTasks.isEmpty());
}
@Test
public void restartNodeNotInTimeTest() throws Exception {
startNode(childData);
verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo)));
reset(partitionService);
stopNode(childData);
assertEquals(1, zkDiscoveryService.delayedTasks.size());
Thread.sleep(RECALCULATE_DELAY * 2);
assertTrue(zkDiscoveryService.delayedTasks.isEmpty());
startNode(childData);
verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(Collections.emptyList()));
verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo)));
reset(partitionService);
}
@Test
public void startAnotherNodeDuringRestartTest() throws Exception {
var anotherInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("tb-transport").build();
var anotherData = new ChildData("/thingsboard/nodes/0000000030", null, anotherInfo.toByteArray());
startNode(childData);
verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo)));
reset(partitionService);
stopNode(childData);
assertEquals(1, zkDiscoveryService.delayedTasks.size());
startNode(anotherData);
assertTrue(zkDiscoveryService.delayedTasks.isEmpty());
verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(anotherInfo)));
reset(partitionService);
Thread.sleep(RECALCULATE_DELAY * 2);
verify(partitionService, never()).recalculatePartitions(any(), any());
startNode(childData);
verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(anotherInfo, childInfo)));
}
private void startNode(ChildData data) throws Exception {
cache.getCurrentData().add(data);
zkDiscoveryService.childEvent(curatorFramework, new PathChildrenCacheEvent(CHILD_ADDED, data));
}
private void stopNode(ChildData data) throws Exception {
cache.getCurrentData().remove(data);
zkDiscoveryService.childEvent(curatorFramework, new PathChildrenCacheEvent(CHILD_REMOVED, data));
}
}

View File

@ -0,0 +1,54 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.header.Header;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.thingsboard.server.queue.TbQueueMsg;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.willCallRealMethod;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.mock;
@Slf4j
class TbKafkaProducerTemplateTest {
TbKafkaProducerTemplate<TbQueueMsg> producerTemplate;
@BeforeEach
void setUp() {
producerTemplate = mock(TbKafkaProducerTemplate.class);
willCallRealMethod().given(producerTemplate).addAnalyticHeaders(any());
willReturn("tb-core-to-core-notifications-tb-core-3").given(producerTemplate).getClientId();
}
@Test
void testAddAnalyticHeaders() {
List<Header> headers = new ArrayList<>();
producerTemplate.addAnalyticHeaders(headers);
assertThat(headers).isNotEmpty();
headers.forEach(r -> log.info("RecordHeader key [{}] value [{}]", r.key(), new String(r.value(), StandardCharsets.UTF_8)));
}
}

View File

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- TbKafkaProducerTemplate will add headers for each message when log level:
- DEBUG - producerId and thread name
- TRACE - will add stacktrace.
Kafka compression is highly recommended -->
<logger name="org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate" level="TRACE"/>
<root level="INFO">
<appender-ref ref="console"/>
</root>
</configuration>

View File

@ -52,7 +52,7 @@ import java.util.stream.Collectors;
public class LwM2MModelConfigServiceImpl implements LwM2MModelConfigService {
@Autowired
private TbLwM2MModelConfigStore modelStore;
TbLwM2MModelConfigStore modelStore;
@Autowired
@Lazy
@ -67,14 +67,14 @@ public class LwM2MModelConfigServiceImpl implements LwM2MModelConfigService {
@Autowired
private LwM2MTelemetryLogService logService;
private ConcurrentMap<String, LwM2MModelConfig> currentModelConfigs;
ConcurrentMap<String, LwM2MModelConfig> currentModelConfigs;
@AfterStartUp(order = AfterStartUp.BEFORE_TRANSPORT_SERVICE)
private void init() {
public void init() {
List<LwM2MModelConfig> models = modelStore.getAll();
log.debug("Fetched model configs: {}", models);
currentModelConfigs = models.stream()
.collect(Collectors.toConcurrentMap(LwM2MModelConfig::getEndpoint, m -> m));
.collect(Collectors.toConcurrentMap(LwM2MModelConfig::getEndpoint, m -> m, (existing, replacement) -> existing));
}
@Override

View File

@ -0,0 +1,73 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.lwm2m.server.model;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MModelConfigStore;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.mock;
class LwM2MModelConfigServiceImplTest {
LwM2MModelConfigServiceImpl service;
TbLwM2MModelConfigStore modelStore;
@BeforeEach
void setUp() {
service = new LwM2MModelConfigServiceImpl();
modelStore = mock(TbLwM2MModelConfigStore.class);
service.modelStore = modelStore;
}
@Test
void testInitWithDuplicatedModels() {
LwM2MModelConfig config = new LwM2MModelConfig("urn:imei:951358811362976");
List<LwM2MModelConfig> models = List.of(config, config);
willReturn(models).given(modelStore).getAll();
service.init();
assertThat(service.currentModelConfigs).containsExactlyEntriesOf(Map.of(config.getEndpoint(), config));
}
@Test
void testInitWithNonUniqueEndpoints() {
LwM2MModelConfig configAlfa = new LwM2MModelConfig("urn:imei:951358811362976");
LwM2MModelConfig configBravo = new LwM2MModelConfig("urn:imei:151358811362976");
LwM2MModelConfig configDelta = new LwM2MModelConfig("urn:imei:151358811362976");
assertThat(configBravo.getEndpoint()).as("non-unique endpoints provided").isEqualTo(configDelta.getEndpoint());
List<LwM2MModelConfig> models = List.of(configAlfa, configBravo, configDelta);
willReturn(models).given(modelStore).getAll();
service.init();
assertThat(service.currentModelConfigs).containsExactlyInAnyOrderEntriesOf(Map.of(
configAlfa.getEndpoint(), configAlfa,
configBravo.getEndpoint(), configBravo
));
}
@Test
void testInitWithEmptyModels() {
willReturn(Collections.emptyList()).given(modelStore).getAll();
service.init();
assertThat(service.currentModelConfigs).isEmpty();
}
}

View File

@ -0,0 +1,67 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.common.util;
import com.google.gson.JsonParseException;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.EntityId;
import javax.script.ScriptException;
import java.io.PrintWriter;
import java.io.StringWriter;
@Slf4j
public class ExceptionUtil {
@SuppressWarnings("unchecked")
public static <T extends Exception> T lookupException(Throwable source, Class<T> clazz) {
Exception e = lookupExceptionInCause(source, clazz);
if (e != null) {
return (T) e;
} else {
return null;
}
}
public static Exception lookupExceptionInCause(Throwable source, Class<? extends Exception>... clazzes) {
while (source != null) {
for (Class<? extends Exception> clazz : clazzes) {
if (clazz.isAssignableFrom(source.getClass())) {
return (Exception) source;
}
}
source = source.getCause();
}
return null;
}
public static String toString(Exception e, EntityId componentId, boolean stackTraceEnabled) {
Exception exception = lookupExceptionInCause(e, ScriptException.class, JsonParseException.class);
if (exception != null && StringUtils.isNotEmpty(exception.getMessage())) {
return exception.getMessage();
} else {
if (stackTraceEnabled) {
StringWriter sw = new StringWriter();
e.printStackTrace(new PrintWriter(sw));
return sw.toString();
} else {
log.debug("[{}] Unknown error during message processing", componentId, e);
return "Please contact system administrator";
}
}
}
}

View File

@ -0,0 +1,74 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.common.util;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import static org.assertj.core.api.Assertions.assertThat;
class ExceptionUtilTest {
final Exception cause = new RuntimeException();
@Test
void givenRootCause_whenLookupExceptionInCause_thenReturnRootCauseAndNoStackOverflow() {
Exception e = cause;
for (int i = 0; i <= 16384; i++) {
e = new Exception(e);
}
assertThat(ExceptionUtil.lookupExceptionInCause(e, RuntimeException.class)).isSameAs(cause);
}
@Test
void givenCause_whenLookupExceptionInCause_thenReturnCause() {
assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(cause), RuntimeException.class)).isSameAs(cause);
}
@Test
void givenNoCauseAndExceptionIsWantedCauseClass_whenLookupExceptionInCause_thenReturnSelf() {
assertThat(ExceptionUtil.lookupExceptionInCause(cause, RuntimeException.class)).isSameAs(cause);
}
@Test
void givenNoCause_whenLookupExceptionInCause_thenReturnNull() {
assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(), RuntimeException.class)).isNull();
}
@Test
void givenNotWantedCause_whenLookupExceptionInCause_thenReturnNull() {
final Exception cause = new IOException();
assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(cause), RuntimeException.class)).isNull();
}
@Test
void givenCause_whenLookupExceptionInCauseByMany_thenReturnFirstCause() {
final Exception causeIAE = new IllegalAccessException();
assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE))).isNull();
assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE), IOException.class, NoSuchFieldException.class)).isNull();
assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE), IllegalAccessException.class, IOException.class, NoSuchFieldException.class)).isSameAs(causeIAE);
assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE), IOException.class, NoSuchFieldException.class, IllegalAccessException.class)).isSameAs(causeIAE);
final Exception causeIOE = new IOException(causeIAE);
assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIOE))).isNull();
assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE), ClassNotFoundException.class, NoSuchFieldException.class)).isNull();
assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIOE), IOException.class, NoSuchFieldException.class)).isSameAs(causeIOE);
assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIOE), IllegalAccessException.class, IOException.class, NoSuchFieldException.class)).isSameAs(causeIOE);
assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIOE), IOException.class, NoSuchFieldException.class, IllegalAccessException.class)).isSameAs(causeIOE);
}
}

View File

@ -19,13 +19,13 @@ import com.google.common.base.Function;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.springframework.beans.factory.annotation.Value;
import org.thingsboard.common.util.ThingsBoardExecutors;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by ashvayka on 21.02.17.
@ -34,9 +34,12 @@ public abstract class CassandraAbstractAsyncDao extends CassandraAbstractDao {
protected ExecutorService readResultsProcessingExecutor;
@Value("${cassandra.query.result_processing_threads:50}")
private int threadPoolSize;
@PostConstruct
public void startExecutor() {
readResultsProcessingExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("cassandra-callback"));
readResultsProcessingExecutor = ThingsBoardExecutors.newWorkStealingPool(threadPoolSize, "cassandra-callback");
}
@PreDestroy

View File

@ -124,7 +124,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
if (tenantProfileConfiguration != null &&
StringUtils.isNotEmpty(tenantProfileConfiguration.getCassandraQueryTenantRateLimitsConfiguration())) {
if (task.getTenantId() == null) {
log.info("Invalid task received: {}", task);
log.info("[{}] Invalid task received: {}", getBufferName(), task);
} else if (!task.getTenantId().isNullUid()) {
TbRateLimits rateLimits = perTenantLimits.computeIfAbsent(
task.getTenantId(), id -> new TbRateLimits(tenantProfileConfiguration.getCassandraQueryTenantRateLimitsConfiguration())
@ -173,7 +173,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
public abstract String getBufferName();
private void dispatch() {
log.info("Buffered rate executor thread started");
log.info("[{}] Buffered rate executor thread started", getBufferName());
while (!Thread.interrupted()) {
int curLvl = concurrencyLevel.get();
AsyncTaskContext<T, V> taskCtx = null;
@ -185,7 +185,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
if (printQueriesIdx.incrementAndGet() >= printQueriesFreq) {
printQueriesIdx.set(0);
String query = queryToString(finalTaskCtx);
log.info("[{}] Cassandra query: {}", taskCtx.getId(), query);
log.info("[{}][{}] Cassandra query: {}", getBufferName(), taskCtx.getId(), query);
}
}
logTask("Processing", finalTaskCtx);
@ -238,7 +238,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
}
}
}
log.info("Buffered rate executor thread stopped");
log.info("[{}] Buffered rate executor thread stopped", getBufferName());
}
private void logTask(String action, AsyncTaskContext<T, V> taskCtx) {
@ -314,7 +314,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
statsBuilder.append(CONCURRENCY_LEVEL).append(" = [").append(concurrencyLevel.get()).append("] ");
stats.getStatsCounters().forEach(StatsCounter::clear);
log.info("Permits {}", statsBuilder);
log.info("[{}] Permits {}", getBufferName(), statsBuilder);
}
stats.getRateLimitedTenants().entrySet().stream()
@ -330,13 +330,13 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
try {
return entityService.fetchEntityName(TenantId.SYS_TENANT_ID, tenantId).orElse(defaultName);
} catch (Exception e) {
log.error("[{}] Failed to get tenant name", tenantId, e);
log.error("[{}][{}] Failed to get tenant name", getBufferName(), tenantId, e);
return defaultName;
}
});
log.info("[{}][{}] Rate limited requests: {}", tenantId, name, rateLimitedRequests);
log.info("[{}][{}][{}] Rate limited requests: {}", getBufferName(), tenantId, name, rateLimitedRequests);
} else {
log.info("[{}] Rate limited requests: {}", tenantId, rateLimitedRequests);
log.info("[{}][{}] Rate limited requests: {}", getBufferName(), tenantId, rateLimitedRequests);
}
});
}

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.msa.connectivity;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@ -28,6 +29,7 @@ import lombok.extern.slf4j.Slf4j;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.thingsboard.common.util.AbstractListeningExecutor;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.mqtt.MqttClient;
import org.thingsboard.mqtt.MqttClientConfig;
@ -74,8 +76,18 @@ import static org.thingsboard.server.msa.prototypes.DevicePrototypes.defaultDevi
public class MqttClientTest extends AbstractContainerTest {
private Device device;
AbstractListeningExecutor handlerExecutor;
@BeforeMethod
public void setUp() throws Exception {
this.handlerExecutor = new AbstractListeningExecutor() {
@Override
protected int getThreadPollSize() {
return 4;
}
};
handlerExecutor.init();
testRestClient.login("tenant@thingsboard.org", "tenant");
device = testRestClient.postDevice("", defaultDevicePrototype("http_"));
}
@ -83,6 +95,9 @@ public class MqttClientTest extends AbstractContainerTest {
@AfterMethod
public void tearDown() {
testRestClient.deleteDeviceIfExists(device.getId());
if (handlerExecutor != null) {
handlerExecutor.destroy();
}
}
@Test
public void telemetryUpload() throws Exception {
@ -461,11 +476,16 @@ public class MqttClientTest extends AbstractContainerTest {
return getMqttClient(deviceCredentials.getCredentialsId(), listener);
}
private String getOwnerId() {
return "Tenant[" + device.getTenantId().getId() + "]MqttClientTestDevice[" + device.getId().getId() + "]";
}
private MqttClient getMqttClient(String username, MqttMessageListener listener) throws InterruptedException, ExecutionException {
MqttClientConfig clientConfig = new MqttClientConfig();
clientConfig.setOwnerId(getOwnerId());
clientConfig.setClientId("MQTT client from test");
clientConfig.setUsername(username);
MqttClient mqttClient = MqttClient.create(clientConfig, listener);
MqttClient mqttClient = MqttClient.create(clientConfig, listener, handlerExecutor);
mqttClient.connect("localhost", 1883).get();
return mqttClient;
}
@ -479,9 +499,10 @@ public class MqttClientTest extends AbstractContainerTest {
}
@Override
public void onMessage(String topic, ByteBuf message) {
public ListenableFuture<Void> onMessage(String topic, ByteBuf message) {
log.info("MQTT message [{}], topic [{}]", message.toString(StandardCharsets.UTF_8), topic);
events.add(new MqttEvent(topic, message.toString(StandardCharsets.UTF_8)));
return Futures.immediateVoidFuture();
}
}

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.msa.connectivity;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@ -32,6 +33,7 @@ import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.thingsboard.common.util.AbstractListeningExecutor;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.mqtt.MqttClient;
@ -76,8 +78,18 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
private MqttMessageListener listener;
private JsonParser jsonParser = new JsonParser();
AbstractListeningExecutor handlerExecutor;
@BeforeMethod
public void createGateway() throws Exception {
this.handlerExecutor = new AbstractListeningExecutor() {
@Override
protected int getThreadPollSize() {
return 4;
}
};
handlerExecutor.init();
testRestClient.login("tenant@thingsboard.org", "tenant");
gatewayDevice = testRestClient.postDevice("", defaultGatewayPrototype());
DeviceCredentials gatewayDeviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(gatewayDevice.getId());
@ -94,6 +106,9 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
this.listener = null;
this.mqttClient = null;
this.createdDevice = null;
if (handlerExecutor != null) {
handlerExecutor.destroy();
}
}
@Test
@ -403,11 +418,16 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
return testRestClient.getDeviceById(createdDeviceId);
}
private String getOwnerId() {
return "Tenant[" + gatewayDevice.getTenantId().getId() + "]MqttGatewayClientTestDevice[" + gatewayDevice.getId().getId() + "]";
}
private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException, ExecutionException {
MqttClientConfig clientConfig = new MqttClientConfig();
clientConfig.setOwnerId(getOwnerId());
clientConfig.setClientId("MQTT client from test");
clientConfig.setUsername(deviceCredentials.getCredentialsId());
MqttClient mqttClient = MqttClient.create(clientConfig, listener);
MqttClient mqttClient = MqttClient.create(clientConfig, listener, handlerExecutor);
mqttClient.connect("localhost", 1883).get();
return mqttClient;
}
@ -421,9 +441,10 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
}
@Override
public void onMessage(String topic, ByteBuf message) {
public ListenableFuture<Void> onMessage(String topic, ByteBuf message) {
log.info("MQTT message [{}], topic [{}]", message.toString(StandardCharsets.UTF_8), topic);
events.add(new MqttEvent(topic, message.toString(StandardCharsets.UTF_8)));
return Futures.immediateVoidFuture();
}
}

View File

@ -41,6 +41,7 @@ zk:
session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
# Name of the directory in zookeeper 'filesystem'
zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}"
queue:
type: "${TB_QUEUE_TYPE:kafka}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)

View File

@ -35,6 +35,10 @@
</properties>
<dependencies>
<dependency>
<groupId>org.thingsboard.common</groupId>
<artifactId>util</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-mqtt</artifactId>

View File

@ -16,6 +16,10 @@
package org.thingsboard.mqtt;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
@ -34,8 +38,15 @@ import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Promise;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage> {
private final MqttClientImpl client;
@ -110,27 +121,48 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
super.channelInactive(ctx);
}
private void invokeHandlersForIncomingPublish(MqttPublishMessage message) {
boolean handlerInvoked = false;
for (MqttSubscription subscription : ImmutableSet.copyOf(this.client.getSubscriptions().values())) {
if (subscription.matches(message.variableHeader().topicName())) {
if (subscription.isOnce() && subscription.isCalled()) {
continue;
ListenableFuture<Void> invokeHandlersForIncomingPublish(MqttPublishMessage message) {
var future = Futures.immediateVoidFuture();
var handlerInvoked = new AtomicBoolean();
try {
for (MqttSubscription subscription : ImmutableSet.copyOf(this.client.getSubscriptions().values())) {
if (subscription.matches(message.variableHeader().topicName())) {
future = Futures.transform(future, x -> {
if (subscription.isOnce() && subscription.isCalled()) {
return null;
}
message.payload().markReaderIndex();
subscription.setCalled(true);
subscription.getHandler().onMessage(message.variableHeader().topicName(), message.payload());
if (subscription.isOnce()) {
this.client.off(subscription.getTopic(), subscription.getHandler());
}
message.payload().resetReaderIndex();
handlerInvoked.set(true);
return null;
}, client.getHandlerExecutor());
}
message.payload().markReaderIndex();
subscription.setCalled(true);
subscription.getHandler().onMessage(message.variableHeader().topicName(), message.payload());
if (subscription.isOnce()) {
this.client.off(subscription.getTopic(), subscription.getHandler());
}
message.payload().resetReaderIndex();
handlerInvoked = true;
}
future = Futures.transform(future, x -> {
if (!handlerInvoked.get() && client.getDefaultHandler() != null) {
client.getDefaultHandler().onMessage(message.variableHeader().topicName(), message.payload());
}
return null;
}, client.getHandlerExecutor());
} finally {
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Void result) {
message.payload().release();
}
@Override
public void onFailure(Throwable t) {
message.payload().release();
}
}, MoreExecutors.directExecutor());
}
if (!handlerInvoked && client.getDefaultHandler() != null) {
client.getDefaultHandler().onMessage(message.variableHeader().topicName(), message.payload());
}
message.payload().release();
return future;
}
private void handleConack(Channel channel, MqttConnAckMessage message) {
@ -197,11 +229,13 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
break;
case AT_LEAST_ONCE:
invokeHandlersForIncomingPublish(message);
var future = invokeHandlersForIncomingPublish(message);
if (message.variableHeader().packetId() != -1) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().packetId());
channel.writeAndFlush(new MqttPubAckMessage(fixedHeader, variableHeader));
future.addListener(() -> {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().packetId());
channel.writeAndFlush(new MqttPubAckMessage(fixedHeader, variableHeader));
}, MoreExecutors.directExecutor());
}
break;
@ -256,14 +290,20 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
}
private void handlePubrel(Channel channel, MqttMessage message) {
var future = Futures.immediateVoidFuture();
if (this.client.getQos2PendingIncomingPublishes().containsKey(((MqttMessageIdVariableHeader) message.variableHeader()).messageId())) {
MqttIncomingQos2Publish incomingQos2Publish = this.client.getQos2PendingIncomingPublishes().get(((MqttMessageIdVariableHeader) message.variableHeader()).messageId());
this.invokeHandlersForIncomingPublish(incomingQos2Publish.getIncomingPublish());
this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().packetId());
future = invokeHandlersForIncomingPublish(incomingQos2Publish.getIncomingPublish());
future = Futures.transform(future, x -> {
this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().packetId());
return null;
}, MoreExecutors.directExecutor());
}
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(((MqttMessageIdVariableHeader) message.variableHeader()).messageId());
channel.writeAndFlush(new MqttMessage(fixedHeader, variableHeader));
future.addListener(() -> {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(((MqttMessageIdVariableHeader) message.variableHeader()).messageId());
channel.writeAndFlush(new MqttMessage(fixedHeader, variableHeader));
}, MoreExecutors.directExecutor());
}
private void handlePubcomp(MqttMessage message) {
@ -274,4 +314,21 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
pendingPublish.getPayload().release();
pendingPublish.onPubcompReceived();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
try {
if (cause instanceof IOException) {
if (log.isDebugEnabled()) {
log.debug("[{}] IOException: ", client.getClientConfig().getOwnerId(), cause);
} else {
log.info("[{}] IOException: {}", client.getClientConfig().getOwnerId(), cause.getMessage());
}
} else {
log.warn("[{}] exceptionCaught", client.getClientConfig().getOwnerId(), cause);
}
} finally {
ReferenceCountUtil.release(cause);
}
}
}

View File

@ -21,6 +21,7 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.concurrent.Future;
import org.thingsboard.common.util.ListeningExecutor;
public interface MqttClient {
@ -71,6 +72,8 @@ public interface MqttClient {
*/
void setEventLoop(EventLoopGroup eventLoop);
ListeningExecutor getHandlerExecutor();
/**
* Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
*
@ -180,8 +183,8 @@ public interface MqttClient {
* @param config The config object to use while looking for settings
* @param defaultHandler The handler for incoming messages that do not match any topic subscriptions
*/
static MqttClient create(MqttClientConfig config, MqttHandler defaultHandler){
return new MqttClientImpl(config, defaultHandler);
static MqttClient create(MqttClientConfig config, MqttHandler defaultHandler, ListeningExecutor handlerExecutor){
return new MqttClientImpl(config, defaultHandler, handlerExecutor);
}
/**

View File

@ -19,6 +19,8 @@ import io.netty.channel.Channel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.mqtt.MqttVersion;
import io.netty.handler.ssl.SslContext;
import lombok.Getter;
import lombok.Setter;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@ -26,10 +28,12 @@ import java.util.Random;
@SuppressWarnings({"WeakerAccess", "unused"})
public final class MqttClientConfig {
private final SslContext sslContext;
private final String randomClientId;
@Getter
@Setter
private String ownerId; // [TenantId][IntegrationId] or [TenantId][RuleNodeId] for exceptions logging purposes
private String clientId;
private int timeoutSeconds = 60;
private MqttVersion protocolVersion = MqttVersion.MQTT_3_1;

View File

@ -46,6 +46,7 @@ import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ListeningExecutor;
import java.util.Collections;
import java.util.HashSet;
@ -88,13 +89,13 @@ final class MqttClientImpl implements MqttClient {
private int port;
private MqttClientCallback callback;
private final ListeningExecutor handlerExecutor;
/**
* Construct the MqttClientImpl with default config
*/
public MqttClientImpl(MqttHandler defaultHandler) {
this.clientConfig = new MqttClientConfig();
this.defaultHandler = defaultHandler;
public MqttClientImpl(MqttHandler defaultHandler, ListeningExecutor handlerExecutor) {
this(new MqttClientConfig(), defaultHandler, handlerExecutor);
}
/**
@ -103,9 +104,10 @@ final class MqttClientImpl implements MqttClient {
*
* @param clientConfig The config object to use while looking for settings
*/
public MqttClientImpl(MqttClientConfig clientConfig, MqttHandler defaultHandler) {
public MqttClientImpl(MqttClientConfig clientConfig, MqttHandler defaultHandler, ListeningExecutor handlerExecutor) {
this.clientConfig = clientConfig;
this.defaultHandler = defaultHandler;
this.handlerExecutor = handlerExecutor;
}
/**
@ -227,6 +229,11 @@ final class MqttClientImpl implements MqttClient {
this.eventLoop = eventLoop;
}
@Override
public ListeningExecutor getHandlerExecutor() {
return this.handlerExecutor;
}
/**
* Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
*

View File

@ -15,9 +15,10 @@
*/
package org.thingsboard.mqtt;
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.buffer.ByteBuf;
public interface MqttHandler {
void onMessage(String topic, ByteBuf payload);
ListenableFuture<Void> onMessage(String topic, ByteBuf payload);
}

View File

@ -25,7 +25,7 @@ final class MqttSubscription {
private final boolean once;
private boolean called;
private volatile boolean called;
MqttSubscription(String topic, MqttHandler handler, boolean once) {
if (topic == null) {

View File

@ -26,6 +26,7 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.common.util.AbstractListeningExecutor;
import org.thingsboard.mqtt.MqttClient;
import org.thingsboard.mqtt.MqttClientConfig;
import org.thingsboard.mqtt.MqttConnectResult;
@ -49,8 +50,18 @@ public class MqttIntegrationTest {
MqttClient mqttClient;
AbstractListeningExecutor handlerExecutor;
@Before
public void init() throws Exception {
this.handlerExecutor = new AbstractListeningExecutor() {
@Override
protected int getThreadPollSize() {
return 4;
}
};
handlerExecutor.init();
this.eventLoopGroup = new NioEventLoopGroup();
this.mqttServer = new MqttServer();
@ -68,6 +79,9 @@ public class MqttIntegrationTest {
if (this.eventLoopGroup != null) {
this.eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
}
if (this.handlerExecutor != null) {
this.handlerExecutor.destroy();
}
}
@Test
@ -108,9 +122,10 @@ public class MqttIntegrationTest {
private MqttClient initClient() throws Exception {
MqttClientConfig config = new MqttClientConfig();
config.setOwnerId("MqttIntegrationTest");
config.setTimeoutSeconds(KEEPALIVE_TIMEOUT_SECONDS);
config.setReconnectDelay(RECONNECT_DELAY_SECONDS);
MqttClient client = MqttClient.create(config, null);
MqttClient client = MqttClient.create(config, null, handlerExecutor);
client.setEventLoop(this.eventLoopGroup);
Future<MqttConnectResult> connectFuture = client.connect(MQTT_HOST, this.mqttServer.getMqttPort());

View File

@ -25,7 +25,6 @@ import org.thingsboard.mqtt.MqttClientConfig;
import org.thingsboard.mqtt.MqttConnectResult;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
@ -105,8 +104,13 @@ public class TbMqttNode extends TbAbstractExternalNode {
}
}
String getOwnerId(TbContext ctx) {
return "Tenant[" + ctx.getTenantId().getId() + "]RuleNode[" + ctx.getSelf().getId().getId() + "]";
}
protected MqttClient initClient(TbContext ctx) throws Exception {
MqttClientConfig config = new MqttClientConfig(getSslContext());
config.setOwnerId(getOwnerId(ctx));
if (!StringUtils.isEmpty(this.mqttNodeConfiguration.getClientId())) {
config.setClientId(this.mqttNodeConfiguration.isAppendClientIdSuffix() ?
this.mqttNodeConfiguration.getClientId() + "_" + ctx.getServiceId() : this.mqttNodeConfiguration.getClientId());
@ -114,7 +118,7 @@ public class TbMqttNode extends TbAbstractExternalNode {
config.setCleanSession(this.mqttNodeConfiguration.isCleanSession());
prepareMqttClientConfig(config);
MqttClient client = MqttClient.create(config, null);
MqttClient client = MqttClient.create(config, null, ctx.getExternalCallExecutor());
client.setEventLoop(ctx.getSharedEventLoop());
Future<MqttConnectResult> connectFuture = client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort());
MqttConnectResult result;

View File

@ -39,7 +39,7 @@ public class MultipleTbMsgsCallbackWrapper implements TbMsgCallbackWrapper {
@Override
public void onFailure(Throwable t) {
callback.onFailure(new RuleEngineException(t.getMessage()));
callback.onFailure(new RuleEngineException(t.getMessage(), t));
}
}

View File

@ -41,6 +41,7 @@ zk:
session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
# Name of the directory in zookeeper 'filesystem'
zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}"
cache:
type: "${CACHE_TYPE:redis}"

View File

@ -68,6 +68,7 @@ zk:
session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
# Name of the directory in zookeeper 'filesystem'
zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}"
cache:
type: "${CACHE_TYPE:redis}"

View File

@ -41,6 +41,7 @@ zk:
session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
# Name of the directory in zookeeper 'filesystem'
zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}"
cache:
type: "${CACHE_TYPE:redis}"

View File

@ -41,6 +41,7 @@ zk:
session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
# Name of the directory in zookeeper 'filesystem'
zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}"
cache:
type: "${CACHE_TYPE:redis}"

View File

@ -41,6 +41,7 @@ zk:
session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
# Name of the directory in zookeeper 'filesystem'
zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}"
cache:
type: "${CACHE_TYPE:redis}"

View File

@ -40,6 +40,7 @@ import {
EventContentDialogData
} from '@home/components/event/event-content-dialog.component';
import { isEqual, sortObjectKeys } from '@core/utils';
import { historyInterval, MINUTE } from '@shared/models/time/time.models';
import { ConnectedPosition, Overlay, OverlayConfig, OverlayRef } from '@angular/cdk/overlay';
import { ChangeDetectorRef, Injector, StaticProvider, ViewContainerRef } from '@angular/core';
import { ComponentPortal } from '@angular/cdk/portal';
@ -89,6 +90,7 @@ export class EventTableConfig extends EntityTableConfig<Event, TimePageLink> {
this.loadDataOnInit = false;
this.tableTitle = '';
this.useTimePageLink = true;
this.defaultTimewindowInterval = historyInterval(MINUTE * 15);
this.detailsPanelEnabled = false;
this.selectionEnabled = false;
this.searchEnabled = false;
@ -176,7 +178,7 @@ export class EventTableConfig extends EntityTableConfig<Event, TimePageLink> {
updateColumns(updateTableColumns: boolean = false): void {
this.columns = [];
this.columns.push(
new DateEntityTableColumn<Event>('createdTime', 'event.event-time', this.datePipe, '120px'),
new DateEntityTableColumn<Event>('createdTime', 'event.event-time', this.datePipe, '120px', 'yyyy-MM-dd HH:mm:ss.SSS'),
new EntityTableColumn<Event>('server', 'event.server', '100px',
(entity) => entity.body.server, entity => ({}), false));
switch (this.eventType) {