From 3b8a9d94ecfffeb3813bcc75d203c568be4ab567 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Mon, 24 Jul 2023 22:57:52 +0200 Subject: [PATCH 01/23] Lwm2m transport - merge non-unique endpoints for models fetched from cache --- .../model/LwM2MModelConfigServiceImpl.java | 8 +- .../LwM2MModelConfigServiceImplTest.java | 73 +++++++++++++++++++ 2 files changed, 77 insertions(+), 4 deletions(-) create mode 100644 common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/model/LwM2MModelConfigServiceImplTest.java diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/model/LwM2MModelConfigServiceImpl.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/model/LwM2MModelConfigServiceImpl.java index 302bd20c8b..eef9a53024 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/model/LwM2MModelConfigServiceImpl.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/model/LwM2MModelConfigServiceImpl.java @@ -52,7 +52,7 @@ import java.util.stream.Collectors; public class LwM2MModelConfigServiceImpl implements LwM2MModelConfigService { @Autowired - private TbLwM2MModelConfigStore modelStore; + TbLwM2MModelConfigStore modelStore; @Autowired @Lazy @@ -67,14 +67,14 @@ public class LwM2MModelConfigServiceImpl implements LwM2MModelConfigService { @Autowired private LwM2MTelemetryLogService logService; - private ConcurrentMap currentModelConfigs; + ConcurrentMap currentModelConfigs; @AfterStartUp(order = AfterStartUp.BEFORE_TRANSPORT_SERVICE) - private void init() { + public void init() { List models = modelStore.getAll(); log.debug("Fetched model configs: {}", models); currentModelConfigs = models.stream() - .collect(Collectors.toConcurrentMap(LwM2MModelConfig::getEndpoint, m -> m)); + .collect(Collectors.toConcurrentMap(LwM2MModelConfig::getEndpoint, m -> m, (existing, replacement) -> existing)); } @Override diff --git a/common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/model/LwM2MModelConfigServiceImplTest.java b/common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/model/LwM2MModelConfigServiceImplTest.java new file mode 100644 index 0000000000..fc54ca9e0b --- /dev/null +++ b/common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/model/LwM2MModelConfigServiceImplTest.java @@ -0,0 +1,73 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.lwm2m.server.model; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MModelConfigStore; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.willReturn; +import static org.mockito.Mockito.mock; + +class LwM2MModelConfigServiceImplTest { + + LwM2MModelConfigServiceImpl service; + TbLwM2MModelConfigStore modelStore; + + @BeforeEach + void setUp() { + service = new LwM2MModelConfigServiceImpl(); + modelStore = mock(TbLwM2MModelConfigStore.class); + service.modelStore = modelStore; + } + + @Test + void testInitWithDuplicatedModels() { + LwM2MModelConfig config = new LwM2MModelConfig("urn:imei:951358811362976"); + List models = List.of(config, config); + willReturn(models).given(modelStore).getAll(); + service.init(); + assertThat(service.currentModelConfigs).containsExactlyEntriesOf(Map.of(config.getEndpoint(), config)); + } + + @Test + void testInitWithNonUniqueEndpoints() { + LwM2MModelConfig configAlfa = new LwM2MModelConfig("urn:imei:951358811362976"); + LwM2MModelConfig configBravo = new LwM2MModelConfig("urn:imei:151358811362976"); + LwM2MModelConfig configDelta = new LwM2MModelConfig("urn:imei:151358811362976"); + assertThat(configBravo.getEndpoint()).as("non-unique endpoints provided").isEqualTo(configDelta.getEndpoint()); + List models = List.of(configAlfa, configBravo, configDelta); + willReturn(models).given(modelStore).getAll(); + service.init(); + assertThat(service.currentModelConfigs).containsExactlyInAnyOrderEntriesOf(Map.of( + configAlfa.getEndpoint(), configAlfa, + configBravo.getEndpoint(), configBravo + )); + } + + @Test + void testInitWithEmptyModels() { + willReturn(Collections.emptyList()).given(modelStore).getAll(); + service.init(); + assertThat(service.currentModelConfigs).isEmpty(); + } + +} From 7e27c5b6833a725c4d0c4e524b3c483d95001451 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 13 Jun 2023 18:17:25 +0200 Subject: [PATCH 02/23] mqtt-client: messages processing moved from netty event loop pool and to the handlerExecutor to make netty handlers non-blocking --- .../msa/connectivity/MqttClientTest.java | 16 ++- .../connectivity/MqttGatewayClientTest.java | 16 ++- netty-mqtt/pom.xml | 4 + .../thingsboard/mqtt/MqttChannelHandler.java | 113 +++++++++++++----- .../java/org/thingsboard/mqtt/MqttClient.java | 7 +- .../org/thingsboard/mqtt/MqttClientImpl.java | 15 ++- .../thingsboard/mqtt/MqttSubscription.java | 2 +- .../mqtt/integration/MqttIntegrationTest.java | 16 ++- .../rule/engine/mqtt/TbMqttNode.java | 2 +- 9 files changed, 153 insertions(+), 38 deletions(-) diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index 893c8b565c..96e57549a8 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -28,6 +28,7 @@ import lombok.extern.slf4j.Slf4j; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import org.thingsboard.common.util.AbstractListeningExecutor; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClientConfig; @@ -74,8 +75,18 @@ import static org.thingsboard.server.msa.prototypes.DevicePrototypes.defaultDevi public class MqttClientTest extends AbstractContainerTest { private Device device; + AbstractListeningExecutor handlerExecutor; + @BeforeMethod public void setUp() throws Exception { + this.handlerExecutor = new AbstractListeningExecutor() { + @Override + protected int getThreadPollSize() { + return 4; + } + }; + handlerExecutor.init(); + testRestClient.login("tenant@thingsboard.org", "tenant"); device = testRestClient.postDevice("", defaultDevicePrototype("http_")); } @@ -83,6 +94,9 @@ public class MqttClientTest extends AbstractContainerTest { @AfterMethod public void tearDown() { testRestClient.deleteDeviceIfExists(device.getId()); + if (handlerExecutor != null) { + handlerExecutor.destroy(); + } } @Test public void telemetryUpload() throws Exception { @@ -465,7 +479,7 @@ public class MqttClientTest extends AbstractContainerTest { MqttClientConfig clientConfig = new MqttClientConfig(); clientConfig.setClientId("MQTT client from test"); clientConfig.setUsername(username); - MqttClient mqttClient = MqttClient.create(clientConfig, listener); + MqttClient mqttClient = MqttClient.create(clientConfig, listener, handlerExecutor); mqttClient.connect("localhost", 1883).get(); return mqttClient; } diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java index a038d4cf50..8cddb69fa3 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java @@ -32,6 +32,7 @@ import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import org.thingsboard.common.util.AbstractListeningExecutor; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.mqtt.MqttClient; @@ -76,8 +77,18 @@ public class MqttGatewayClientTest extends AbstractContainerTest { private MqttMessageListener listener; private JsonParser jsonParser = new JsonParser(); + AbstractListeningExecutor handlerExecutor; + @BeforeMethod public void createGateway() throws Exception { + this.handlerExecutor = new AbstractListeningExecutor() { + @Override + protected int getThreadPollSize() { + return 4; + } + }; + handlerExecutor.init(); + testRestClient.login("tenant@thingsboard.org", "tenant"); gatewayDevice = testRestClient.postDevice("", defaultGatewayPrototype()); DeviceCredentials gatewayDeviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(gatewayDevice.getId()); @@ -94,6 +105,9 @@ public class MqttGatewayClientTest extends AbstractContainerTest { this.listener = null; this.mqttClient = null; this.createdDevice = null; + if (handlerExecutor != null) { + handlerExecutor.destroy(); + } } @Test @@ -407,7 +421,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { MqttClientConfig clientConfig = new MqttClientConfig(); clientConfig.setClientId("MQTT client from test"); clientConfig.setUsername(deviceCredentials.getCredentialsId()); - MqttClient mqttClient = MqttClient.create(clientConfig, listener); + MqttClient mqttClient = MqttClient.create(clientConfig, listener, handlerExecutor); mqttClient.connect("localhost", 1883).get(); return mqttClient; } diff --git a/netty-mqtt/pom.xml b/netty-mqtt/pom.xml index 60883f4b34..400b486e18 100644 --- a/netty-mqtt/pom.xml +++ b/netty-mqtt/pom.xml @@ -35,6 +35,10 @@ + + org.thingsboard.common + util + io.netty netty-codec-mqtt diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java index e243f66633..6b3a4e009e 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java @@ -16,6 +16,10 @@ package org.thingsboard.mqtt; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; @@ -34,8 +38,15 @@ import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttSubAckMessage; import io.netty.handler.codec.mqtt.MqttUnsubAckMessage; import io.netty.util.CharsetUtil; +import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Promise; +import lombok.extern.slf4j.Slf4j; +import org.checkerframework.checker.nullness.qual.Nullable; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +@Slf4j final class MqttChannelHandler extends SimpleChannelInboundHandler { private final MqttClientImpl client; @@ -110,27 +121,48 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler super.channelInactive(ctx); } - private void invokeHandlersForIncomingPublish(MqttPublishMessage message) { - boolean handlerInvoked = false; - for (MqttSubscription subscription : ImmutableSet.copyOf(this.client.getSubscriptions().values())) { - if (subscription.matches(message.variableHeader().topicName())) { - if (subscription.isOnce() && subscription.isCalled()) { - continue; + ListenableFuture invokeHandlersForIncomingPublish(MqttPublishMessage message) { + var future = Futures.immediateVoidFuture(); + var handlerInvoked = new AtomicBoolean(); + try { + for (MqttSubscription subscription : ImmutableSet.copyOf(this.client.getSubscriptions().values())) { + if (subscription.matches(message.variableHeader().topicName())) { + future = Futures.transform(future, x -> { + if (subscription.isOnce() && subscription.isCalled()) { + return null; + } + message.payload().markReaderIndex(); + subscription.setCalled(true); + subscription.getHandler().onMessage(message.variableHeader().topicName(), message.payload()); + if (subscription.isOnce()) { + this.client.off(subscription.getTopic(), subscription.getHandler()); + } + message.payload().resetReaderIndex(); + handlerInvoked.set(true); + return null; + }, client.getHandlerExecutor()); } - message.payload().markReaderIndex(); - subscription.setCalled(true); - subscription.getHandler().onMessage(message.variableHeader().topicName(), message.payload()); - if (subscription.isOnce()) { - this.client.off(subscription.getTopic(), subscription.getHandler()); - } - message.payload().resetReaderIndex(); - handlerInvoked = true; } + future = Futures.transform(future, x -> { + if (!handlerInvoked.get() && client.getDefaultHandler() != null) { + client.getDefaultHandler().onMessage(message.variableHeader().topicName(), message.payload()); + } + return null; + }, client.getHandlerExecutor()); + } finally { + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(@Nullable Void result) { + message.payload().release(); + } + + @Override + public void onFailure(Throwable t) { + message.payload().release(); + } + }, MoreExecutors.directExecutor()); } - if (!handlerInvoked && client.getDefaultHandler() != null) { - client.getDefaultHandler().onMessage(message.variableHeader().topicName(), message.payload()); - } - message.payload().release(); + return future; } private void handleConack(Channel channel, MqttConnAckMessage message) { @@ -197,11 +229,13 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler break; case AT_LEAST_ONCE: - invokeHandlersForIncomingPublish(message); + var future = invokeHandlersForIncomingPublish(message); if (message.variableHeader().packetId() != -1) { - MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().packetId()); - channel.writeAndFlush(new MqttPubAckMessage(fixedHeader, variableHeader)); + future.addListener(() -> { + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().packetId()); + channel.writeAndFlush(new MqttPubAckMessage(fixedHeader, variableHeader)); + }, MoreExecutors.directExecutor()); } break; @@ -256,14 +290,20 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler } private void handlePubrel(Channel channel, MqttMessage message) { + var future = Futures.immediateVoidFuture(); if (this.client.getQos2PendingIncomingPublishes().containsKey(((MqttMessageIdVariableHeader) message.variableHeader()).messageId())) { MqttIncomingQos2Publish incomingQos2Publish = this.client.getQos2PendingIncomingPublishes().get(((MqttMessageIdVariableHeader) message.variableHeader()).messageId()); - this.invokeHandlersForIncomingPublish(incomingQos2Publish.getIncomingPublish()); - this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().packetId()); + future = invokeHandlersForIncomingPublish(incomingQos2Publish.getIncomingPublish()); + future = Futures.transform(future, x -> { + this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().packetId()); + return null; + }, MoreExecutors.directExecutor()); } - MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0); - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(((MqttMessageIdVariableHeader) message.variableHeader()).messageId()); - channel.writeAndFlush(new MqttMessage(fixedHeader, variableHeader)); + future.addListener(() -> { + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(((MqttMessageIdVariableHeader) message.variableHeader()).messageId()); + channel.writeAndFlush(new MqttMessage(fixedHeader, variableHeader)); + }, MoreExecutors.directExecutor()); } private void handlePubcomp(MqttMessage message) { @@ -274,4 +314,23 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler pendingPublish.getPayload().release(); pendingPublish.onPubcompReceived(); } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + try { + if (cause instanceof IOException) { + if (log.isDebugEnabled()) { + log.debug("[{}][{}][{}] IOException: ", client.getClientConfig().getClientId(), client.getClientConfig().getUsername() , ctx.channel().remoteAddress(), + cause); + } else if (log.isInfoEnabled()) { + log.info("[{}][{}][{}] IOException: {}", client.getClientConfig().getClientId(), client.getClientConfig().getUsername() , ctx.channel().remoteAddress(), + cause.getMessage()); + } + } else { + log.warn("exceptionCaught", cause); + } + } finally { + ReferenceCountUtil.release(cause); + } + } } diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java index 2fe179de31..536a76119f 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java @@ -21,6 +21,7 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.util.concurrent.Future; +import org.thingsboard.common.util.ListeningExecutor; public interface MqttClient { @@ -71,6 +72,8 @@ public interface MqttClient { */ void setEventLoop(EventLoopGroup eventLoop); + ListeningExecutor getHandlerExecutor(); + /** * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler * @@ -180,8 +183,8 @@ public interface MqttClient { * @param config The config object to use while looking for settings * @param defaultHandler The handler for incoming messages that do not match any topic subscriptions */ - static MqttClient create(MqttClientConfig config, MqttHandler defaultHandler){ - return new MqttClientImpl(config, defaultHandler); + static MqttClient create(MqttClientConfig config, MqttHandler defaultHandler, ListeningExecutor handlerExecutor){ + return new MqttClientImpl(config, defaultHandler, handlerExecutor); } /** diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java index f38a790be7..63d65a1cc2 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java @@ -46,6 +46,7 @@ import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.ListeningExecutor; import java.util.Collections; import java.util.HashSet; @@ -88,13 +89,13 @@ final class MqttClientImpl implements MqttClient { private int port; private MqttClientCallback callback; + private final ListeningExecutor handlerExecutor; /** * Construct the MqttClientImpl with default config */ - public MqttClientImpl(MqttHandler defaultHandler) { - this.clientConfig = new MqttClientConfig(); - this.defaultHandler = defaultHandler; + public MqttClientImpl(MqttHandler defaultHandler, ListeningExecutor handlerExecutor) { + this(new MqttClientConfig(), defaultHandler, handlerExecutor); } /** @@ -103,9 +104,10 @@ final class MqttClientImpl implements MqttClient { * * @param clientConfig The config object to use while looking for settings */ - public MqttClientImpl(MqttClientConfig clientConfig, MqttHandler defaultHandler) { + public MqttClientImpl(MqttClientConfig clientConfig, MqttHandler defaultHandler, ListeningExecutor handlerExecutor) { this.clientConfig = clientConfig; this.defaultHandler = defaultHandler; + this.handlerExecutor = handlerExecutor; } /** @@ -227,6 +229,11 @@ final class MqttClientImpl implements MqttClient { this.eventLoop = eventLoop; } + @Override + public ListeningExecutor getHandlerExecutor() { + return this.handlerExecutor; + } + /** * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler * diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscription.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscription.java index 6c4abb4c5c..c4bc9e38c1 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscription.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscription.java @@ -25,7 +25,7 @@ final class MqttSubscription { private final boolean once; - private boolean called; + private volatile boolean called; MqttSubscription(String topic, MqttHandler handler, boolean once) { if (topic == null) { diff --git a/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java b/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java index cb1b6b81fe..f39ca01110 100644 --- a/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java +++ b/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java @@ -26,6 +26,7 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.thingsboard.common.util.AbstractListeningExecutor; import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClientConfig; import org.thingsboard.mqtt.MqttConnectResult; @@ -49,8 +50,18 @@ public class MqttIntegrationTest { MqttClient mqttClient; + AbstractListeningExecutor handlerExecutor; + @Before public void init() throws Exception { + this.handlerExecutor = new AbstractListeningExecutor() { + @Override + protected int getThreadPollSize() { + return 4; + } + }; + handlerExecutor.init(); + this.eventLoopGroup = new NioEventLoopGroup(); this.mqttServer = new MqttServer(); @@ -68,6 +79,9 @@ public class MqttIntegrationTest { if (this.eventLoopGroup != null) { this.eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); } + if (this.handlerExecutor != null) { + this.handlerExecutor.destroy(); + } } @Test @@ -110,7 +124,7 @@ public class MqttIntegrationTest { MqttClientConfig config = new MqttClientConfig(); config.setTimeoutSeconds(KEEPALIVE_TIMEOUT_SECONDS); config.setReconnectDelay(RECONNECT_DELAY_SECONDS); - MqttClient client = MqttClient.create(config, null); + MqttClient client = MqttClient.create(config, null, handlerExecutor); client.setEventLoop(this.eventLoopGroup); Future connectFuture = client.connect(MQTT_HOST, this.mqttServer.getMqttPort()); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index 121b9fb756..49b31cb33c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -114,7 +114,7 @@ public class TbMqttNode extends TbAbstractExternalNode { config.setCleanSession(this.mqttNodeConfiguration.isCleanSession()); prepareMqttClientConfig(config); - MqttClient client = MqttClient.create(config, null); + MqttClient client = MqttClient.create(config, null, ctx.getExternalCallExecutor()); client.setEventLoop(ctx.getSharedEventLoop()); Future connectFuture = client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort()); MqttConnectResult result; From d74e0c45df8442e709928c3c04efe36442782a07 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 20 Jun 2023 14:26:30 +0200 Subject: [PATCH 03/23] MqttHandler - processAsync (required for AbstractMqttIntegration) --- .../thingsboard/server/msa/connectivity/MqttClientTest.java | 4 +++- .../server/msa/connectivity/MqttGatewayClientTest.java | 4 +++- .../src/main/java/org/thingsboard/mqtt/MqttHandler.java | 3 ++- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index 96e57549a8..940bd6777e 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -16,6 +16,7 @@ package org.thingsboard.server.msa.connectivity; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -493,9 +494,10 @@ public class MqttClientTest extends AbstractContainerTest { } @Override - public void onMessage(String topic, ByteBuf message) { + public ListenableFuture onMessage(String topic, ByteBuf message) { log.info("MQTT message [{}], topic [{}]", message.toString(StandardCharsets.UTF_8), topic); events.add(new MqttEvent(topic, message.toString(StandardCharsets.UTF_8))); + return Futures.immediateVoidFuture(); } } diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java index 8cddb69fa3..de11df2623 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java @@ -16,6 +16,7 @@ package org.thingsboard.server.msa.connectivity; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -435,9 +436,10 @@ public class MqttGatewayClientTest extends AbstractContainerTest { } @Override - public void onMessage(String topic, ByteBuf message) { + public ListenableFuture onMessage(String topic, ByteBuf message) { log.info("MQTT message [{}], topic [{}]", message.toString(StandardCharsets.UTF_8), topic); events.add(new MqttEvent(topic, message.toString(StandardCharsets.UTF_8))); + return Futures.immediateVoidFuture(); } } diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttHandler.java index 0ec03ff04b..21c07a17cd 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttHandler.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttHandler.java @@ -15,9 +15,10 @@ */ package org.thingsboard.mqtt; +import com.google.common.util.concurrent.ListenableFuture; import io.netty.buffer.ByteBuf; public interface MqttHandler { - void onMessage(String topic, ByteBuf payload); + ListenableFuture onMessage(String topic, ByteBuf payload); } From c3e9ab59918f04c5eb9d79df47948c187d09e7cf Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 26 Jul 2023 20:38:22 +0200 Subject: [PATCH 04/23] TbKafkaProducerTemplate will add headers for each message when log level: DEBUG - producerId and thread name; TRACE - stacktrace first 10-2=8 lines --- .../queue/kafka/TbKafkaProducerTemplate.java | 28 +++++++++- .../kafka/TbKafkaProducerTemplateTest.java | 54 +++++++++++++++++++ .../queue/src/test/resources/logback-test.xml | 20 +++++++ 3 files changed, 101 insertions(+), 1 deletion(-) create mode 100644 common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplateTest.java create mode 100644 common/queue/src/test/resources/logback-test.xml diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java index 15c2f04d17..7c1c28b9f5 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java @@ -30,6 +30,9 @@ import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.TbQueueProducer; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -53,10 +56,14 @@ public class TbKafkaProducerTemplate implements TbQueuePro private final Set topics; + @Getter + private final String clientId; + @Builder private TbKafkaProducerTemplate(TbKafkaSettings settings, String defaultTopic, String clientId, TbQueueAdmin admin) { Properties props = settings.toProducerProps(); + this.clientId = Objects.requireNonNull(clientId, "Kafka producer client.id is null"); if (!StringUtils.isEmpty(clientId)) { props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); } @@ -72,6 +79,24 @@ public class TbKafkaProducerTemplate implements TbQueuePro public void init() { } + void addAnalyticHeaders(List
headers) { + try { + if (log.isDebugEnabled()) { + headers.add(new RecordHeader("_producerId", getClientId().getBytes(StandardCharsets.UTF_8))); + headers.add(new RecordHeader("_threadName", Thread.currentThread().getName().getBytes(StandardCharsets.UTF_8))); + } + if (log.isTraceEnabled()) { + StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); + int maxlevel = Math.min(stackTrace.length, 10); + for (int i = 2; i < maxlevel; i++) { // ignore two levels: getStackTrace and addAnalyticHeaders + headers.add(new RecordHeader("_stackTrace" + i, stackTrace[i].toString().getBytes(StandardCharsets.UTF_8))); + } + } + } catch (Throwable t) { + log.debug("Failed to add analytic header in Kafka producer {}", getClientId(), t); + } + } + @Override public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) { try { @@ -79,7 +104,8 @@ public class TbKafkaProducerTemplate implements TbQueuePro String key = msg.getKey().toString(); byte[] data = msg.getData(); ProducerRecord record; - Iterable
headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList()); + List
headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList()); + addAnalyticHeaders(headers); record = new ProducerRecord<>(tpi.getFullTopicName(), null, key, data, headers); producer.send(record, (metadata, exception) -> { if (exception == null) { diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplateTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplateTest.java new file mode 100644 index 0000000000..bfd3c4a6dc --- /dev/null +++ b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplateTest.java @@ -0,0 +1,54 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.header.Header; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.thingsboard.server.queue.TbQueueMsg; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.willCallRealMethod; +import static org.mockito.BDDMockito.willReturn; +import static org.mockito.Mockito.mock; + +@Slf4j +class TbKafkaProducerTemplateTest { + + TbKafkaProducerTemplate producerTemplate; + + @BeforeEach + void setUp() { + producerTemplate = mock(TbKafkaProducerTemplate.class); + willCallRealMethod().given(producerTemplate).addAnalyticHeaders(any()); + willReturn("tb-core-to-core-notifications-tb-core-3").given(producerTemplate).getClientId(); + } + + @Test + void testAddAnalyticHeaders() { + List
headers = new ArrayList<>(); + producerTemplate.addAnalyticHeaders(headers); + assertThat(headers).isNotEmpty(); + headers.forEach(r -> log.info("RecordHeader key [{}] value [{}]", r.key(), new String(r.value(), StandardCharsets.UTF_8))); + } + +} diff --git a/common/queue/src/test/resources/logback-test.xml b/common/queue/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..f7053313d4 --- /dev/null +++ b/common/queue/src/test/resources/logback-test.xml @@ -0,0 +1,20 @@ + + + + + + %d{ISO8601} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + From 2cccd0951a491c8a63264a6982d2e3744e78aaeb Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 27 Jul 2023 19:40:27 +0200 Subject: [PATCH 05/23] TbKafkaProducerTemplate: addAnalyticHeaders optimized --- .../queue/kafka/TbKafkaProducerTemplate.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java index 7c1c28b9f5..850f5c08ea 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java @@ -80,20 +80,18 @@ public class TbKafkaProducerTemplate implements TbQueuePro } void addAnalyticHeaders(List
headers) { - try { - if (log.isDebugEnabled()) { - headers.add(new RecordHeader("_producerId", getClientId().getBytes(StandardCharsets.UTF_8))); - headers.add(new RecordHeader("_threadName", Thread.currentThread().getName().getBytes(StandardCharsets.UTF_8))); - } - if (log.isTraceEnabled()) { + headers.add(new RecordHeader("_producerId", getClientId().getBytes(StandardCharsets.UTF_8))); + headers.add(new RecordHeader("_threadName", Thread.currentThread().getName().getBytes(StandardCharsets.UTF_8))); + if (log.isTraceEnabled()) { + try { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); int maxlevel = Math.min(stackTrace.length, 10); for (int i = 2; i < maxlevel; i++) { // ignore two levels: getStackTrace and addAnalyticHeaders headers.add(new RecordHeader("_stackTrace" + i, stackTrace[i].toString().getBytes(StandardCharsets.UTF_8))); } + } catch (Throwable t) { + log.trace("Failed to add stacktrace headers in Kafka producer {}", getClientId(), t); } - } catch (Throwable t) { - log.debug("Failed to add analytic header in Kafka producer {}", getClientId(), t); } } @@ -105,7 +103,9 @@ public class TbKafkaProducerTemplate implements TbQueuePro byte[] data = msg.getData(); ProducerRecord record; List
headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList()); - addAnalyticHeaders(headers); + if (log.isDebugEnabled()) { + addAnalyticHeaders(headers); + } record = new ProducerRecord<>(tpi.getFullTopicName(), null, key, data, headers); producer.send(record, (metadata, exception) -> { if (exception == null) { From ed6614af71a3c0d4c08c9772ed84f08e2e2c0ea3 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 27 Jul 2023 20:59:58 +0200 Subject: [PATCH 06/23] MqttClientConfig - ownerId added for exceptions logging purposes. MqttChannelHandler - improved logging --- .../server/msa/connectivity/MqttClientTest.java | 5 +++++ .../server/msa/connectivity/MqttGatewayClientTest.java | 5 +++++ .../java/org/thingsboard/mqtt/MqttChannelHandler.java | 10 ++++------ .../java/org/thingsboard/mqtt/MqttClientConfig.java | 6 +++++- .../mqtt/integration/MqttIntegrationTest.java | 1 + .../org/thingsboard/rule/engine/mqtt/TbMqttNode.java | 6 +++++- 6 files changed, 25 insertions(+), 8 deletions(-) diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index 940bd6777e..5a7e9b631d 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -476,8 +476,13 @@ public class MqttClientTest extends AbstractContainerTest { return getMqttClient(deviceCredentials.getCredentialsId(), listener); } + private String getOwnerId() { + return "Tenant[" + device.getTenantId().getId() + "]MqttClientTestDevice[" + device.getId().getId() + "]"; + } + private MqttClient getMqttClient(String username, MqttMessageListener listener) throws InterruptedException, ExecutionException { MqttClientConfig clientConfig = new MqttClientConfig(); + clientConfig.setOwnerId(getOwnerId()); clientConfig.setClientId("MQTT client from test"); clientConfig.setUsername(username); MqttClient mqttClient = MqttClient.create(clientConfig, listener, handlerExecutor); diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java index de11df2623..5e2a9f5fe6 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java @@ -418,8 +418,13 @@ public class MqttGatewayClientTest extends AbstractContainerTest { return testRestClient.getDeviceById(createdDeviceId); } + private String getOwnerId() { + return "Tenant[" + gatewayDevice.getTenantId().getId() + "]MqttGatewayClientTestDevice[" + gatewayDevice.getId().getId() + "]"; + } + private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException, ExecutionException { MqttClientConfig clientConfig = new MqttClientConfig(); + clientConfig.setOwnerId(getOwnerId()); clientConfig.setClientId("MQTT client from test"); clientConfig.setUsername(deviceCredentials.getCredentialsId()); MqttClient mqttClient = MqttClient.create(clientConfig, listener, handlerExecutor); diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java index 6b3a4e009e..a40c30ef96 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java @@ -320,14 +320,12 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler try { if (cause instanceof IOException) { if (log.isDebugEnabled()) { - log.debug("[{}][{}][{}] IOException: ", client.getClientConfig().getClientId(), client.getClientConfig().getUsername() , ctx.channel().remoteAddress(), - cause); - } else if (log.isInfoEnabled()) { - log.info("[{}][{}][{}] IOException: {}", client.getClientConfig().getClientId(), client.getClientConfig().getUsername() , ctx.channel().remoteAddress(), - cause.getMessage()); + log.debug("[{}] IOException: ", client.getClientConfig().getOwnerId(), cause); + } else { + log.info("[{}] IOException: {}", client.getClientConfig().getOwnerId(), cause.getMessage()); } } else { - log.warn("exceptionCaught", cause); + log.warn("[{}] exceptionCaught", client.getClientConfig().getOwnerId(), cause); } } finally { ReferenceCountUtil.release(cause); diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java index 1e20a842c3..10cee5d5fc 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java @@ -19,6 +19,8 @@ import io.netty.channel.Channel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.mqtt.MqttVersion; import io.netty.handler.ssl.SslContext; +import lombok.Getter; +import lombok.Setter; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -26,10 +28,12 @@ import java.util.Random; @SuppressWarnings({"WeakerAccess", "unused"}) public final class MqttClientConfig { - private final SslContext sslContext; private final String randomClientId; + @Getter + @Setter + private String ownerId; // [TenantId][IntegrationId] or [TenantId][RuleNodeId] for exceptions logging purposes private String clientId; private int timeoutSeconds = 60; private MqttVersion protocolVersion = MqttVersion.MQTT_3_1; diff --git a/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java b/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java index f39ca01110..04c2be1740 100644 --- a/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java +++ b/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java @@ -122,6 +122,7 @@ public class MqttIntegrationTest { private MqttClient initClient() throws Exception { MqttClientConfig config = new MqttClientConfig(); + config.setOwnerId("MqttIntegrationTest"); config.setTimeoutSeconds(KEEPALIVE_TIMEOUT_SECONDS); config.setReconnectDelay(RECONNECT_DELAY_SECONDS); MqttClient client = MqttClient.create(config, null, handlerExecutor); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index 49b31cb33c..1376f29710 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -25,7 +25,6 @@ import org.thingsboard.mqtt.MqttClientConfig; import org.thingsboard.mqtt.MqttConnectResult; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; -import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; @@ -105,8 +104,13 @@ public class TbMqttNode extends TbAbstractExternalNode { } } + String getOwnerId(TbContext ctx) { + return "Tenant[" + ctx.getTenantId().getId() + "]RuleNode[" + ctx.getSelf().getId().getId() + "]"; + } + protected MqttClient initClient(TbContext ctx) throws Exception { MqttClientConfig config = new MqttClientConfig(getSslContext()); + config.setOwnerId(getOwnerId(ctx)); if (!StringUtils.isEmpty(this.mqttNodeConfiguration.getClientId())) { config.setClientId(this.mqttNodeConfiguration.isAppendClientIdSuffix() ? this.mqttNodeConfiguration.getClientId() + "_" + ctx.getServiceId() : this.mqttNodeConfiguration.getClientId()); From 1b565e01ce5caa1c652758a8be82943806e04194 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 25 Jul 2023 06:54:48 +0200 Subject: [PATCH 07/23] Rule engine: ack all rate limited failures (draft) --- .../service/queue/TbMsgPackCallback.java | 18 ++++++++++++++++++ .../server/common/msg/queue/TbMsgCallback.java | 4 ++++ 2 files changed, 22 insertions(+) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java index ef2ba8798d..66364406c2 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java @@ -17,11 +17,13 @@ package org.thingsboard.server.service.queue; import io.micrometer.core.instrument.Timer; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.exception.ApiUsageLimitsExceededException; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.RuleEngineException; import org.thingsboard.server.common.msg.queue.RuleNodeInfo; import org.thingsboard.server.common.msg.queue.TbMsgCallback; +import org.thingsboard.server.common.msg.tools.TbRateLimitsException; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -57,8 +59,24 @@ public class TbMsgPackCallback implements TbMsgCallback { ctx.onSuccess(id); } + @Override + public void onRateLimit(RuleEngineException e) { + log.debug("[{}] ON RATE LIMIT", id, e); + //TODO notify tenant on rate limit + if (failedMsgTimer != null) { + failedMsgTimer.record(System.currentTimeMillis() - startMsgProcessing, TimeUnit.MILLISECONDS); + } + ctx.onSuccess(id); + } + @Override public void onFailure(RuleEngineException e) { + Throwable cause = e.getCause(); + if (cause instanceof TbRateLimitsException || cause instanceof ApiUsageLimitsExceededException) { + onRateLimit(e); + return; + } + log.trace("[{}] ON FAILURE", id, e); if (failedMsgTimer != null) { failedMsgTimer.record(System.currentTimeMillis() - startMsgProcessing, TimeUnit.MILLISECONDS); diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbMsgCallback.java b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbMsgCallback.java index 3312c98b64..6cf298adb2 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbMsgCallback.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbMsgCallback.java @@ -39,6 +39,10 @@ public interface TbMsgCallback { void onFailure(RuleEngineException e); + default void onRateLimit(RuleEngineException e) { + onFailure(e); + }; + /** * Returns 'true' if rule engine is expecting the message to be processed, 'false' otherwise. * message may no longer be valid, if the message pack is already expired/canceled/failed. From dfe21e3b006ece31737495fbedf1bdd86f6d48b9 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 27 Jul 2023 09:47:54 +0200 Subject: [PATCH 08/23] ExceptionUtil moved to the common/util package --- .../common/util/ExceptionUtil.java | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 common/util/src/main/java/org/thingsboard/common/util/ExceptionUtil.java diff --git a/common/util/src/main/java/org/thingsboard/common/util/ExceptionUtil.java b/common/util/src/main/java/org/thingsboard/common/util/ExceptionUtil.java new file mode 100644 index 0000000000..fb08e21791 --- /dev/null +++ b/common/util/src/main/java/org/thingsboard/common/util/ExceptionUtil.java @@ -0,0 +1,67 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.common.util; + +import com.google.gson.JsonParseException; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.id.EntityId; + +import javax.script.ScriptException; +import java.io.PrintWriter; +import java.io.StringWriter; + +@Slf4j +public class ExceptionUtil { + + @SuppressWarnings("unchecked") + public static T lookupException(Throwable source, Class clazz) { + Exception e = lookupExceptionInCause(source, clazz); + if (e != null) { + return (T) e; + } else { + return null; + } + } + + public static Exception lookupExceptionInCause(Throwable source, Class... clazzes) { + if (source == null) { + return null; + } + for (Class clazz : clazzes) { + if (clazz.isAssignableFrom(source.getClass())) { + return (Exception) source; + } + } + return lookupExceptionInCause(source.getCause(), clazzes); + } + + public static String toString(Exception e, EntityId componentId, boolean stackTraceEnabled) { + Exception exception = lookupExceptionInCause(e, ScriptException.class, JsonParseException.class); + if (exception != null && StringUtils.isNotEmpty(exception.getMessage())) { + return exception.getMessage(); + } else { + if (stackTraceEnabled) { + StringWriter sw = new StringWriter(); + e.printStackTrace(new PrintWriter(sw)); + return sw.toString(); + } else { + log.debug("[{}] Unknown error during message processing", componentId, e); + return "Please contact system administrator"; + } + } + } +} From 859c820dc36ca51210330ef4e4f52fd59aa44057 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 27 Jul 2023 09:51:11 +0200 Subject: [PATCH 09/23] AbstractRateLimitException introduced in common/data for all rate limit related exceptions --- .../exception/AbstractRateLimitException.java | 41 +++++++++++++++++++ .../ApiUsageLimitsExceededException.java | 2 +- .../msg/tools/TbRateLimitsException.java | 3 +- 3 files changed, 44 insertions(+), 2 deletions(-) create mode 100644 common/data/src/main/java/org/thingsboard/server/common/data/exception/AbstractRateLimitException.java diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/exception/AbstractRateLimitException.java b/common/data/src/main/java/org/thingsboard/server/common/data/exception/AbstractRateLimitException.java new file mode 100644 index 0000000000..6faf2d7d8c --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/exception/AbstractRateLimitException.java @@ -0,0 +1,41 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.exception; + +public abstract class AbstractRateLimitException extends RuntimeException { + + public AbstractRateLimitException() { + super(); + } + + public AbstractRateLimitException(String message) { + super(message); + } + + public AbstractRateLimitException(String message, Throwable cause) { + super(message, cause); + } + + public AbstractRateLimitException(Throwable cause) { + super(cause); + } + + protected AbstractRateLimitException(String message, Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/exception/ApiUsageLimitsExceededException.java b/common/data/src/main/java/org/thingsboard/server/common/data/exception/ApiUsageLimitsExceededException.java index 2d24184a3f..aa9441c776 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/exception/ApiUsageLimitsExceededException.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/exception/ApiUsageLimitsExceededException.java @@ -15,7 +15,7 @@ */ package org.thingsboard.server.common.data.exception; -public class ApiUsageLimitsExceededException extends RuntimeException { +public class ApiUsageLimitsExceededException extends AbstractRateLimitException { public ApiUsageLimitsExceededException(String message) { super(message); } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimitsException.java b/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimitsException.java index dd5fda4dd5..5e63cb037e 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimitsException.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimitsException.java @@ -17,11 +17,12 @@ package org.thingsboard.server.common.msg.tools; import lombok.Getter; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.exception.AbstractRateLimitException; /** * Created by ashvayka on 22.10.18. */ -public class TbRateLimitsException extends RuntimeException { +public class TbRateLimitsException extends AbstractRateLimitException { @Getter private final EntityType entityType; From 825eaf640c2c3fa56d5cfdbb68fa5faf47ecf4b8 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 27 Jul 2023 15:21:58 +0200 Subject: [PATCH 10/23] RuleEngineException cause added to be able to analyse cause by rate limit exceptions --- .../actors/ruleChain/RuleChainActorMessageProcessor.java | 4 ++-- .../service/queue/DefaultTbRuleEngineConsumerService.java | 2 +- .../server/common/msg/queue/RuleEngineException.java | 5 +++++ .../server/queue/common/MultipleTbQueueCallbackWrapper.java | 2 +- .../queue/common/MultipleTbQueueTbMsgCallbackWrapper.java | 2 +- .../server/queue/common/TbQueueTbMsgCallbackWrapper.java | 2 +- .../rule/engine/transform/MultipleTbMsgsCallbackWrapper.java | 2 +- 7 files changed, 12 insertions(+), 7 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index 4bd082ee38..4b868dfcbf 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -232,7 +232,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor Date: Thu, 27 Jul 2023 15:25:56 +0200 Subject: [PATCH 11/23] TbMsgPackCallback: AbstractRateLimitException lookup in cause of RuleEngineException. Tests added --- .../service/queue/TbMsgPackCallback.java | 6 +- .../service/queue/TbMsgPackCallbackTest.java | 98 +++++++++++++++++++ 2 files changed, 101 insertions(+), 3 deletions(-) create mode 100644 application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackCallbackTest.java diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java index 66364406c2..c2171c2550 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java @@ -17,7 +17,8 @@ package org.thingsboard.server.service.queue; import io.micrometer.core.instrument.Timer; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.server.common.data.exception.ApiUsageLimitsExceededException; +import org.thingsboard.common.util.ExceptionUtil; +import org.thingsboard.server.common.data.exception.AbstractRateLimitException; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.RuleEngineException; @@ -71,8 +72,7 @@ public class TbMsgPackCallback implements TbMsgCallback { @Override public void onFailure(RuleEngineException e) { - Throwable cause = e.getCause(); - if (cause instanceof TbRateLimitsException || cause instanceof ApiUsageLimitsExceededException) { + if (ExceptionUtil.lookupExceptionInCause(e, AbstractRateLimitException.class) != null) { onRateLimit(e); return; } diff --git a/application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackCallbackTest.java b/application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackCallbackTest.java new file mode 100644 index 0000000000..731ca75f17 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackCallbackTest.java @@ -0,0 +1,98 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.queue; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.rule.RuleNode; +import org.thingsboard.server.common.msg.queue.RuleEngineException; +import org.thingsboard.server.common.msg.queue.RuleNodeException; +import org.thingsboard.server.common.msg.tools.TbRateLimitsException; + +import java.util.UUID; +import java.util.stream.Stream; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +class TbMsgPackCallbackTest { + + TenantId tenantId; + UUID msgId; + TbMsgPackProcessingContext ctx; + TbMsgPackCallback callback; + + @BeforeEach + void setUp() { + tenantId = TenantId.fromUUID(UUID.randomUUID()); + msgId = UUID.randomUUID(); + ctx = mock(TbMsgPackProcessingContext.class); + callback = spy(new TbMsgPackCallback(msgId, tenantId, ctx)); + } + + private static Stream testOnFailure_NotRateLimitException() { + return Stream.of( + Arguments.of(new RuleEngineException("rule engine no cause")), + Arguments.of(new RuleEngineException("rule engine caused 1 lvl", new RuntimeException())), + Arguments.of(new RuleEngineException("rule engine caused 2 lvl", new RuntimeException(new Exception()))), + Arguments.of(new RuleEngineException("rule engine caused 2 lvl Throwable", new RuntimeException(new Throwable()))), + Arguments.of(new RuleNodeException("rule node no cause", "RuleChain", new RuleNode())) + ); + } + + @ParameterizedTest + @MethodSource + void testOnFailure_NotRateLimitException(RuleEngineException ree) { + callback.onFailure(ree); + + verify(callback, never()).onRateLimit(any()); + verify(callback, never()).onSuccess(); + verify(ctx, never()).onSuccess(any()); + } + + private static Stream testOnFailure_RateLimitException() { + return Stream.of( + Arguments.of(new RuleEngineException("caused lvl 1", new TbRateLimitsException(EntityType.ASSET))), + Arguments.of(new RuleEngineException("caused lvl 2", new RuntimeException(new TbRateLimitsException(EntityType.ASSET)))), + Arguments.of( + new RuleEngineException("caused lvl 3", + new RuntimeException( + new Exception( + new TbRateLimitsException(EntityType.ASSET))))) + ); + } + + @ParameterizedTest + @MethodSource + void testOnFailure_RateLimitException(RuleEngineException ree) { + callback.onFailure(ree); + + verify(callback).onRateLimit(any()); + verify(callback).onFailure(any()); + verify(callback, never()).onSuccess(); + verify(ctx).onSuccess(msgId); + verify(ctx).onSuccess(any()); + verify(ctx, never()).onFailure(any(), any(), any()); + } + +} From e75307c2beb9237b6d36fa7a1fb986aaa1e81316 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 27 Jul 2023 18:52:04 +0200 Subject: [PATCH 12/23] ExceptionUtil.lookupExceptionInCause refactored from recursion to a loop. Tests added --- .../common/util/ExceptionUtil.java | 14 ++-- .../common/util/ExceptionUtilTest.java | 74 +++++++++++++++++++ 2 files changed, 81 insertions(+), 7 deletions(-) create mode 100644 common/util/src/test/java/org/thingsboard/common/util/ExceptionUtilTest.java diff --git a/common/util/src/main/java/org/thingsboard/common/util/ExceptionUtil.java b/common/util/src/main/java/org/thingsboard/common/util/ExceptionUtil.java index fb08e21791..2ec15f3724 100644 --- a/common/util/src/main/java/org/thingsboard/common/util/ExceptionUtil.java +++ b/common/util/src/main/java/org/thingsboard/common/util/ExceptionUtil.java @@ -38,15 +38,15 @@ public class ExceptionUtil { } public static Exception lookupExceptionInCause(Throwable source, Class... clazzes) { - if (source == null) { - return null; - } - for (Class clazz : clazzes) { - if (clazz.isAssignableFrom(source.getClass())) { - return (Exception) source; + while (source != null) { + for (Class clazz : clazzes) { + if (clazz.isAssignableFrom(source.getClass())) { + return (Exception) source; + } } + source = source.getCause(); } - return lookupExceptionInCause(source.getCause(), clazzes); + return null; } public static String toString(Exception e, EntityId componentId, boolean stackTraceEnabled) { diff --git a/common/util/src/test/java/org/thingsboard/common/util/ExceptionUtilTest.java b/common/util/src/test/java/org/thingsboard/common/util/ExceptionUtilTest.java new file mode 100644 index 0000000000..e589ae8e30 --- /dev/null +++ b/common/util/src/test/java/org/thingsboard/common/util/ExceptionUtilTest.java @@ -0,0 +1,74 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.common.util; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; + +class ExceptionUtilTest { + + final Exception cause = new RuntimeException(); + + @Test + void givenRootCause_whenLookupExceptionInCause_thenReturnRootCauseAndNoStackOverflow() { + Exception e = cause; + for (int i = 0; i <= 16384; i++) { + e = new Exception(e); + } + assertThat(ExceptionUtil.lookupExceptionInCause(e, RuntimeException.class)).isSameAs(cause); + } + + @Test + void givenCause_whenLookupExceptionInCause_thenReturnCause() { + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(cause), RuntimeException.class)).isSameAs(cause); + } + + @Test + void givenNoCauseAndExceptionIsWantedCauseClass_whenLookupExceptionInCause_thenReturnSelf() { + assertThat(ExceptionUtil.lookupExceptionInCause(cause, RuntimeException.class)).isSameAs(cause); + } + + @Test + void givenNoCause_whenLookupExceptionInCause_thenReturnNull() { + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(), RuntimeException.class)).isNull(); + } + + @Test + void givenNotWantedCause_whenLookupExceptionInCause_thenReturnNull() { + final Exception cause = new IOException(); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(cause), RuntimeException.class)).isNull(); + } + + @Test + void givenCause_whenLookupExceptionInCauseByMany_thenReturnFirstCause() { + final Exception causeIAE = new IllegalAccessException(); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE))).isNull(); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE), IOException.class, NoSuchFieldException.class)).isNull(); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE), IllegalAccessException.class, IOException.class, NoSuchFieldException.class)).isSameAs(causeIAE); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE), IOException.class, NoSuchFieldException.class, IllegalAccessException.class)).isSameAs(causeIAE); + + final Exception causeIOE = new IOException(causeIAE); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIOE))).isNull(); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE), ClassNotFoundException.class, NoSuchFieldException.class)).isNull(); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIOE), IOException.class, NoSuchFieldException.class)).isSameAs(causeIOE); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIOE), IllegalAccessException.class, IOException.class, NoSuchFieldException.class)).isSameAs(causeIOE); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIOE), IOException.class, NoSuchFieldException.class, IllegalAccessException.class)).isSameAs(causeIOE); + } + +} From 4f99d75e15fb5323d5d1e9e994159cd27379312e Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 27 Jul 2023 18:52:29 +0200 Subject: [PATCH 13/23] license:format for new classes --- .../server/service/queue/TbMsgPackCallbackTest.java | 8 ++++---- .../common/data/exception/AbstractRateLimitException.java | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackCallbackTest.java b/application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackCallbackTest.java index 731ca75f17..80b9535af3 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackCallbackTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackCallbackTest.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2023 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/exception/AbstractRateLimitException.java b/common/data/src/main/java/org/thingsboard/server/common/data/exception/AbstractRateLimitException.java index 6faf2d7d8c..1d1da75da3 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/exception/AbstractRateLimitException.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/exception/AbstractRateLimitException.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2023 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. From 161d7c3eda40464ec2c318f89c54f18741e78081 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Mon, 31 Jul 2023 11:59:31 +0200 Subject: [PATCH 14/23] Cassandra: unlimited cached thread pool replaced with limited pool to prevent stack memory peaks. New env CASSANDRA_QUERY_RESULT_PROCESSING_THREADS introduced. --- application/src/main/resources/thingsboard.yml | 3 ++- .../server/dao/nosql/CassandraAbstractAsyncDao.java | 9 ++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 589540096c..3666678561 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -245,7 +245,8 @@ cassandra: concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}" permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}" dispatcher_threads: "${CASSANDRA_QUERY_DISPATCHER_THREADS:2}" - callback_threads: "${CASSANDRA_QUERY_CALLBACK_THREADS:4}" + callback_threads: "${CASSANDRA_QUERY_CALLBACK_THREADS:4}" # Buffered rate executor (read, write), for managing I/O rate. See "nosql-*-callback" threads in JMX + result_processing_threads: "${CASSANDRA_QUERY_RESULT_PROCESSING_THREADS:50}" # Result set transformer and processing. See "cassandra-callback" threads in JMX poll_ms: "${CASSANDRA_QUERY_POLL_MS:50}" rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:10000}" # set all data types values except target to null for the same ts on save diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractAsyncDao.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractAsyncDao.java index 425724bf2c..92289b7178 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractAsyncDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractAsyncDao.java @@ -19,13 +19,13 @@ import com.google.common.base.Function; import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.springframework.beans.factory.annotation.Value; +import org.thingsboard.common.util.ThingsBoardExecutors; import javax.annotation.Nullable; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; /** * Created by ashvayka on 21.02.17. @@ -34,9 +34,12 @@ public abstract class CassandraAbstractAsyncDao extends CassandraAbstractDao { protected ExecutorService readResultsProcessingExecutor; + @Value("${cassandra.query.result_processing_threads:50}") + private int threadPoolSize; + @PostConstruct public void startExecutor() { - readResultsProcessingExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("cassandra-callback")); + readResultsProcessingExecutor = ThingsBoardExecutors.newWorkStealingPool(threadPoolSize, "cassandra-callback"); } @PreDestroy From 9e92d10dc544b0ecac01f615b8861a295e0929f4 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Mon, 31 Jul 2023 12:05:23 +0200 Subject: [PATCH 15/23] Cassandra: AbstractBufferedRateExecutor improved logging to clarify whether the Read or Write operations logged --- .../dao/util/AbstractBufferedRateExecutor.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java index 9fbf796eb0..daade9acf3 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java @@ -124,7 +124,7 @@ public abstract class AbstractBufferedRateExecutor new TbRateLimits(tenantProfileConfiguration.getCassandraQueryTenantRateLimitsConfiguration()) @@ -173,7 +173,7 @@ public abstract class AbstractBufferedRateExecutor taskCtx = null; @@ -185,7 +185,7 @@ public abstract class AbstractBufferedRateExecutor= printQueriesFreq) { printQueriesIdx.set(0); String query = queryToString(finalTaskCtx); - log.info("[{}] Cassandra query: {}", taskCtx.getId(), query); + log.info("[{}][{}] Cassandra query: {}", getBufferName(), taskCtx.getId(), query); } } logTask("Processing", finalTaskCtx); @@ -238,7 +238,7 @@ public abstract class AbstractBufferedRateExecutor taskCtx) { @@ -314,7 +314,7 @@ public abstract class AbstractBufferedRateExecutor Date: Fri, 16 Jun 2023 15:40:29 +0200 Subject: [PATCH 16/23] added recalculetePartitions delay for node restart --- .../src/main/resources/thingsboard.yml | 1 + .../queue/discovery/ZkDiscoveryService.java | 31 ++++++++++++++++++- .../src/main/resources/tb-vc-executor.yml | 1 + .../src/main/resources/tb-coap-transport.yml | 1 + .../src/main/resources/tb-http-transport.yml | 1 + .../src/main/resources/tb-lwm2m-transport.yml | 1 + .../src/main/resources/tb-mqtt-transport.yml | 1 + .../src/main/resources/tb-snmp-transport.yml | 1 + 8 files changed, 37 insertions(+), 1 deletion(-) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 3666678561..19804c0588 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -96,6 +96,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" cluster: stats: diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java index fcf80bcf3d..17d046a4cb 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java @@ -44,8 +44,10 @@ import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.List; import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -66,6 +68,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi private Integer zkSessionTimeout; @Value("${zk.zk_dir}") private String zkDir; + @Value("${zk.recalculate_delay:120000}") + private Long recalculateDelay; + + private final ConcurrentHashMap> delayedTasks; private final TbServiceInfoProvider serviceInfoProvider; private final PartitionService partitionService; @@ -82,6 +88,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi PartitionService partitionService) { this.serviceInfoProvider = serviceInfoProvider; this.partitionService = partitionService; + delayedTasks = new ConcurrentHashMap<>(); } @PostConstruct @@ -290,8 +297,30 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi log.debug("Processing [{}] event for [{}]", pathChildrenCacheEvent.getType(), instance.getServiceId()); switch (pathChildrenCacheEvent.getType()) { case CHILD_ADDED: + ScheduledFuture task = delayedTasks.remove(instance.getServiceId()); + if (task != null) { + if (!task.cancel(false)) { + log.debug("[{}] Going to recalculate partitions due to adding new node [{}]", + instance.getServiceId(), instance.getServiceTypesList()); + recalculatePartitions(); + } else { + log.debug("[{}] Recalculate partitions ignored. Service restarted in time [{}]", + instance.getServiceId(), instance.getServiceTypesList()); + } + } else { + log.debug("[{}] Going to recalculate partitions due to adding new node [{}]", + instance.getServiceId(), instance.getServiceTypesList()); + recalculatePartitions(); + } + break; case CHILD_REMOVED: - recalculatePartitions(); + ScheduledFuture future = zkExecutorService.schedule(() -> { + log.debug("[{}] Going to recalculate partitions due to removed node [{}]", + instance.getServiceId(), instance.getServiceTypesList()); + delayedTasks.remove(instance.getServiceId()); + recalculatePartitions(); + }, recalculateDelay, TimeUnit.MILLISECONDS); + delayedTasks.put(instance.getServiceId(), future); break; default: break; diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index 352f94e091..0dbb19a71a 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -41,6 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" queue: type: "${TB_QUEUE_TYPE:kafka}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index 7ea553fe5c..c8f4b5a099 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -41,6 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index 346ec48eae..fe181f12f2 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -68,6 +68,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index 4e8167d89d..d80279f582 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -41,6 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index 1e0b1ebcd4..fcbf542287 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -41,6 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index 9f086bcbc5..0e84d54fce 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -41,6 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" cache: type: "${CACHE_TYPE:redis}" From ce9552e1a8ca44f58a369051bfc9f5bc24ca1477 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Thu, 6 Jul 2023 13:31:25 +0200 Subject: [PATCH 17/23] improvements --- .../queue/discovery/ZkDiscoveryService.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java index 17d046a4cb..24a7863b24 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java @@ -299,16 +299,16 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi case CHILD_ADDED: ScheduledFuture task = delayedTasks.remove(instance.getServiceId()); if (task != null) { - if (!task.cancel(false)) { - log.debug("[{}] Going to recalculate partitions due to adding new node [{}]", + if (task.cancel(false)) { + log.debug("[{}] Recalculate partitions ignored. Service was restarted in time [{}].", + instance.getServiceId(), instance.getServiceTypesList()); + } else { + log.debug("[{}] Going to recalculate partitions. Service was not restarted in time [{}]!", instance.getServiceId(), instance.getServiceTypesList()); recalculatePartitions(); - } else { - log.debug("[{}] Recalculate partitions ignored. Service restarted in time [{}]", - instance.getServiceId(), instance.getServiceTypesList()); } } else { - log.debug("[{}] Going to recalculate partitions due to adding new node [{}]", + log.debug("[{}] Going to recalculate partitions due to adding new node [{}].", instance.getServiceId(), instance.getServiceTypesList()); recalculatePartitions(); } @@ -317,8 +317,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi ScheduledFuture future = zkExecutorService.schedule(() -> { log.debug("[{}] Going to recalculate partitions due to removed node [{}]", instance.getServiceId(), instance.getServiceTypesList()); - delayedTasks.remove(instance.getServiceId()); - recalculatePartitions(); + ScheduledFuture removedTask = delayedTasks.remove(instance.getServiceId()); + if (removedTask != null) { + recalculatePartitions(); + } }, recalculateDelay, TimeUnit.MILLISECONDS); delayedTasks.put(instance.getServiceId(), future); break; @@ -332,6 +334,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi * Synchronized to ensure that other servers info is up to date * */ synchronized void recalculatePartitions() { + delayedTasks.clear(); partitionService.recalculatePartitions(serviceInfoProvider.getServiceInfo(), getOtherServers()); } From 948f517898ff2207e6ba797e83ca2f77a3194790 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Fri, 14 Jul 2023 19:45:23 +0200 Subject: [PATCH 18/23] added zk restart node tests --- .../queue/discovery/ZkDiscoveryService.java | 2 +- .../discovery/ZkDiscoveryServiceTest.java | 173 ++++++++++++++++++ 2 files changed, 174 insertions(+), 1 deletion(-) create mode 100644 common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java index 24a7863b24..50378d3387 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java @@ -71,7 +71,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi @Value("${zk.recalculate_delay:120000}") private Long recalculateDelay; - private final ConcurrentHashMap> delayedTasks; + protected final ConcurrentHashMap> delayedTasks; private final TbServiceInfoProvider serviceInfoProvider; private final PartitionService partitionService; diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java new file mode 100644 index 0000000000..38cad217aa --- /dev/null +++ b/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java @@ -0,0 +1,173 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.discovery; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.gen.transport.TransportProtos; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.CHILD_ADDED; +import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.CHILD_REMOVED; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class ZkDiscoveryServiceTest { + + @Mock + private TbServiceInfoProvider serviceInfoProvider; + + @Mock + private PartitionService partitionService; + + @Mock + private CuratorFramework client; + + @Mock + private PathChildrenCache cache; + + private ScheduledExecutorService zkExecutorService; + + @Mock + private CuratorFramework curatorFramework; + + private ZkDiscoveryService zkDiscoveryService; + + @Before + public void setup() { + zkDiscoveryService = Mockito.spy(new ZkDiscoveryService(serviceInfoProvider, partitionService)); + zkExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("zk-discovery")); + when(client.getState()).thenReturn(CuratorFrameworkState.STARTED); + ReflectionTestUtils.setField(zkDiscoveryService, "stopped", false); + ReflectionTestUtils.setField(zkDiscoveryService, "client", client); + ReflectionTestUtils.setField(zkDiscoveryService, "cache", cache); + ReflectionTestUtils.setField(zkDiscoveryService, "nodePath", "/thingsboard/nodes/0000000010"); + ReflectionTestUtils.setField(zkDiscoveryService, "zkExecutorService", zkExecutorService); + ReflectionTestUtils.setField(zkDiscoveryService, "recalculateDelay", 1000L); + ReflectionTestUtils.setField(zkDiscoveryService, "zkDir", "/thingsboard"); + } + + @Test + public void restartNodeTest() throws Exception { + var currentInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("currentId").build(); + var currentData = new ChildData("/thingsboard/nodes/0000000010", null, currentInfo.toByteArray()); + var childInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("childId").build(); + var childData = new ChildData("/thingsboard/nodes/0000000020", null, childInfo.toByteArray()); + + when(serviceInfoProvider.getServiceInfo()).thenReturn(currentInfo); + List dataList = new ArrayList<>(); + dataList.add(currentData); + when(cache.getCurrentData()).thenReturn(dataList); + + startNode(childData); + + verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo))); + + reset(partitionService); + + //Restart in timeAssert.assertTrue(zkDiscoveryService.delayedTasks.isEmpty()); + stopNode(childData); + + assertEquals(1, zkDiscoveryService.delayedTasks.size()); + + verify(partitionService, never()).recalculatePartitions(eq(currentInfo), any()); + + startNode(childData); + + verify(partitionService, never()).recalculatePartitions(eq(currentInfo), any()); + + Thread.sleep(2000); + + verify(partitionService, never()).recalculatePartitions(eq(currentInfo), any()); + + assertTrue(zkDiscoveryService.delayedTasks.isEmpty()); + + //Restart not in time + stopNode(childData); + + assertEquals(1, zkDiscoveryService.delayedTasks.size()); + + Thread.sleep(2000); + + assertTrue(zkDiscoveryService.delayedTasks.isEmpty()); + + startNode(childData); + + verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(Collections.emptyList())); + + verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo))); + + reset(partitionService); + + //Start another node during restart + var anotherInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("anotherId").build(); + var anotherData = new ChildData("/thingsboard/nodes/0000000030", null, anotherInfo.toByteArray()); + + stopNode(childData); + + assertEquals(1, zkDiscoveryService.delayedTasks.size()); + + startNode(anotherData); + + assertTrue(zkDiscoveryService.delayedTasks.isEmpty()); + + verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(anotherInfo))); + reset(partitionService); + + Thread.sleep(2000); + + verify(partitionService, never()).recalculatePartitions(eq(currentInfo), any()); + + startNode(childData); + + verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(anotherInfo, childInfo))); + } + + private void startNode(ChildData data) throws Exception { + cache.getCurrentData().add(data); + zkDiscoveryService.childEvent(curatorFramework, new PathChildrenCacheEvent(CHILD_ADDED, data)); + } + + private void stopNode(ChildData data) throws Exception { + cache.getCurrentData().remove(data); + zkDiscoveryService.childEvent(curatorFramework, new PathChildrenCacheEvent(CHILD_REMOVED, data)); + } + +} From ac2aac8aa7a264e8ff9452714818cd1dfcc9ba00 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Fri, 28 Jul 2023 14:20:33 +0200 Subject: [PATCH 19/23] refactored due to comments --- .../src/main/resources/thingsboard.yml | 2 +- .../queue/discovery/ZkDiscoveryService.java | 26 +++++--- .../discovery/ZkDiscoveryServiceTest.java | 62 ++++++++++++------- .../src/main/resources/tb-vc-executor.yml | 2 +- .../src/main/resources/tb-coap-transport.yml | 2 +- .../src/main/resources/tb-http-transport.yml | 2 +- .../src/main/resources/tb-lwm2m-transport.yml | 2 +- .../src/main/resources/tb-mqtt-transport.yml | 2 +- .../src/main/resources/tb-snmp-transport.yml | 2 +- 9 files changed, 62 insertions(+), 40 deletions(-) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 19804c0588..1f16fbc414 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -96,7 +96,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" cluster: stats: diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java index 50378d3387..44999d016a 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java @@ -16,6 +16,7 @@ package org.thingsboard.server.queue.discovery; import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.ProtocolStringList; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; @@ -68,7 +69,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi private Integer zkSessionTimeout; @Value("${zk.zk_dir}") private String zkDir; - @Value("${zk.recalculate_delay:120000}") + @Value("${zk.recalculate_delay:60000}") private Long recalculateDelay; protected final ConcurrentHashMap> delayedTasks; @@ -294,35 +295,39 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi log.error("Failed to decode server instance for node {}", data.getPath(), e); throw e; } - log.debug("Processing [{}] event for [{}]", pathChildrenCacheEvent.getType(), instance.getServiceId()); + + String serviceId = instance.getServiceId(); + ProtocolStringList serviceTypesList = instance.getServiceTypesList(); + + log.trace("Processing [{}] event for [{}]", pathChildrenCacheEvent.getType(), serviceId); switch (pathChildrenCacheEvent.getType()) { case CHILD_ADDED: - ScheduledFuture task = delayedTasks.remove(instance.getServiceId()); + ScheduledFuture task = delayedTasks.remove(serviceId); if (task != null) { if (task.cancel(false)) { log.debug("[{}] Recalculate partitions ignored. Service was restarted in time [{}].", - instance.getServiceId(), instance.getServiceTypesList()); + serviceId, serviceTypesList); } else { log.debug("[{}] Going to recalculate partitions. Service was not restarted in time [{}]!", - instance.getServiceId(), instance.getServiceTypesList()); + serviceId, serviceTypesList); recalculatePartitions(); } } else { - log.debug("[{}] Going to recalculate partitions due to adding new node [{}].", - instance.getServiceId(), instance.getServiceTypesList()); + log.trace("[{}] Going to recalculate partitions due to adding new node [{}].", + serviceId, serviceTypesList); recalculatePartitions(); } break; case CHILD_REMOVED: ScheduledFuture future = zkExecutorService.schedule(() -> { log.debug("[{}] Going to recalculate partitions due to removed node [{}]", - instance.getServiceId(), instance.getServiceTypesList()); - ScheduledFuture removedTask = delayedTasks.remove(instance.getServiceId()); + serviceId, serviceTypesList); + ScheduledFuture removedTask = delayedTasks.remove(serviceId); if (removedTask != null) { recalculatePartitions(); } }, recalculateDelay, TimeUnit.MILLISECONDS); - delayedTasks.put(instance.getServiceId(), future); + delayedTasks.put(serviceId, future); break; default: break; @@ -334,6 +339,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi * Synchronized to ensure that other servers info is up to date * */ synchronized void recalculatePartitions() { + delayedTasks.values().forEach(future -> future.cancel(false)); delayedTasks.clear(); partitionService.recalculatePartitions(serviceInfoProvider.getServiceInfo(), getOtherServers()); } diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java index 38cad217aa..a8810efd0e 100644 --- a/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java +++ b/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java @@ -63,68 +63,76 @@ public class ZkDiscoveryServiceTest { @Mock private PathChildrenCache cache; - private ScheduledExecutorService zkExecutorService; - @Mock private CuratorFramework curatorFramework; private ZkDiscoveryService zkDiscoveryService; + private static final long RECALCULATE_DELAY = 100L; + + final TransportProtos.ServiceInfo currentInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("tb-rule-engine-0").build(); + final ChildData currentData = new ChildData("/thingsboard/nodes/0000000010", null, currentInfo.toByteArray()); + final TransportProtos.ServiceInfo childInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("tb-rule-engine-1").build(); + final ChildData childData = new ChildData("/thingsboard/nodes/0000000020", null, childInfo.toByteArray()); + @Before public void setup() { zkDiscoveryService = Mockito.spy(new ZkDiscoveryService(serviceInfoProvider, partitionService)); - zkExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("zk-discovery")); + ScheduledExecutorService zkExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("zk-discovery")); when(client.getState()).thenReturn(CuratorFrameworkState.STARTED); ReflectionTestUtils.setField(zkDiscoveryService, "stopped", false); ReflectionTestUtils.setField(zkDiscoveryService, "client", client); ReflectionTestUtils.setField(zkDiscoveryService, "cache", cache); ReflectionTestUtils.setField(zkDiscoveryService, "nodePath", "/thingsboard/nodes/0000000010"); ReflectionTestUtils.setField(zkDiscoveryService, "zkExecutorService", zkExecutorService); - ReflectionTestUtils.setField(zkDiscoveryService, "recalculateDelay", 1000L); + ReflectionTestUtils.setField(zkDiscoveryService, "recalculateDelay", RECALCULATE_DELAY); ReflectionTestUtils.setField(zkDiscoveryService, "zkDir", "/thingsboard"); - } - - @Test - public void restartNodeTest() throws Exception { - var currentInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("currentId").build(); - var currentData = new ChildData("/thingsboard/nodes/0000000010", null, currentInfo.toByteArray()); - var childInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("childId").build(); - var childData = new ChildData("/thingsboard/nodes/0000000020", null, childInfo.toByteArray()); when(serviceInfoProvider.getServiceInfo()).thenReturn(currentInfo); + List dataList = new ArrayList<>(); dataList.add(currentData); when(cache.getCurrentData()).thenReturn(dataList); + } + @Test + public void restartNodeInTimeTest() throws Exception { startNode(childData); verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo))); reset(partitionService); - //Restart in timeAssert.assertTrue(zkDiscoveryService.delayedTasks.isEmpty()); stopNode(childData); assertEquals(1, zkDiscoveryService.delayedTasks.size()); - verify(partitionService, never()).recalculatePartitions(eq(currentInfo), any()); + verify(partitionService, never()).recalculatePartitions(any(), any()); startNode(childData); - verify(partitionService, never()).recalculatePartitions(eq(currentInfo), any()); + verify(partitionService, never()).recalculatePartitions(any(), any()); - Thread.sleep(2000); + Thread.sleep(RECALCULATE_DELAY * 2); - verify(partitionService, never()).recalculatePartitions(eq(currentInfo), any()); + verify(partitionService, never()).recalculatePartitions(any(), any()); assertTrue(zkDiscoveryService.delayedTasks.isEmpty()); + } + + @Test + public void restartNodeNotInTimeTest() throws Exception { + startNode(childData); + + verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo))); + + reset(partitionService); - //Restart not in time stopNode(childData); assertEquals(1, zkDiscoveryService.delayedTasks.size()); - Thread.sleep(2000); + Thread.sleep(RECALCULATE_DELAY * 2); assertTrue(zkDiscoveryService.delayedTasks.isEmpty()); @@ -135,11 +143,19 @@ public class ZkDiscoveryServiceTest { verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo))); reset(partitionService); + } - //Start another node during restart - var anotherInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("anotherId").build(); + @Test + public void startAnotherNodeDuringRestartTest() throws Exception { + var anotherInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("tb-transport").build(); var anotherData = new ChildData("/thingsboard/nodes/0000000030", null, anotherInfo.toByteArray()); + startNode(childData); + + verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo))); + + reset(partitionService); + stopNode(childData); assertEquals(1, zkDiscoveryService.delayedTasks.size()); @@ -151,9 +167,9 @@ public class ZkDiscoveryServiceTest { verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(anotherInfo))); reset(partitionService); - Thread.sleep(2000); + Thread.sleep(RECALCULATE_DELAY * 2); - verify(partitionService, never()).recalculatePartitions(eq(currentInfo), any()); + verify(partitionService, never()).recalculatePartitions(any(), any()); startNode(childData); diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index 0dbb19a71a..66c6b4d3da 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -41,7 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" queue: type: "${TB_QUEUE_TYPE:kafka}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index c8f4b5a099..f4b5e0bc94 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -41,7 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index fe181f12f2..f92da86b99 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -68,7 +68,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index d80279f582..05388473f0 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -41,7 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index fcbf542287..e131788929 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -41,7 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index 0e84d54fce..a7928eb49f 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -41,7 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" cache: type: "${CACHE_TYPE:redis}" From fe7846d1520ae4d06311ebcf6b54161a814f024b Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 19 Aug 2021 15:23:16 +0200 Subject: [PATCH 20/23] ui: event table: default interval is 15 minutes --- .../src/app/modules/home/components/event/event-table-config.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ui-ngx/src/app/modules/home/components/event/event-table-config.ts b/ui-ngx/src/app/modules/home/components/event/event-table-config.ts index a9816a130c..8b0ea62abb 100644 --- a/ui-ngx/src/app/modules/home/components/event/event-table-config.ts +++ b/ui-ngx/src/app/modules/home/components/event/event-table-config.ts @@ -40,6 +40,7 @@ import { EventContentDialogData } from '@home/components/event/event-content-dialog.component'; import { isEqual, sortObjectKeys } from '@core/utils'; +import {historyInterval, MINUTE} from '@shared/models/time/time.models'; import { ConnectedPosition, Overlay, OverlayConfig, OverlayRef } from '@angular/cdk/overlay'; import { ChangeDetectorRef, Injector, StaticProvider, ViewContainerRef } from '@angular/core'; import { ComponentPortal } from '@angular/cdk/portal'; @@ -89,6 +90,7 @@ export class EventTableConfig extends EntityTableConfig { this.loadDataOnInit = false; this.tableTitle = ''; this.useTimePageLink = true; + this.defaultTimewindowInterval = historyInterval(MINUTE * 15); this.detailsPanelEnabled = false; this.selectionEnabled = false; this.searchEnabled = false; From 33bab60954e67220e278bf4399daf440d464efca Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 19 Aug 2021 15:22:46 +0200 Subject: [PATCH 21/23] ui: event table: ts with ms --- .../src/app/modules/home/components/event/event-table-config.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ui-ngx/src/app/modules/home/components/event/event-table-config.ts b/ui-ngx/src/app/modules/home/components/event/event-table-config.ts index 8b0ea62abb..a07967f9c3 100644 --- a/ui-ngx/src/app/modules/home/components/event/event-table-config.ts +++ b/ui-ngx/src/app/modules/home/components/event/event-table-config.ts @@ -178,7 +178,7 @@ export class EventTableConfig extends EntityTableConfig { updateColumns(updateTableColumns: boolean = false): void { this.columns = []; this.columns.push( - new DateEntityTableColumn('createdTime', 'event.event-time', this.datePipe, '120px'), + new DateEntityTableColumn('createdTime', 'event.event-time', this.datePipe, '120px', 'yyyy-MM-dd HH:mm:ss.SSS'), new EntityTableColumn('server', 'event.server', '100px', (entity) => entity.body.server, entity => ({}), false)); switch (this.eventType) { From b7d522295810b59201614f3df28fc59eabc18e0d Mon Sep 17 00:00:00 2001 From: Vladyslav Date: Tue, 8 Aug 2023 17:16:02 +0300 Subject: [PATCH 22/23] UI: Updated code style --- .../src/app/modules/home/components/event/event-table-config.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ui-ngx/src/app/modules/home/components/event/event-table-config.ts b/ui-ngx/src/app/modules/home/components/event/event-table-config.ts index a07967f9c3..eb3edb0baf 100644 --- a/ui-ngx/src/app/modules/home/components/event/event-table-config.ts +++ b/ui-ngx/src/app/modules/home/components/event/event-table-config.ts @@ -40,7 +40,7 @@ import { EventContentDialogData } from '@home/components/event/event-content-dialog.component'; import { isEqual, sortObjectKeys } from '@core/utils'; -import {historyInterval, MINUTE} from '@shared/models/time/time.models'; +import { historyInterval, MINUTE } from '@shared/models/time/time.models'; import { ConnectedPosition, Overlay, OverlayConfig, OverlayRef } from '@angular/cdk/overlay'; import { ChangeDetectorRef, Injector, StaticProvider, ViewContainerRef } from '@angular/core'; import { ComponentPortal } from '@angular/cdk/portal'; From 141a7ff0e6be9bb132e788c870b6689f84fb8b5b Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 9 Aug 2023 21:21:15 +0200 Subject: [PATCH 23/23] changed recalculate_delay --- application/src/main/resources/thingsboard.yml | 5 ++++- .../server/queue/discovery/ZkDiscoveryService.java | 2 +- msa/vc-executor/src/main/resources/tb-vc-executor.yml | 2 +- transport/coap/src/main/resources/tb-coap-transport.yml | 2 +- transport/http/src/main/resources/tb-http-transport.yml | 2 +- transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml | 2 +- transport/mqtt/src/main/resources/tb-mqtt-transport.yml | 2 +- transport/snmp/src/main/resources/tb-snmp-transport.yml | 2 +- 8 files changed, 11 insertions(+), 8 deletions(-) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 1f16fbc414..5b76175fcd 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -96,7 +96,10 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" + # The recalculate_delay property recommended in a microservices architecture setup for rule-engine services. + # This property provides a pause to ensure that when a rule-engine service is restarted, other nodes don't immediately attempt to recalculate their partitions. + # The delay is recommended because the initialization of rule chain actors is time-consuming. Avoiding unnecessary recalculations during a restart can enhance system performance and stability. + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}" cluster: stats: diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java index 44999d016a..e99817de17 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java @@ -69,7 +69,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi private Integer zkSessionTimeout; @Value("${zk.zk_dir}") private String zkDir; - @Value("${zk.recalculate_delay:60000}") + @Value("${zk.recalculate_delay:0}") private Long recalculateDelay; protected final ConcurrentHashMap> delayedTasks; diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index 66c6b4d3da..9e57a35e20 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -41,7 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}" queue: type: "${TB_QUEUE_TYPE:kafka}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index f4b5e0bc94..a545759f38 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -41,7 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index f92da86b99..1f042fb131 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -68,7 +68,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index 05388473f0..ffe815d441 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -41,7 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index e131788929..f7d0209804 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -41,7 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index a7928eb49f..3ed46dde78 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -41,7 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}" cache: type: "${CACHE_TYPE:redis}"