diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index 4bd082ee38..4b868dfcbf 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -232,7 +232,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor testOnFailure_NotRateLimitException() { + return Stream.of( + Arguments.of(new RuleEngineException("rule engine no cause")), + Arguments.of(new RuleEngineException("rule engine caused 1 lvl", new RuntimeException())), + Arguments.of(new RuleEngineException("rule engine caused 2 lvl", new RuntimeException(new Exception()))), + Arguments.of(new RuleEngineException("rule engine caused 2 lvl Throwable", new RuntimeException(new Throwable()))), + Arguments.of(new RuleNodeException("rule node no cause", "RuleChain", new RuleNode())) + ); + } + + @ParameterizedTest + @MethodSource + void testOnFailure_NotRateLimitException(RuleEngineException ree) { + callback.onFailure(ree); + + verify(callback, never()).onRateLimit(any()); + verify(callback, never()).onSuccess(); + verify(ctx, never()).onSuccess(any()); + } + + private static Stream testOnFailure_RateLimitException() { + return Stream.of( + Arguments.of(new RuleEngineException("caused lvl 1", new TbRateLimitsException(EntityType.ASSET))), + Arguments.of(new RuleEngineException("caused lvl 2", new RuntimeException(new TbRateLimitsException(EntityType.ASSET)))), + Arguments.of( + new RuleEngineException("caused lvl 3", + new RuntimeException( + new Exception( + new TbRateLimitsException(EntityType.ASSET))))) + ); + } + + @ParameterizedTest + @MethodSource + void testOnFailure_RateLimitException(RuleEngineException ree) { + callback.onFailure(ree); + + verify(callback).onRateLimit(any()); + verify(callback).onFailure(any()); + verify(callback, never()).onSuccess(); + verify(ctx).onSuccess(msgId); + verify(ctx).onSuccess(any()); + verify(ctx, never()).onFailure(any(), any(), any()); + } + +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/exception/AbstractRateLimitException.java b/common/data/src/main/java/org/thingsboard/server/common/data/exception/AbstractRateLimitException.java new file mode 100644 index 0000000000..1d1da75da3 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/exception/AbstractRateLimitException.java @@ -0,0 +1,41 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.exception; + +public abstract class AbstractRateLimitException extends RuntimeException { + + public AbstractRateLimitException() { + super(); + } + + public AbstractRateLimitException(String message) { + super(message); + } + + public AbstractRateLimitException(String message, Throwable cause) { + super(message, cause); + } + + public AbstractRateLimitException(Throwable cause) { + super(cause); + } + + protected AbstractRateLimitException(String message, Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/exception/ApiUsageLimitsExceededException.java b/common/data/src/main/java/org/thingsboard/server/common/data/exception/ApiUsageLimitsExceededException.java index 2d24184a3f..aa9441c776 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/exception/ApiUsageLimitsExceededException.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/exception/ApiUsageLimitsExceededException.java @@ -15,7 +15,7 @@ */ package org.thingsboard.server.common.data.exception; -public class ApiUsageLimitsExceededException extends RuntimeException { +public class ApiUsageLimitsExceededException extends AbstractRateLimitException { public ApiUsageLimitsExceededException(String message) { super(message); } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/RuleEngineException.java b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/RuleEngineException.java index 1724a9050f..b8c713cc6b 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/RuleEngineException.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/RuleEngineException.java @@ -32,6 +32,11 @@ public class RuleEngineException extends Exception { ts = System.currentTimeMillis(); } + public RuleEngineException(String message, Throwable t) { + super(message != null ? message : "Unknown", t); + ts = System.currentTimeMillis(); + } + public String toJsonString() { try { return mapper.writeValueAsString(mapper.createObjectNode().put("message", getMessage())); diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbMsgCallback.java b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbMsgCallback.java index 3312c98b64..6cf298adb2 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbMsgCallback.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbMsgCallback.java @@ -39,6 +39,10 @@ public interface TbMsgCallback { void onFailure(RuleEngineException e); + default void onRateLimit(RuleEngineException e) { + onFailure(e); + }; + /** * Returns 'true' if rule engine is expecting the message to be processed, 'false' otherwise. * message may no longer be valid, if the message pack is already expired/canceled/failed. diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimitsException.java b/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimitsException.java index dd5fda4dd5..5e63cb037e 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimitsException.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimitsException.java @@ -17,11 +17,12 @@ package org.thingsboard.server.common.msg.tools; import lombok.Getter; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.exception.AbstractRateLimitException; /** * Created by ashvayka on 22.10.18. */ -public class TbRateLimitsException extends RuntimeException { +public class TbRateLimitsException extends AbstractRateLimitException { @Getter private final EntityType entityType; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/MultipleTbQueueCallbackWrapper.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/MultipleTbQueueCallbackWrapper.java index f14c9a047b..f102e34666 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/MultipleTbQueueCallbackWrapper.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/MultipleTbQueueCallbackWrapper.java @@ -40,6 +40,6 @@ public class MultipleTbQueueCallbackWrapper implements TbQueueCallback { @Override public void onFailure(Throwable t) { - callback.onFailure(new RuleEngineException(t.getMessage())); + callback.onFailure(new RuleEngineException(t.getMessage(), t)); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/MultipleTbQueueTbMsgCallbackWrapper.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/MultipleTbQueueTbMsgCallbackWrapper.java index e812628f16..b3bd05ca5e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/MultipleTbQueueTbMsgCallbackWrapper.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/MultipleTbQueueTbMsgCallbackWrapper.java @@ -41,6 +41,6 @@ public class MultipleTbQueueTbMsgCallbackWrapper implements TbQueueCallback { @Override public void onFailure(Throwable t) { - tbMsgCallback.onFailure(new RuleEngineException(t.getMessage())); + tbMsgCallback.onFailure(new RuleEngineException(t.getMessage(), t)); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/TbQueueTbMsgCallbackWrapper.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/TbQueueTbMsgCallbackWrapper.java index 413ef11e30..1107115085 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/TbQueueTbMsgCallbackWrapper.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/TbQueueTbMsgCallbackWrapper.java @@ -35,6 +35,6 @@ public class TbQueueTbMsgCallbackWrapper implements TbQueueCallback { @Override public void onFailure(Throwable t) { - tbMsgCallback.onFailure(new RuleEngineException(t.getMessage())); + tbMsgCallback.onFailure(new RuleEngineException(t.getMessage(), t)); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java index fcf80bcf3d..e99817de17 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java @@ -16,6 +16,7 @@ package org.thingsboard.server.queue.discovery; import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.ProtocolStringList; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; @@ -44,8 +45,10 @@ import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.List; import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -66,6 +69,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi private Integer zkSessionTimeout; @Value("${zk.zk_dir}") private String zkDir; + @Value("${zk.recalculate_delay:0}") + private Long recalculateDelay; + + protected final ConcurrentHashMap> delayedTasks; private final TbServiceInfoProvider serviceInfoProvider; private final PartitionService partitionService; @@ -82,6 +89,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi PartitionService partitionService) { this.serviceInfoProvider = serviceInfoProvider; this.partitionService = partitionService; + delayedTasks = new ConcurrentHashMap<>(); } @PostConstruct @@ -287,11 +295,39 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi log.error("Failed to decode server instance for node {}", data.getPath(), e); throw e; } - log.debug("Processing [{}] event for [{}]", pathChildrenCacheEvent.getType(), instance.getServiceId()); + + String serviceId = instance.getServiceId(); + ProtocolStringList serviceTypesList = instance.getServiceTypesList(); + + log.trace("Processing [{}] event for [{}]", pathChildrenCacheEvent.getType(), serviceId); switch (pathChildrenCacheEvent.getType()) { case CHILD_ADDED: + ScheduledFuture task = delayedTasks.remove(serviceId); + if (task != null) { + if (task.cancel(false)) { + log.debug("[{}] Recalculate partitions ignored. Service was restarted in time [{}].", + serviceId, serviceTypesList); + } else { + log.debug("[{}] Going to recalculate partitions. Service was not restarted in time [{}]!", + serviceId, serviceTypesList); + recalculatePartitions(); + } + } else { + log.trace("[{}] Going to recalculate partitions due to adding new node [{}].", + serviceId, serviceTypesList); + recalculatePartitions(); + } + break; case CHILD_REMOVED: - recalculatePartitions(); + ScheduledFuture future = zkExecutorService.schedule(() -> { + log.debug("[{}] Going to recalculate partitions due to removed node [{}]", + serviceId, serviceTypesList); + ScheduledFuture removedTask = delayedTasks.remove(serviceId); + if (removedTask != null) { + recalculatePartitions(); + } + }, recalculateDelay, TimeUnit.MILLISECONDS); + delayedTasks.put(serviceId, future); break; default: break; @@ -303,6 +339,8 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi * Synchronized to ensure that other servers info is up to date * */ synchronized void recalculatePartitions() { + delayedTasks.values().forEach(future -> future.cancel(false)); + delayedTasks.clear(); partitionService.recalculatePartitions(serviceInfoProvider.getServiceInfo(), getOtherServers()); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java index 15c2f04d17..850f5c08ea 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java @@ -30,6 +30,9 @@ import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.TbQueueProducer; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -53,10 +56,14 @@ public class TbKafkaProducerTemplate implements TbQueuePro private final Set topics; + @Getter + private final String clientId; + @Builder private TbKafkaProducerTemplate(TbKafkaSettings settings, String defaultTopic, String clientId, TbQueueAdmin admin) { Properties props = settings.toProducerProps(); + this.clientId = Objects.requireNonNull(clientId, "Kafka producer client.id is null"); if (!StringUtils.isEmpty(clientId)) { props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); } @@ -72,6 +79,22 @@ public class TbKafkaProducerTemplate implements TbQueuePro public void init() { } + void addAnalyticHeaders(List
headers) { + headers.add(new RecordHeader("_producerId", getClientId().getBytes(StandardCharsets.UTF_8))); + headers.add(new RecordHeader("_threadName", Thread.currentThread().getName().getBytes(StandardCharsets.UTF_8))); + if (log.isTraceEnabled()) { + try { + StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); + int maxlevel = Math.min(stackTrace.length, 10); + for (int i = 2; i < maxlevel; i++) { // ignore two levels: getStackTrace and addAnalyticHeaders + headers.add(new RecordHeader("_stackTrace" + i, stackTrace[i].toString().getBytes(StandardCharsets.UTF_8))); + } + } catch (Throwable t) { + log.trace("Failed to add stacktrace headers in Kafka producer {}", getClientId(), t); + } + } + } + @Override public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) { try { @@ -79,7 +102,10 @@ public class TbKafkaProducerTemplate implements TbQueuePro String key = msg.getKey().toString(); byte[] data = msg.getData(); ProducerRecord record; - Iterable
headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList()); + List
headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList()); + if (log.isDebugEnabled()) { + addAnalyticHeaders(headers); + } record = new ProducerRecord<>(tpi.getFullTopicName(), null, key, data, headers); producer.send(record, (metadata, exception) -> { if (exception == null) { diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java new file mode 100644 index 0000000000..a8810efd0e --- /dev/null +++ b/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java @@ -0,0 +1,189 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.discovery; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.gen.transport.TransportProtos; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.CHILD_ADDED; +import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.CHILD_REMOVED; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class ZkDiscoveryServiceTest { + + @Mock + private TbServiceInfoProvider serviceInfoProvider; + + @Mock + private PartitionService partitionService; + + @Mock + private CuratorFramework client; + + @Mock + private PathChildrenCache cache; + + @Mock + private CuratorFramework curatorFramework; + + private ZkDiscoveryService zkDiscoveryService; + + private static final long RECALCULATE_DELAY = 100L; + + final TransportProtos.ServiceInfo currentInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("tb-rule-engine-0").build(); + final ChildData currentData = new ChildData("/thingsboard/nodes/0000000010", null, currentInfo.toByteArray()); + final TransportProtos.ServiceInfo childInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("tb-rule-engine-1").build(); + final ChildData childData = new ChildData("/thingsboard/nodes/0000000020", null, childInfo.toByteArray()); + + @Before + public void setup() { + zkDiscoveryService = Mockito.spy(new ZkDiscoveryService(serviceInfoProvider, partitionService)); + ScheduledExecutorService zkExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("zk-discovery")); + when(client.getState()).thenReturn(CuratorFrameworkState.STARTED); + ReflectionTestUtils.setField(zkDiscoveryService, "stopped", false); + ReflectionTestUtils.setField(zkDiscoveryService, "client", client); + ReflectionTestUtils.setField(zkDiscoveryService, "cache", cache); + ReflectionTestUtils.setField(zkDiscoveryService, "nodePath", "/thingsboard/nodes/0000000010"); + ReflectionTestUtils.setField(zkDiscoveryService, "zkExecutorService", zkExecutorService); + ReflectionTestUtils.setField(zkDiscoveryService, "recalculateDelay", RECALCULATE_DELAY); + ReflectionTestUtils.setField(zkDiscoveryService, "zkDir", "/thingsboard"); + + when(serviceInfoProvider.getServiceInfo()).thenReturn(currentInfo); + + List dataList = new ArrayList<>(); + dataList.add(currentData); + when(cache.getCurrentData()).thenReturn(dataList); + } + + @Test + public void restartNodeInTimeTest() throws Exception { + startNode(childData); + + verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo))); + + reset(partitionService); + + stopNode(childData); + + assertEquals(1, zkDiscoveryService.delayedTasks.size()); + + verify(partitionService, never()).recalculatePartitions(any(), any()); + + startNode(childData); + + verify(partitionService, never()).recalculatePartitions(any(), any()); + + Thread.sleep(RECALCULATE_DELAY * 2); + + verify(partitionService, never()).recalculatePartitions(any(), any()); + + assertTrue(zkDiscoveryService.delayedTasks.isEmpty()); + } + + @Test + public void restartNodeNotInTimeTest() throws Exception { + startNode(childData); + + verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo))); + + reset(partitionService); + + stopNode(childData); + + assertEquals(1, zkDiscoveryService.delayedTasks.size()); + + Thread.sleep(RECALCULATE_DELAY * 2); + + assertTrue(zkDiscoveryService.delayedTasks.isEmpty()); + + startNode(childData); + + verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(Collections.emptyList())); + + verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo))); + + reset(partitionService); + } + + @Test + public void startAnotherNodeDuringRestartTest() throws Exception { + var anotherInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("tb-transport").build(); + var anotherData = new ChildData("/thingsboard/nodes/0000000030", null, anotherInfo.toByteArray()); + + startNode(childData); + + verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo))); + + reset(partitionService); + + stopNode(childData); + + assertEquals(1, zkDiscoveryService.delayedTasks.size()); + + startNode(anotherData); + + assertTrue(zkDiscoveryService.delayedTasks.isEmpty()); + + verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(anotherInfo))); + reset(partitionService); + + Thread.sleep(RECALCULATE_DELAY * 2); + + verify(partitionService, never()).recalculatePartitions(any(), any()); + + startNode(childData); + + verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(anotherInfo, childInfo))); + } + + private void startNode(ChildData data) throws Exception { + cache.getCurrentData().add(data); + zkDiscoveryService.childEvent(curatorFramework, new PathChildrenCacheEvent(CHILD_ADDED, data)); + } + + private void stopNode(ChildData data) throws Exception { + cache.getCurrentData().remove(data); + zkDiscoveryService.childEvent(curatorFramework, new PathChildrenCacheEvent(CHILD_REMOVED, data)); + } + +} diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplateTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplateTest.java new file mode 100644 index 0000000000..bfd3c4a6dc --- /dev/null +++ b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplateTest.java @@ -0,0 +1,54 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.header.Header; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.thingsboard.server.queue.TbQueueMsg; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.willCallRealMethod; +import static org.mockito.BDDMockito.willReturn; +import static org.mockito.Mockito.mock; + +@Slf4j +class TbKafkaProducerTemplateTest { + + TbKafkaProducerTemplate producerTemplate; + + @BeforeEach + void setUp() { + producerTemplate = mock(TbKafkaProducerTemplate.class); + willCallRealMethod().given(producerTemplate).addAnalyticHeaders(any()); + willReturn("tb-core-to-core-notifications-tb-core-3").given(producerTemplate).getClientId(); + } + + @Test + void testAddAnalyticHeaders() { + List
headers = new ArrayList<>(); + producerTemplate.addAnalyticHeaders(headers); + assertThat(headers).isNotEmpty(); + headers.forEach(r -> log.info("RecordHeader key [{}] value [{}]", r.key(), new String(r.value(), StandardCharsets.UTF_8))); + } + +} diff --git a/common/queue/src/test/resources/logback-test.xml b/common/queue/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..f7053313d4 --- /dev/null +++ b/common/queue/src/test/resources/logback-test.xml @@ -0,0 +1,20 @@ + + + + + + %d{ISO8601} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/model/LwM2MModelConfigServiceImpl.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/model/LwM2MModelConfigServiceImpl.java index 302bd20c8b..eef9a53024 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/model/LwM2MModelConfigServiceImpl.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/model/LwM2MModelConfigServiceImpl.java @@ -52,7 +52,7 @@ import java.util.stream.Collectors; public class LwM2MModelConfigServiceImpl implements LwM2MModelConfigService { @Autowired - private TbLwM2MModelConfigStore modelStore; + TbLwM2MModelConfigStore modelStore; @Autowired @Lazy @@ -67,14 +67,14 @@ public class LwM2MModelConfigServiceImpl implements LwM2MModelConfigService { @Autowired private LwM2MTelemetryLogService logService; - private ConcurrentMap currentModelConfigs; + ConcurrentMap currentModelConfigs; @AfterStartUp(order = AfterStartUp.BEFORE_TRANSPORT_SERVICE) - private void init() { + public void init() { List models = modelStore.getAll(); log.debug("Fetched model configs: {}", models); currentModelConfigs = models.stream() - .collect(Collectors.toConcurrentMap(LwM2MModelConfig::getEndpoint, m -> m)); + .collect(Collectors.toConcurrentMap(LwM2MModelConfig::getEndpoint, m -> m, (existing, replacement) -> existing)); } @Override diff --git a/common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/model/LwM2MModelConfigServiceImplTest.java b/common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/model/LwM2MModelConfigServiceImplTest.java new file mode 100644 index 0000000000..fc54ca9e0b --- /dev/null +++ b/common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/model/LwM2MModelConfigServiceImplTest.java @@ -0,0 +1,73 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.lwm2m.server.model; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MModelConfigStore; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.willReturn; +import static org.mockito.Mockito.mock; + +class LwM2MModelConfigServiceImplTest { + + LwM2MModelConfigServiceImpl service; + TbLwM2MModelConfigStore modelStore; + + @BeforeEach + void setUp() { + service = new LwM2MModelConfigServiceImpl(); + modelStore = mock(TbLwM2MModelConfigStore.class); + service.modelStore = modelStore; + } + + @Test + void testInitWithDuplicatedModels() { + LwM2MModelConfig config = new LwM2MModelConfig("urn:imei:951358811362976"); + List models = List.of(config, config); + willReturn(models).given(modelStore).getAll(); + service.init(); + assertThat(service.currentModelConfigs).containsExactlyEntriesOf(Map.of(config.getEndpoint(), config)); + } + + @Test + void testInitWithNonUniqueEndpoints() { + LwM2MModelConfig configAlfa = new LwM2MModelConfig("urn:imei:951358811362976"); + LwM2MModelConfig configBravo = new LwM2MModelConfig("urn:imei:151358811362976"); + LwM2MModelConfig configDelta = new LwM2MModelConfig("urn:imei:151358811362976"); + assertThat(configBravo.getEndpoint()).as("non-unique endpoints provided").isEqualTo(configDelta.getEndpoint()); + List models = List.of(configAlfa, configBravo, configDelta); + willReturn(models).given(modelStore).getAll(); + service.init(); + assertThat(service.currentModelConfigs).containsExactlyInAnyOrderEntriesOf(Map.of( + configAlfa.getEndpoint(), configAlfa, + configBravo.getEndpoint(), configBravo + )); + } + + @Test + void testInitWithEmptyModels() { + willReturn(Collections.emptyList()).given(modelStore).getAll(); + service.init(); + assertThat(service.currentModelConfigs).isEmpty(); + } + +} diff --git a/common/util/src/main/java/org/thingsboard/common/util/ExceptionUtil.java b/common/util/src/main/java/org/thingsboard/common/util/ExceptionUtil.java new file mode 100644 index 0000000000..2ec15f3724 --- /dev/null +++ b/common/util/src/main/java/org/thingsboard/common/util/ExceptionUtil.java @@ -0,0 +1,67 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.common.util; + +import com.google.gson.JsonParseException; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.id.EntityId; + +import javax.script.ScriptException; +import java.io.PrintWriter; +import java.io.StringWriter; + +@Slf4j +public class ExceptionUtil { + + @SuppressWarnings("unchecked") + public static T lookupException(Throwable source, Class clazz) { + Exception e = lookupExceptionInCause(source, clazz); + if (e != null) { + return (T) e; + } else { + return null; + } + } + + public static Exception lookupExceptionInCause(Throwable source, Class... clazzes) { + while (source != null) { + for (Class clazz : clazzes) { + if (clazz.isAssignableFrom(source.getClass())) { + return (Exception) source; + } + } + source = source.getCause(); + } + return null; + } + + public static String toString(Exception e, EntityId componentId, boolean stackTraceEnabled) { + Exception exception = lookupExceptionInCause(e, ScriptException.class, JsonParseException.class); + if (exception != null && StringUtils.isNotEmpty(exception.getMessage())) { + return exception.getMessage(); + } else { + if (stackTraceEnabled) { + StringWriter sw = new StringWriter(); + e.printStackTrace(new PrintWriter(sw)); + return sw.toString(); + } else { + log.debug("[{}] Unknown error during message processing", componentId, e); + return "Please contact system administrator"; + } + } + } +} diff --git a/common/util/src/test/java/org/thingsboard/common/util/ExceptionUtilTest.java b/common/util/src/test/java/org/thingsboard/common/util/ExceptionUtilTest.java new file mode 100644 index 0000000000..e589ae8e30 --- /dev/null +++ b/common/util/src/test/java/org/thingsboard/common/util/ExceptionUtilTest.java @@ -0,0 +1,74 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.common.util; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; + +class ExceptionUtilTest { + + final Exception cause = new RuntimeException(); + + @Test + void givenRootCause_whenLookupExceptionInCause_thenReturnRootCauseAndNoStackOverflow() { + Exception e = cause; + for (int i = 0; i <= 16384; i++) { + e = new Exception(e); + } + assertThat(ExceptionUtil.lookupExceptionInCause(e, RuntimeException.class)).isSameAs(cause); + } + + @Test + void givenCause_whenLookupExceptionInCause_thenReturnCause() { + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(cause), RuntimeException.class)).isSameAs(cause); + } + + @Test + void givenNoCauseAndExceptionIsWantedCauseClass_whenLookupExceptionInCause_thenReturnSelf() { + assertThat(ExceptionUtil.lookupExceptionInCause(cause, RuntimeException.class)).isSameAs(cause); + } + + @Test + void givenNoCause_whenLookupExceptionInCause_thenReturnNull() { + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(), RuntimeException.class)).isNull(); + } + + @Test + void givenNotWantedCause_whenLookupExceptionInCause_thenReturnNull() { + final Exception cause = new IOException(); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(cause), RuntimeException.class)).isNull(); + } + + @Test + void givenCause_whenLookupExceptionInCauseByMany_thenReturnFirstCause() { + final Exception causeIAE = new IllegalAccessException(); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE))).isNull(); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE), IOException.class, NoSuchFieldException.class)).isNull(); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE), IllegalAccessException.class, IOException.class, NoSuchFieldException.class)).isSameAs(causeIAE); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE), IOException.class, NoSuchFieldException.class, IllegalAccessException.class)).isSameAs(causeIAE); + + final Exception causeIOE = new IOException(causeIAE); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIOE))).isNull(); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE), ClassNotFoundException.class, NoSuchFieldException.class)).isNull(); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIOE), IOException.class, NoSuchFieldException.class)).isSameAs(causeIOE); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIOE), IllegalAccessException.class, IOException.class, NoSuchFieldException.class)).isSameAs(causeIOE); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIOE), IOException.class, NoSuchFieldException.class, IllegalAccessException.class)).isSameAs(causeIOE); + } + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractAsyncDao.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractAsyncDao.java index 425724bf2c..92289b7178 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractAsyncDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractAsyncDao.java @@ -19,13 +19,13 @@ import com.google.common.base.Function; import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.springframework.beans.factory.annotation.Value; +import org.thingsboard.common.util.ThingsBoardExecutors; import javax.annotation.Nullable; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; /** * Created by ashvayka on 21.02.17. @@ -34,9 +34,12 @@ public abstract class CassandraAbstractAsyncDao extends CassandraAbstractDao { protected ExecutorService readResultsProcessingExecutor; + @Value("${cassandra.query.result_processing_threads:50}") + private int threadPoolSize; + @PostConstruct public void startExecutor() { - readResultsProcessingExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("cassandra-callback")); + readResultsProcessingExecutor = ThingsBoardExecutors.newWorkStealingPool(threadPoolSize, "cassandra-callback"); } @PreDestroy diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java index 9fbf796eb0..daade9acf3 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java @@ -124,7 +124,7 @@ public abstract class AbstractBufferedRateExecutor new TbRateLimits(tenantProfileConfiguration.getCassandraQueryTenantRateLimitsConfiguration()) @@ -173,7 +173,7 @@ public abstract class AbstractBufferedRateExecutor taskCtx = null; @@ -185,7 +185,7 @@ public abstract class AbstractBufferedRateExecutor= printQueriesFreq) { printQueriesIdx.set(0); String query = queryToString(finalTaskCtx); - log.info("[{}] Cassandra query: {}", taskCtx.getId(), query); + log.info("[{}][{}] Cassandra query: {}", getBufferName(), taskCtx.getId(), query); } } logTask("Processing", finalTaskCtx); @@ -238,7 +238,7 @@ public abstract class AbstractBufferedRateExecutor taskCtx) { @@ -314,7 +314,7 @@ public abstract class AbstractBufferedRateExecutor onMessage(String topic, ByteBuf message) { log.info("MQTT message [{}], topic [{}]", message.toString(StandardCharsets.UTF_8), topic); events.add(new MqttEvent(topic, message.toString(StandardCharsets.UTF_8))); + return Futures.immediateVoidFuture(); } } diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java index a038d4cf50..5e2a9f5fe6 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java @@ -16,6 +16,7 @@ package org.thingsboard.server.msa.connectivity; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -32,6 +33,7 @@ import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import org.thingsboard.common.util.AbstractListeningExecutor; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.mqtt.MqttClient; @@ -76,8 +78,18 @@ public class MqttGatewayClientTest extends AbstractContainerTest { private MqttMessageListener listener; private JsonParser jsonParser = new JsonParser(); + AbstractListeningExecutor handlerExecutor; + @BeforeMethod public void createGateway() throws Exception { + this.handlerExecutor = new AbstractListeningExecutor() { + @Override + protected int getThreadPollSize() { + return 4; + } + }; + handlerExecutor.init(); + testRestClient.login("tenant@thingsboard.org", "tenant"); gatewayDevice = testRestClient.postDevice("", defaultGatewayPrototype()); DeviceCredentials gatewayDeviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(gatewayDevice.getId()); @@ -94,6 +106,9 @@ public class MqttGatewayClientTest extends AbstractContainerTest { this.listener = null; this.mqttClient = null; this.createdDevice = null; + if (handlerExecutor != null) { + handlerExecutor.destroy(); + } } @Test @@ -403,11 +418,16 @@ public class MqttGatewayClientTest extends AbstractContainerTest { return testRestClient.getDeviceById(createdDeviceId); } + private String getOwnerId() { + return "Tenant[" + gatewayDevice.getTenantId().getId() + "]MqttGatewayClientTestDevice[" + gatewayDevice.getId().getId() + "]"; + } + private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException, ExecutionException { MqttClientConfig clientConfig = new MqttClientConfig(); + clientConfig.setOwnerId(getOwnerId()); clientConfig.setClientId("MQTT client from test"); clientConfig.setUsername(deviceCredentials.getCredentialsId()); - MqttClient mqttClient = MqttClient.create(clientConfig, listener); + MqttClient mqttClient = MqttClient.create(clientConfig, listener, handlerExecutor); mqttClient.connect("localhost", 1883).get(); return mqttClient; } @@ -421,9 +441,10 @@ public class MqttGatewayClientTest extends AbstractContainerTest { } @Override - public void onMessage(String topic, ByteBuf message) { + public ListenableFuture onMessage(String topic, ByteBuf message) { log.info("MQTT message [{}], topic [{}]", message.toString(StandardCharsets.UTF_8), topic); events.add(new MqttEvent(topic, message.toString(StandardCharsets.UTF_8))); + return Futures.immediateVoidFuture(); } } diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index 9aa149280a..61ce004c04 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -41,6 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}" queue: type: "${TB_QUEUE_TYPE:kafka}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) diff --git a/netty-mqtt/pom.xml b/netty-mqtt/pom.xml index 60883f4b34..400b486e18 100644 --- a/netty-mqtt/pom.xml +++ b/netty-mqtt/pom.xml @@ -35,6 +35,10 @@ + + org.thingsboard.common + util + io.netty netty-codec-mqtt diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java index e243f66633..a40c30ef96 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java @@ -16,6 +16,10 @@ package org.thingsboard.mqtt; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; @@ -34,8 +38,15 @@ import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttSubAckMessage; import io.netty.handler.codec.mqtt.MqttUnsubAckMessage; import io.netty.util.CharsetUtil; +import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Promise; +import lombok.extern.slf4j.Slf4j; +import org.checkerframework.checker.nullness.qual.Nullable; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +@Slf4j final class MqttChannelHandler extends SimpleChannelInboundHandler { private final MqttClientImpl client; @@ -110,27 +121,48 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler super.channelInactive(ctx); } - private void invokeHandlersForIncomingPublish(MqttPublishMessage message) { - boolean handlerInvoked = false; - for (MqttSubscription subscription : ImmutableSet.copyOf(this.client.getSubscriptions().values())) { - if (subscription.matches(message.variableHeader().topicName())) { - if (subscription.isOnce() && subscription.isCalled()) { - continue; + ListenableFuture invokeHandlersForIncomingPublish(MqttPublishMessage message) { + var future = Futures.immediateVoidFuture(); + var handlerInvoked = new AtomicBoolean(); + try { + for (MqttSubscription subscription : ImmutableSet.copyOf(this.client.getSubscriptions().values())) { + if (subscription.matches(message.variableHeader().topicName())) { + future = Futures.transform(future, x -> { + if (subscription.isOnce() && subscription.isCalled()) { + return null; + } + message.payload().markReaderIndex(); + subscription.setCalled(true); + subscription.getHandler().onMessage(message.variableHeader().topicName(), message.payload()); + if (subscription.isOnce()) { + this.client.off(subscription.getTopic(), subscription.getHandler()); + } + message.payload().resetReaderIndex(); + handlerInvoked.set(true); + return null; + }, client.getHandlerExecutor()); } - message.payload().markReaderIndex(); - subscription.setCalled(true); - subscription.getHandler().onMessage(message.variableHeader().topicName(), message.payload()); - if (subscription.isOnce()) { - this.client.off(subscription.getTopic(), subscription.getHandler()); - } - message.payload().resetReaderIndex(); - handlerInvoked = true; } + future = Futures.transform(future, x -> { + if (!handlerInvoked.get() && client.getDefaultHandler() != null) { + client.getDefaultHandler().onMessage(message.variableHeader().topicName(), message.payload()); + } + return null; + }, client.getHandlerExecutor()); + } finally { + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(@Nullable Void result) { + message.payload().release(); + } + + @Override + public void onFailure(Throwable t) { + message.payload().release(); + } + }, MoreExecutors.directExecutor()); } - if (!handlerInvoked && client.getDefaultHandler() != null) { - client.getDefaultHandler().onMessage(message.variableHeader().topicName(), message.payload()); - } - message.payload().release(); + return future; } private void handleConack(Channel channel, MqttConnAckMessage message) { @@ -197,11 +229,13 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler break; case AT_LEAST_ONCE: - invokeHandlersForIncomingPublish(message); + var future = invokeHandlersForIncomingPublish(message); if (message.variableHeader().packetId() != -1) { - MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().packetId()); - channel.writeAndFlush(new MqttPubAckMessage(fixedHeader, variableHeader)); + future.addListener(() -> { + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().packetId()); + channel.writeAndFlush(new MqttPubAckMessage(fixedHeader, variableHeader)); + }, MoreExecutors.directExecutor()); } break; @@ -256,14 +290,20 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler } private void handlePubrel(Channel channel, MqttMessage message) { + var future = Futures.immediateVoidFuture(); if (this.client.getQos2PendingIncomingPublishes().containsKey(((MqttMessageIdVariableHeader) message.variableHeader()).messageId())) { MqttIncomingQos2Publish incomingQos2Publish = this.client.getQos2PendingIncomingPublishes().get(((MqttMessageIdVariableHeader) message.variableHeader()).messageId()); - this.invokeHandlersForIncomingPublish(incomingQos2Publish.getIncomingPublish()); - this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().packetId()); + future = invokeHandlersForIncomingPublish(incomingQos2Publish.getIncomingPublish()); + future = Futures.transform(future, x -> { + this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().packetId()); + return null; + }, MoreExecutors.directExecutor()); } - MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0); - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(((MqttMessageIdVariableHeader) message.variableHeader()).messageId()); - channel.writeAndFlush(new MqttMessage(fixedHeader, variableHeader)); + future.addListener(() -> { + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(((MqttMessageIdVariableHeader) message.variableHeader()).messageId()); + channel.writeAndFlush(new MqttMessage(fixedHeader, variableHeader)); + }, MoreExecutors.directExecutor()); } private void handlePubcomp(MqttMessage message) { @@ -274,4 +314,21 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler pendingPublish.getPayload().release(); pendingPublish.onPubcompReceived(); } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + try { + if (cause instanceof IOException) { + if (log.isDebugEnabled()) { + log.debug("[{}] IOException: ", client.getClientConfig().getOwnerId(), cause); + } else { + log.info("[{}] IOException: {}", client.getClientConfig().getOwnerId(), cause.getMessage()); + } + } else { + log.warn("[{}] exceptionCaught", client.getClientConfig().getOwnerId(), cause); + } + } finally { + ReferenceCountUtil.release(cause); + } + } } diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java index 2fe179de31..536a76119f 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java @@ -21,6 +21,7 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.util.concurrent.Future; +import org.thingsboard.common.util.ListeningExecutor; public interface MqttClient { @@ -71,6 +72,8 @@ public interface MqttClient { */ void setEventLoop(EventLoopGroup eventLoop); + ListeningExecutor getHandlerExecutor(); + /** * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler * @@ -180,8 +183,8 @@ public interface MqttClient { * @param config The config object to use while looking for settings * @param defaultHandler The handler for incoming messages that do not match any topic subscriptions */ - static MqttClient create(MqttClientConfig config, MqttHandler defaultHandler){ - return new MqttClientImpl(config, defaultHandler); + static MqttClient create(MqttClientConfig config, MqttHandler defaultHandler, ListeningExecutor handlerExecutor){ + return new MqttClientImpl(config, defaultHandler, handlerExecutor); } /** diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java index 1e20a842c3..10cee5d5fc 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java @@ -19,6 +19,8 @@ import io.netty.channel.Channel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.mqtt.MqttVersion; import io.netty.handler.ssl.SslContext; +import lombok.Getter; +import lombok.Setter; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -26,10 +28,12 @@ import java.util.Random; @SuppressWarnings({"WeakerAccess", "unused"}) public final class MqttClientConfig { - private final SslContext sslContext; private final String randomClientId; + @Getter + @Setter + private String ownerId; // [TenantId][IntegrationId] or [TenantId][RuleNodeId] for exceptions logging purposes private String clientId; private int timeoutSeconds = 60; private MqttVersion protocolVersion = MqttVersion.MQTT_3_1; diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java index f38a790be7..63d65a1cc2 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java @@ -46,6 +46,7 @@ import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.ListeningExecutor; import java.util.Collections; import java.util.HashSet; @@ -88,13 +89,13 @@ final class MqttClientImpl implements MqttClient { private int port; private MqttClientCallback callback; + private final ListeningExecutor handlerExecutor; /** * Construct the MqttClientImpl with default config */ - public MqttClientImpl(MqttHandler defaultHandler) { - this.clientConfig = new MqttClientConfig(); - this.defaultHandler = defaultHandler; + public MqttClientImpl(MqttHandler defaultHandler, ListeningExecutor handlerExecutor) { + this(new MqttClientConfig(), defaultHandler, handlerExecutor); } /** @@ -103,9 +104,10 @@ final class MqttClientImpl implements MqttClient { * * @param clientConfig The config object to use while looking for settings */ - public MqttClientImpl(MqttClientConfig clientConfig, MqttHandler defaultHandler) { + public MqttClientImpl(MqttClientConfig clientConfig, MqttHandler defaultHandler, ListeningExecutor handlerExecutor) { this.clientConfig = clientConfig; this.defaultHandler = defaultHandler; + this.handlerExecutor = handlerExecutor; } /** @@ -227,6 +229,11 @@ final class MqttClientImpl implements MqttClient { this.eventLoop = eventLoop; } + @Override + public ListeningExecutor getHandlerExecutor() { + return this.handlerExecutor; + } + /** * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler * diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttHandler.java index 0ec03ff04b..21c07a17cd 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttHandler.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttHandler.java @@ -15,9 +15,10 @@ */ package org.thingsboard.mqtt; +import com.google.common.util.concurrent.ListenableFuture; import io.netty.buffer.ByteBuf; public interface MqttHandler { - void onMessage(String topic, ByteBuf payload); + ListenableFuture onMessage(String topic, ByteBuf payload); } diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscription.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscription.java index 6c4abb4c5c..c4bc9e38c1 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscription.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscription.java @@ -25,7 +25,7 @@ final class MqttSubscription { private final boolean once; - private boolean called; + private volatile boolean called; MqttSubscription(String topic, MqttHandler handler, boolean once) { if (topic == null) { diff --git a/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java b/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java index cb1b6b81fe..04c2be1740 100644 --- a/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java +++ b/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java @@ -26,6 +26,7 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.thingsboard.common.util.AbstractListeningExecutor; import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClientConfig; import org.thingsboard.mqtt.MqttConnectResult; @@ -49,8 +50,18 @@ public class MqttIntegrationTest { MqttClient mqttClient; + AbstractListeningExecutor handlerExecutor; + @Before public void init() throws Exception { + this.handlerExecutor = new AbstractListeningExecutor() { + @Override + protected int getThreadPollSize() { + return 4; + } + }; + handlerExecutor.init(); + this.eventLoopGroup = new NioEventLoopGroup(); this.mqttServer = new MqttServer(); @@ -68,6 +79,9 @@ public class MqttIntegrationTest { if (this.eventLoopGroup != null) { this.eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); } + if (this.handlerExecutor != null) { + this.handlerExecutor.destroy(); + } } @Test @@ -108,9 +122,10 @@ public class MqttIntegrationTest { private MqttClient initClient() throws Exception { MqttClientConfig config = new MqttClientConfig(); + config.setOwnerId("MqttIntegrationTest"); config.setTimeoutSeconds(KEEPALIVE_TIMEOUT_SECONDS); config.setReconnectDelay(RECONNECT_DELAY_SECONDS); - MqttClient client = MqttClient.create(config, null); + MqttClient client = MqttClient.create(config, null, handlerExecutor); client.setEventLoop(this.eventLoopGroup); Future connectFuture = client.connect(MQTT_HOST, this.mqttServer.getMqttPort()); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index 121b9fb756..1376f29710 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -25,7 +25,6 @@ import org.thingsboard.mqtt.MqttClientConfig; import org.thingsboard.mqtt.MqttConnectResult; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; -import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; @@ -105,8 +104,13 @@ public class TbMqttNode extends TbAbstractExternalNode { } } + String getOwnerId(TbContext ctx) { + return "Tenant[" + ctx.getTenantId().getId() + "]RuleNode[" + ctx.getSelf().getId().getId() + "]"; + } + protected MqttClient initClient(TbContext ctx) throws Exception { MqttClientConfig config = new MqttClientConfig(getSslContext()); + config.setOwnerId(getOwnerId(ctx)); if (!StringUtils.isEmpty(this.mqttNodeConfiguration.getClientId())) { config.setClientId(this.mqttNodeConfiguration.isAppendClientIdSuffix() ? this.mqttNodeConfiguration.getClientId() + "_" + ctx.getServiceId() : this.mqttNodeConfiguration.getClientId()); @@ -114,7 +118,7 @@ public class TbMqttNode extends TbAbstractExternalNode { config.setCleanSession(this.mqttNodeConfiguration.isCleanSession()); prepareMqttClientConfig(config); - MqttClient client = MqttClient.create(config, null); + MqttClient client = MqttClient.create(config, null, ctx.getExternalCallExecutor()); client.setEventLoop(ctx.getSharedEventLoop()); Future connectFuture = client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort()); MqttConnectResult result; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/MultipleTbMsgsCallbackWrapper.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/MultipleTbMsgsCallbackWrapper.java index ee63201fde..142aa8c085 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/MultipleTbMsgsCallbackWrapper.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/MultipleTbMsgsCallbackWrapper.java @@ -39,7 +39,7 @@ public class MultipleTbMsgsCallbackWrapper implements TbMsgCallbackWrapper { @Override public void onFailure(Throwable t) { - callback.onFailure(new RuleEngineException(t.getMessage())); + callback.onFailure(new RuleEngineException(t.getMessage(), t)); } } diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index 5e0a7f2a9d..47243ec199 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -41,6 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index f7af5c979a..c820b2a2a2 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -68,6 +68,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index 3d649b3c2b..b725fd7ee4 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -41,6 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index 38f569cfd7..4c4149fa4f 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -41,6 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index c4d76dbf30..137e2e4439 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -41,6 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}" cache: type: "${CACHE_TYPE:redis}" diff --git a/ui-ngx/src/app/modules/home/components/event/event-table-config.ts b/ui-ngx/src/app/modules/home/components/event/event-table-config.ts index a9816a130c..eb3edb0baf 100644 --- a/ui-ngx/src/app/modules/home/components/event/event-table-config.ts +++ b/ui-ngx/src/app/modules/home/components/event/event-table-config.ts @@ -40,6 +40,7 @@ import { EventContentDialogData } from '@home/components/event/event-content-dialog.component'; import { isEqual, sortObjectKeys } from '@core/utils'; +import { historyInterval, MINUTE } from '@shared/models/time/time.models'; import { ConnectedPosition, Overlay, OverlayConfig, OverlayRef } from '@angular/cdk/overlay'; import { ChangeDetectorRef, Injector, StaticProvider, ViewContainerRef } from '@angular/core'; import { ComponentPortal } from '@angular/cdk/portal'; @@ -89,6 +90,7 @@ export class EventTableConfig extends EntityTableConfig { this.loadDataOnInit = false; this.tableTitle = ''; this.useTimePageLink = true; + this.defaultTimewindowInterval = historyInterval(MINUTE * 15); this.detailsPanelEnabled = false; this.selectionEnabled = false; this.searchEnabled = false; @@ -176,7 +178,7 @@ export class EventTableConfig extends EntityTableConfig { updateColumns(updateTableColumns: boolean = false): void { this.columns = []; this.columns.push( - new DateEntityTableColumn('createdTime', 'event.event-time', this.datePipe, '120px'), + new DateEntityTableColumn('createdTime', 'event.event-time', this.datePipe, '120px', 'yyyy-MM-dd HH:mm:ss.SSS'), new EntityTableColumn('server', 'event.server', '100px', (entity) => entity.body.server, entity => ({}), false)); switch (this.eventType) {