From 7e27c5b6833a725c4d0c4e524b3c483d95001451 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 13 Jun 2023 18:17:25 +0200 Subject: [PATCH 01/19] mqtt-client: messages processing moved from netty event loop pool and to the handlerExecutor to make netty handlers non-blocking --- .../msa/connectivity/MqttClientTest.java | 16 ++- .../connectivity/MqttGatewayClientTest.java | 16 ++- netty-mqtt/pom.xml | 4 + .../thingsboard/mqtt/MqttChannelHandler.java | 113 +++++++++++++----- .../java/org/thingsboard/mqtt/MqttClient.java | 7 +- .../org/thingsboard/mqtt/MqttClientImpl.java | 15 ++- .../thingsboard/mqtt/MqttSubscription.java | 2 +- .../mqtt/integration/MqttIntegrationTest.java | 16 ++- .../rule/engine/mqtt/TbMqttNode.java | 2 +- 9 files changed, 153 insertions(+), 38 deletions(-) diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index 893c8b565c..96e57549a8 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -28,6 +28,7 @@ import lombok.extern.slf4j.Slf4j; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import org.thingsboard.common.util.AbstractListeningExecutor; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClientConfig; @@ -74,8 +75,18 @@ import static org.thingsboard.server.msa.prototypes.DevicePrototypes.defaultDevi public class MqttClientTest extends AbstractContainerTest { private Device device; + AbstractListeningExecutor handlerExecutor; + @BeforeMethod public void setUp() throws Exception { + this.handlerExecutor = new AbstractListeningExecutor() { + @Override + protected int getThreadPollSize() { + return 4; + } + }; + handlerExecutor.init(); + testRestClient.login("tenant@thingsboard.org", "tenant"); device = testRestClient.postDevice("", defaultDevicePrototype("http_")); } @@ -83,6 +94,9 @@ public class MqttClientTest extends AbstractContainerTest { @AfterMethod public void tearDown() { testRestClient.deleteDeviceIfExists(device.getId()); + if (handlerExecutor != null) { + handlerExecutor.destroy(); + } } @Test public void telemetryUpload() throws Exception { @@ -465,7 +479,7 @@ public class MqttClientTest extends AbstractContainerTest { MqttClientConfig clientConfig = new MqttClientConfig(); clientConfig.setClientId("MQTT client from test"); clientConfig.setUsername(username); - MqttClient mqttClient = MqttClient.create(clientConfig, listener); + MqttClient mqttClient = MqttClient.create(clientConfig, listener, handlerExecutor); mqttClient.connect("localhost", 1883).get(); return mqttClient; } diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java index a038d4cf50..8cddb69fa3 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java @@ -32,6 +32,7 @@ import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import org.thingsboard.common.util.AbstractListeningExecutor; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.mqtt.MqttClient; @@ -76,8 +77,18 @@ public class MqttGatewayClientTest extends AbstractContainerTest { private MqttMessageListener listener; private JsonParser jsonParser = new JsonParser(); + AbstractListeningExecutor handlerExecutor; + @BeforeMethod public void createGateway() throws Exception { + this.handlerExecutor = new AbstractListeningExecutor() { + @Override + protected int getThreadPollSize() { + return 4; + } + }; + handlerExecutor.init(); + testRestClient.login("tenant@thingsboard.org", "tenant"); gatewayDevice = testRestClient.postDevice("", defaultGatewayPrototype()); DeviceCredentials gatewayDeviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(gatewayDevice.getId()); @@ -94,6 +105,9 @@ public class MqttGatewayClientTest extends AbstractContainerTest { this.listener = null; this.mqttClient = null; this.createdDevice = null; + if (handlerExecutor != null) { + handlerExecutor.destroy(); + } } @Test @@ -407,7 +421,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { MqttClientConfig clientConfig = new MqttClientConfig(); clientConfig.setClientId("MQTT client from test"); clientConfig.setUsername(deviceCredentials.getCredentialsId()); - MqttClient mqttClient = MqttClient.create(clientConfig, listener); + MqttClient mqttClient = MqttClient.create(clientConfig, listener, handlerExecutor); mqttClient.connect("localhost", 1883).get(); return mqttClient; } diff --git a/netty-mqtt/pom.xml b/netty-mqtt/pom.xml index 60883f4b34..400b486e18 100644 --- a/netty-mqtt/pom.xml +++ b/netty-mqtt/pom.xml @@ -35,6 +35,10 @@ + + org.thingsboard.common + util + io.netty netty-codec-mqtt diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java index e243f66633..6b3a4e009e 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java @@ -16,6 +16,10 @@ package org.thingsboard.mqtt; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; @@ -34,8 +38,15 @@ import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttSubAckMessage; import io.netty.handler.codec.mqtt.MqttUnsubAckMessage; import io.netty.util.CharsetUtil; +import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Promise; +import lombok.extern.slf4j.Slf4j; +import org.checkerframework.checker.nullness.qual.Nullable; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +@Slf4j final class MqttChannelHandler extends SimpleChannelInboundHandler { private final MqttClientImpl client; @@ -110,27 +121,48 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler super.channelInactive(ctx); } - private void invokeHandlersForIncomingPublish(MqttPublishMessage message) { - boolean handlerInvoked = false; - for (MqttSubscription subscription : ImmutableSet.copyOf(this.client.getSubscriptions().values())) { - if (subscription.matches(message.variableHeader().topicName())) { - if (subscription.isOnce() && subscription.isCalled()) { - continue; + ListenableFuture invokeHandlersForIncomingPublish(MqttPublishMessage message) { + var future = Futures.immediateVoidFuture(); + var handlerInvoked = new AtomicBoolean(); + try { + for (MqttSubscription subscription : ImmutableSet.copyOf(this.client.getSubscriptions().values())) { + if (subscription.matches(message.variableHeader().topicName())) { + future = Futures.transform(future, x -> { + if (subscription.isOnce() && subscription.isCalled()) { + return null; + } + message.payload().markReaderIndex(); + subscription.setCalled(true); + subscription.getHandler().onMessage(message.variableHeader().topicName(), message.payload()); + if (subscription.isOnce()) { + this.client.off(subscription.getTopic(), subscription.getHandler()); + } + message.payload().resetReaderIndex(); + handlerInvoked.set(true); + return null; + }, client.getHandlerExecutor()); } - message.payload().markReaderIndex(); - subscription.setCalled(true); - subscription.getHandler().onMessage(message.variableHeader().topicName(), message.payload()); - if (subscription.isOnce()) { - this.client.off(subscription.getTopic(), subscription.getHandler()); - } - message.payload().resetReaderIndex(); - handlerInvoked = true; } + future = Futures.transform(future, x -> { + if (!handlerInvoked.get() && client.getDefaultHandler() != null) { + client.getDefaultHandler().onMessage(message.variableHeader().topicName(), message.payload()); + } + return null; + }, client.getHandlerExecutor()); + } finally { + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(@Nullable Void result) { + message.payload().release(); + } + + @Override + public void onFailure(Throwable t) { + message.payload().release(); + } + }, MoreExecutors.directExecutor()); } - if (!handlerInvoked && client.getDefaultHandler() != null) { - client.getDefaultHandler().onMessage(message.variableHeader().topicName(), message.payload()); - } - message.payload().release(); + return future; } private void handleConack(Channel channel, MqttConnAckMessage message) { @@ -197,11 +229,13 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler break; case AT_LEAST_ONCE: - invokeHandlersForIncomingPublish(message); + var future = invokeHandlersForIncomingPublish(message); if (message.variableHeader().packetId() != -1) { - MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().packetId()); - channel.writeAndFlush(new MqttPubAckMessage(fixedHeader, variableHeader)); + future.addListener(() -> { + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().packetId()); + channel.writeAndFlush(new MqttPubAckMessage(fixedHeader, variableHeader)); + }, MoreExecutors.directExecutor()); } break; @@ -256,14 +290,20 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler } private void handlePubrel(Channel channel, MqttMessage message) { + var future = Futures.immediateVoidFuture(); if (this.client.getQos2PendingIncomingPublishes().containsKey(((MqttMessageIdVariableHeader) message.variableHeader()).messageId())) { MqttIncomingQos2Publish incomingQos2Publish = this.client.getQos2PendingIncomingPublishes().get(((MqttMessageIdVariableHeader) message.variableHeader()).messageId()); - this.invokeHandlersForIncomingPublish(incomingQos2Publish.getIncomingPublish()); - this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().packetId()); + future = invokeHandlersForIncomingPublish(incomingQos2Publish.getIncomingPublish()); + future = Futures.transform(future, x -> { + this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().packetId()); + return null; + }, MoreExecutors.directExecutor()); } - MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0); - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(((MqttMessageIdVariableHeader) message.variableHeader()).messageId()); - channel.writeAndFlush(new MqttMessage(fixedHeader, variableHeader)); + future.addListener(() -> { + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(((MqttMessageIdVariableHeader) message.variableHeader()).messageId()); + channel.writeAndFlush(new MqttMessage(fixedHeader, variableHeader)); + }, MoreExecutors.directExecutor()); } private void handlePubcomp(MqttMessage message) { @@ -274,4 +314,23 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler pendingPublish.getPayload().release(); pendingPublish.onPubcompReceived(); } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + try { + if (cause instanceof IOException) { + if (log.isDebugEnabled()) { + log.debug("[{}][{}][{}] IOException: ", client.getClientConfig().getClientId(), client.getClientConfig().getUsername() , ctx.channel().remoteAddress(), + cause); + } else if (log.isInfoEnabled()) { + log.info("[{}][{}][{}] IOException: {}", client.getClientConfig().getClientId(), client.getClientConfig().getUsername() , ctx.channel().remoteAddress(), + cause.getMessage()); + } + } else { + log.warn("exceptionCaught", cause); + } + } finally { + ReferenceCountUtil.release(cause); + } + } } diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java index 2fe179de31..536a76119f 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java @@ -21,6 +21,7 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.util.concurrent.Future; +import org.thingsboard.common.util.ListeningExecutor; public interface MqttClient { @@ -71,6 +72,8 @@ public interface MqttClient { */ void setEventLoop(EventLoopGroup eventLoop); + ListeningExecutor getHandlerExecutor(); + /** * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler * @@ -180,8 +183,8 @@ public interface MqttClient { * @param config The config object to use while looking for settings * @param defaultHandler The handler for incoming messages that do not match any topic subscriptions */ - static MqttClient create(MqttClientConfig config, MqttHandler defaultHandler){ - return new MqttClientImpl(config, defaultHandler); + static MqttClient create(MqttClientConfig config, MqttHandler defaultHandler, ListeningExecutor handlerExecutor){ + return new MqttClientImpl(config, defaultHandler, handlerExecutor); } /** diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java index f38a790be7..63d65a1cc2 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java @@ -46,6 +46,7 @@ import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.ListeningExecutor; import java.util.Collections; import java.util.HashSet; @@ -88,13 +89,13 @@ final class MqttClientImpl implements MqttClient { private int port; private MqttClientCallback callback; + private final ListeningExecutor handlerExecutor; /** * Construct the MqttClientImpl with default config */ - public MqttClientImpl(MqttHandler defaultHandler) { - this.clientConfig = new MqttClientConfig(); - this.defaultHandler = defaultHandler; + public MqttClientImpl(MqttHandler defaultHandler, ListeningExecutor handlerExecutor) { + this(new MqttClientConfig(), defaultHandler, handlerExecutor); } /** @@ -103,9 +104,10 @@ final class MqttClientImpl implements MqttClient { * * @param clientConfig The config object to use while looking for settings */ - public MqttClientImpl(MqttClientConfig clientConfig, MqttHandler defaultHandler) { + public MqttClientImpl(MqttClientConfig clientConfig, MqttHandler defaultHandler, ListeningExecutor handlerExecutor) { this.clientConfig = clientConfig; this.defaultHandler = defaultHandler; + this.handlerExecutor = handlerExecutor; } /** @@ -227,6 +229,11 @@ final class MqttClientImpl implements MqttClient { this.eventLoop = eventLoop; } + @Override + public ListeningExecutor getHandlerExecutor() { + return this.handlerExecutor; + } + /** * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler * diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscription.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscription.java index 6c4abb4c5c..c4bc9e38c1 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscription.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscription.java @@ -25,7 +25,7 @@ final class MqttSubscription { private final boolean once; - private boolean called; + private volatile boolean called; MqttSubscription(String topic, MqttHandler handler, boolean once) { if (topic == null) { diff --git a/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java b/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java index cb1b6b81fe..f39ca01110 100644 --- a/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java +++ b/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java @@ -26,6 +26,7 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.thingsboard.common.util.AbstractListeningExecutor; import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClientConfig; import org.thingsboard.mqtt.MqttConnectResult; @@ -49,8 +50,18 @@ public class MqttIntegrationTest { MqttClient mqttClient; + AbstractListeningExecutor handlerExecutor; + @Before public void init() throws Exception { + this.handlerExecutor = new AbstractListeningExecutor() { + @Override + protected int getThreadPollSize() { + return 4; + } + }; + handlerExecutor.init(); + this.eventLoopGroup = new NioEventLoopGroup(); this.mqttServer = new MqttServer(); @@ -68,6 +79,9 @@ public class MqttIntegrationTest { if (this.eventLoopGroup != null) { this.eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); } + if (this.handlerExecutor != null) { + this.handlerExecutor.destroy(); + } } @Test @@ -110,7 +124,7 @@ public class MqttIntegrationTest { MqttClientConfig config = new MqttClientConfig(); config.setTimeoutSeconds(KEEPALIVE_TIMEOUT_SECONDS); config.setReconnectDelay(RECONNECT_DELAY_SECONDS); - MqttClient client = MqttClient.create(config, null); + MqttClient client = MqttClient.create(config, null, handlerExecutor); client.setEventLoop(this.eventLoopGroup); Future connectFuture = client.connect(MQTT_HOST, this.mqttServer.getMqttPort()); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index 121b9fb756..49b31cb33c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -114,7 +114,7 @@ public class TbMqttNode extends TbAbstractExternalNode { config.setCleanSession(this.mqttNodeConfiguration.isCleanSession()); prepareMqttClientConfig(config); - MqttClient client = MqttClient.create(config, null); + MqttClient client = MqttClient.create(config, null, ctx.getExternalCallExecutor()); client.setEventLoop(ctx.getSharedEventLoop()); Future connectFuture = client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort()); MqttConnectResult result; From d74e0c45df8442e709928c3c04efe36442782a07 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 20 Jun 2023 14:26:30 +0200 Subject: [PATCH 02/19] MqttHandler - processAsync (required for AbstractMqttIntegration) --- .../thingsboard/server/msa/connectivity/MqttClientTest.java | 4 +++- .../server/msa/connectivity/MqttGatewayClientTest.java | 4 +++- .../src/main/java/org/thingsboard/mqtt/MqttHandler.java | 3 ++- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index 96e57549a8..940bd6777e 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -16,6 +16,7 @@ package org.thingsboard.server.msa.connectivity; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -493,9 +494,10 @@ public class MqttClientTest extends AbstractContainerTest { } @Override - public void onMessage(String topic, ByteBuf message) { + public ListenableFuture onMessage(String topic, ByteBuf message) { log.info("MQTT message [{}], topic [{}]", message.toString(StandardCharsets.UTF_8), topic); events.add(new MqttEvent(topic, message.toString(StandardCharsets.UTF_8))); + return Futures.immediateVoidFuture(); } } diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java index 8cddb69fa3..de11df2623 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java @@ -16,6 +16,7 @@ package org.thingsboard.server.msa.connectivity; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -435,9 +436,10 @@ public class MqttGatewayClientTest extends AbstractContainerTest { } @Override - public void onMessage(String topic, ByteBuf message) { + public ListenableFuture onMessage(String topic, ByteBuf message) { log.info("MQTT message [{}], topic [{}]", message.toString(StandardCharsets.UTF_8), topic); events.add(new MqttEvent(topic, message.toString(StandardCharsets.UTF_8))); + return Futures.immediateVoidFuture(); } } diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttHandler.java index 0ec03ff04b..21c07a17cd 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttHandler.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttHandler.java @@ -15,9 +15,10 @@ */ package org.thingsboard.mqtt; +import com.google.common.util.concurrent.ListenableFuture; import io.netty.buffer.ByteBuf; public interface MqttHandler { - void onMessage(String topic, ByteBuf payload); + ListenableFuture onMessage(String topic, ByteBuf payload); } From c3e9ab59918f04c5eb9d79df47948c187d09e7cf Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 26 Jul 2023 20:38:22 +0200 Subject: [PATCH 03/19] TbKafkaProducerTemplate will add headers for each message when log level: DEBUG - producerId and thread name; TRACE - stacktrace first 10-2=8 lines --- .../queue/kafka/TbKafkaProducerTemplate.java | 28 +++++++++- .../kafka/TbKafkaProducerTemplateTest.java | 54 +++++++++++++++++++ .../queue/src/test/resources/logback-test.xml | 20 +++++++ 3 files changed, 101 insertions(+), 1 deletion(-) create mode 100644 common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplateTest.java create mode 100644 common/queue/src/test/resources/logback-test.xml diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java index 15c2f04d17..7c1c28b9f5 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java @@ -30,6 +30,9 @@ import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.TbQueueProducer; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -53,10 +56,14 @@ public class TbKafkaProducerTemplate implements TbQueuePro private final Set topics; + @Getter + private final String clientId; + @Builder private TbKafkaProducerTemplate(TbKafkaSettings settings, String defaultTopic, String clientId, TbQueueAdmin admin) { Properties props = settings.toProducerProps(); + this.clientId = Objects.requireNonNull(clientId, "Kafka producer client.id is null"); if (!StringUtils.isEmpty(clientId)) { props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); } @@ -72,6 +79,24 @@ public class TbKafkaProducerTemplate implements TbQueuePro public void init() { } + void addAnalyticHeaders(List
headers) { + try { + if (log.isDebugEnabled()) { + headers.add(new RecordHeader("_producerId", getClientId().getBytes(StandardCharsets.UTF_8))); + headers.add(new RecordHeader("_threadName", Thread.currentThread().getName().getBytes(StandardCharsets.UTF_8))); + } + if (log.isTraceEnabled()) { + StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); + int maxlevel = Math.min(stackTrace.length, 10); + for (int i = 2; i < maxlevel; i++) { // ignore two levels: getStackTrace and addAnalyticHeaders + headers.add(new RecordHeader("_stackTrace" + i, stackTrace[i].toString().getBytes(StandardCharsets.UTF_8))); + } + } + } catch (Throwable t) { + log.debug("Failed to add analytic header in Kafka producer {}", getClientId(), t); + } + } + @Override public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) { try { @@ -79,7 +104,8 @@ public class TbKafkaProducerTemplate implements TbQueuePro String key = msg.getKey().toString(); byte[] data = msg.getData(); ProducerRecord record; - Iterable
headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList()); + List
headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList()); + addAnalyticHeaders(headers); record = new ProducerRecord<>(tpi.getFullTopicName(), null, key, data, headers); producer.send(record, (metadata, exception) -> { if (exception == null) { diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplateTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplateTest.java new file mode 100644 index 0000000000..bfd3c4a6dc --- /dev/null +++ b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplateTest.java @@ -0,0 +1,54 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.header.Header; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.thingsboard.server.queue.TbQueueMsg; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.willCallRealMethod; +import static org.mockito.BDDMockito.willReturn; +import static org.mockito.Mockito.mock; + +@Slf4j +class TbKafkaProducerTemplateTest { + + TbKafkaProducerTemplate producerTemplate; + + @BeforeEach + void setUp() { + producerTemplate = mock(TbKafkaProducerTemplate.class); + willCallRealMethod().given(producerTemplate).addAnalyticHeaders(any()); + willReturn("tb-core-to-core-notifications-tb-core-3").given(producerTemplate).getClientId(); + } + + @Test + void testAddAnalyticHeaders() { + List
headers = new ArrayList<>(); + producerTemplate.addAnalyticHeaders(headers); + assertThat(headers).isNotEmpty(); + headers.forEach(r -> log.info("RecordHeader key [{}] value [{}]", r.key(), new String(r.value(), StandardCharsets.UTF_8))); + } + +} diff --git a/common/queue/src/test/resources/logback-test.xml b/common/queue/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..f7053313d4 --- /dev/null +++ b/common/queue/src/test/resources/logback-test.xml @@ -0,0 +1,20 @@ + + + + + + %d{ISO8601} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + From 2cccd0951a491c8a63264a6982d2e3744e78aaeb Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 27 Jul 2023 19:40:27 +0200 Subject: [PATCH 04/19] TbKafkaProducerTemplate: addAnalyticHeaders optimized --- .../queue/kafka/TbKafkaProducerTemplate.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java index 7c1c28b9f5..850f5c08ea 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java @@ -80,20 +80,18 @@ public class TbKafkaProducerTemplate implements TbQueuePro } void addAnalyticHeaders(List
headers) { - try { - if (log.isDebugEnabled()) { - headers.add(new RecordHeader("_producerId", getClientId().getBytes(StandardCharsets.UTF_8))); - headers.add(new RecordHeader("_threadName", Thread.currentThread().getName().getBytes(StandardCharsets.UTF_8))); - } - if (log.isTraceEnabled()) { + headers.add(new RecordHeader("_producerId", getClientId().getBytes(StandardCharsets.UTF_8))); + headers.add(new RecordHeader("_threadName", Thread.currentThread().getName().getBytes(StandardCharsets.UTF_8))); + if (log.isTraceEnabled()) { + try { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); int maxlevel = Math.min(stackTrace.length, 10); for (int i = 2; i < maxlevel; i++) { // ignore two levels: getStackTrace and addAnalyticHeaders headers.add(new RecordHeader("_stackTrace" + i, stackTrace[i].toString().getBytes(StandardCharsets.UTF_8))); } + } catch (Throwable t) { + log.trace("Failed to add stacktrace headers in Kafka producer {}", getClientId(), t); } - } catch (Throwable t) { - log.debug("Failed to add analytic header in Kafka producer {}", getClientId(), t); } } @@ -105,7 +103,9 @@ public class TbKafkaProducerTemplate implements TbQueuePro byte[] data = msg.getData(); ProducerRecord record; List
headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList()); - addAnalyticHeaders(headers); + if (log.isDebugEnabled()) { + addAnalyticHeaders(headers); + } record = new ProducerRecord<>(tpi.getFullTopicName(), null, key, data, headers); producer.send(record, (metadata, exception) -> { if (exception == null) { From ed6614af71a3c0d4c08c9772ed84f08e2e2c0ea3 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 27 Jul 2023 20:59:58 +0200 Subject: [PATCH 05/19] MqttClientConfig - ownerId added for exceptions logging purposes. MqttChannelHandler - improved logging --- .../server/msa/connectivity/MqttClientTest.java | 5 +++++ .../server/msa/connectivity/MqttGatewayClientTest.java | 5 +++++ .../java/org/thingsboard/mqtt/MqttChannelHandler.java | 10 ++++------ .../java/org/thingsboard/mqtt/MqttClientConfig.java | 6 +++++- .../mqtt/integration/MqttIntegrationTest.java | 1 + .../org/thingsboard/rule/engine/mqtt/TbMqttNode.java | 6 +++++- 6 files changed, 25 insertions(+), 8 deletions(-) diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index 940bd6777e..5a7e9b631d 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -476,8 +476,13 @@ public class MqttClientTest extends AbstractContainerTest { return getMqttClient(deviceCredentials.getCredentialsId(), listener); } + private String getOwnerId() { + return "Tenant[" + device.getTenantId().getId() + "]MqttClientTestDevice[" + device.getId().getId() + "]"; + } + private MqttClient getMqttClient(String username, MqttMessageListener listener) throws InterruptedException, ExecutionException { MqttClientConfig clientConfig = new MqttClientConfig(); + clientConfig.setOwnerId(getOwnerId()); clientConfig.setClientId("MQTT client from test"); clientConfig.setUsername(username); MqttClient mqttClient = MqttClient.create(clientConfig, listener, handlerExecutor); diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java index de11df2623..5e2a9f5fe6 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java @@ -418,8 +418,13 @@ public class MqttGatewayClientTest extends AbstractContainerTest { return testRestClient.getDeviceById(createdDeviceId); } + private String getOwnerId() { + return "Tenant[" + gatewayDevice.getTenantId().getId() + "]MqttGatewayClientTestDevice[" + gatewayDevice.getId().getId() + "]"; + } + private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException, ExecutionException { MqttClientConfig clientConfig = new MqttClientConfig(); + clientConfig.setOwnerId(getOwnerId()); clientConfig.setClientId("MQTT client from test"); clientConfig.setUsername(deviceCredentials.getCredentialsId()); MqttClient mqttClient = MqttClient.create(clientConfig, listener, handlerExecutor); diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java index 6b3a4e009e..a40c30ef96 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java @@ -320,14 +320,12 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler try { if (cause instanceof IOException) { if (log.isDebugEnabled()) { - log.debug("[{}][{}][{}] IOException: ", client.getClientConfig().getClientId(), client.getClientConfig().getUsername() , ctx.channel().remoteAddress(), - cause); - } else if (log.isInfoEnabled()) { - log.info("[{}][{}][{}] IOException: {}", client.getClientConfig().getClientId(), client.getClientConfig().getUsername() , ctx.channel().remoteAddress(), - cause.getMessage()); + log.debug("[{}] IOException: ", client.getClientConfig().getOwnerId(), cause); + } else { + log.info("[{}] IOException: {}", client.getClientConfig().getOwnerId(), cause.getMessage()); } } else { - log.warn("exceptionCaught", cause); + log.warn("[{}] exceptionCaught", client.getClientConfig().getOwnerId(), cause); } } finally { ReferenceCountUtil.release(cause); diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java index 1e20a842c3..10cee5d5fc 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java @@ -19,6 +19,8 @@ import io.netty.channel.Channel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.mqtt.MqttVersion; import io.netty.handler.ssl.SslContext; +import lombok.Getter; +import lombok.Setter; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -26,10 +28,12 @@ import java.util.Random; @SuppressWarnings({"WeakerAccess", "unused"}) public final class MqttClientConfig { - private final SslContext sslContext; private final String randomClientId; + @Getter + @Setter + private String ownerId; // [TenantId][IntegrationId] or [TenantId][RuleNodeId] for exceptions logging purposes private String clientId; private int timeoutSeconds = 60; private MqttVersion protocolVersion = MqttVersion.MQTT_3_1; diff --git a/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java b/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java index f39ca01110..04c2be1740 100644 --- a/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java +++ b/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java @@ -122,6 +122,7 @@ public class MqttIntegrationTest { private MqttClient initClient() throws Exception { MqttClientConfig config = new MqttClientConfig(); + config.setOwnerId("MqttIntegrationTest"); config.setTimeoutSeconds(KEEPALIVE_TIMEOUT_SECONDS); config.setReconnectDelay(RECONNECT_DELAY_SECONDS); MqttClient client = MqttClient.create(config, null, handlerExecutor); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index 49b31cb33c..1376f29710 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -25,7 +25,6 @@ import org.thingsboard.mqtt.MqttClientConfig; import org.thingsboard.mqtt.MqttConnectResult; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; -import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; @@ -105,8 +104,13 @@ public class TbMqttNode extends TbAbstractExternalNode { } } + String getOwnerId(TbContext ctx) { + return "Tenant[" + ctx.getTenantId().getId() + "]RuleNode[" + ctx.getSelf().getId().getId() + "]"; + } + protected MqttClient initClient(TbContext ctx) throws Exception { MqttClientConfig config = new MqttClientConfig(getSslContext()); + config.setOwnerId(getOwnerId(ctx)); if (!StringUtils.isEmpty(this.mqttNodeConfiguration.getClientId())) { config.setClientId(this.mqttNodeConfiguration.isAppendClientIdSuffix() ? this.mqttNodeConfiguration.getClientId() + "_" + ctx.getServiceId() : this.mqttNodeConfiguration.getClientId()); From 1b565e01ce5caa1c652758a8be82943806e04194 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 25 Jul 2023 06:54:48 +0200 Subject: [PATCH 06/19] Rule engine: ack all rate limited failures (draft) --- .../service/queue/TbMsgPackCallback.java | 18 ++++++++++++++++++ .../server/common/msg/queue/TbMsgCallback.java | 4 ++++ 2 files changed, 22 insertions(+) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java index ef2ba8798d..66364406c2 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java @@ -17,11 +17,13 @@ package org.thingsboard.server.service.queue; import io.micrometer.core.instrument.Timer; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.exception.ApiUsageLimitsExceededException; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.RuleEngineException; import org.thingsboard.server.common.msg.queue.RuleNodeInfo; import org.thingsboard.server.common.msg.queue.TbMsgCallback; +import org.thingsboard.server.common.msg.tools.TbRateLimitsException; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -57,8 +59,24 @@ public class TbMsgPackCallback implements TbMsgCallback { ctx.onSuccess(id); } + @Override + public void onRateLimit(RuleEngineException e) { + log.debug("[{}] ON RATE LIMIT", id, e); + //TODO notify tenant on rate limit + if (failedMsgTimer != null) { + failedMsgTimer.record(System.currentTimeMillis() - startMsgProcessing, TimeUnit.MILLISECONDS); + } + ctx.onSuccess(id); + } + @Override public void onFailure(RuleEngineException e) { + Throwable cause = e.getCause(); + if (cause instanceof TbRateLimitsException || cause instanceof ApiUsageLimitsExceededException) { + onRateLimit(e); + return; + } + log.trace("[{}] ON FAILURE", id, e); if (failedMsgTimer != null) { failedMsgTimer.record(System.currentTimeMillis() - startMsgProcessing, TimeUnit.MILLISECONDS); diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbMsgCallback.java b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbMsgCallback.java index 3312c98b64..6cf298adb2 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbMsgCallback.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbMsgCallback.java @@ -39,6 +39,10 @@ public interface TbMsgCallback { void onFailure(RuleEngineException e); + default void onRateLimit(RuleEngineException e) { + onFailure(e); + }; + /** * Returns 'true' if rule engine is expecting the message to be processed, 'false' otherwise. * message may no longer be valid, if the message pack is already expired/canceled/failed. From dfe21e3b006ece31737495fbedf1bdd86f6d48b9 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 27 Jul 2023 09:47:54 +0200 Subject: [PATCH 07/19] ExceptionUtil moved to the common/util package --- .../common/util/ExceptionUtil.java | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 common/util/src/main/java/org/thingsboard/common/util/ExceptionUtil.java diff --git a/common/util/src/main/java/org/thingsboard/common/util/ExceptionUtil.java b/common/util/src/main/java/org/thingsboard/common/util/ExceptionUtil.java new file mode 100644 index 0000000000..fb08e21791 --- /dev/null +++ b/common/util/src/main/java/org/thingsboard/common/util/ExceptionUtil.java @@ -0,0 +1,67 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.common.util; + +import com.google.gson.JsonParseException; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.id.EntityId; + +import javax.script.ScriptException; +import java.io.PrintWriter; +import java.io.StringWriter; + +@Slf4j +public class ExceptionUtil { + + @SuppressWarnings("unchecked") + public static T lookupException(Throwable source, Class clazz) { + Exception e = lookupExceptionInCause(source, clazz); + if (e != null) { + return (T) e; + } else { + return null; + } + } + + public static Exception lookupExceptionInCause(Throwable source, Class... clazzes) { + if (source == null) { + return null; + } + for (Class clazz : clazzes) { + if (clazz.isAssignableFrom(source.getClass())) { + return (Exception) source; + } + } + return lookupExceptionInCause(source.getCause(), clazzes); + } + + public static String toString(Exception e, EntityId componentId, boolean stackTraceEnabled) { + Exception exception = lookupExceptionInCause(e, ScriptException.class, JsonParseException.class); + if (exception != null && StringUtils.isNotEmpty(exception.getMessage())) { + return exception.getMessage(); + } else { + if (stackTraceEnabled) { + StringWriter sw = new StringWriter(); + e.printStackTrace(new PrintWriter(sw)); + return sw.toString(); + } else { + log.debug("[{}] Unknown error during message processing", componentId, e); + return "Please contact system administrator"; + } + } + } +} From 859c820dc36ca51210330ef4e4f52fd59aa44057 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 27 Jul 2023 09:51:11 +0200 Subject: [PATCH 08/19] AbstractRateLimitException introduced in common/data for all rate limit related exceptions --- .../exception/AbstractRateLimitException.java | 41 +++++++++++++++++++ .../ApiUsageLimitsExceededException.java | 2 +- .../msg/tools/TbRateLimitsException.java | 3 +- 3 files changed, 44 insertions(+), 2 deletions(-) create mode 100644 common/data/src/main/java/org/thingsboard/server/common/data/exception/AbstractRateLimitException.java diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/exception/AbstractRateLimitException.java b/common/data/src/main/java/org/thingsboard/server/common/data/exception/AbstractRateLimitException.java new file mode 100644 index 0000000000..6faf2d7d8c --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/exception/AbstractRateLimitException.java @@ -0,0 +1,41 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.exception; + +public abstract class AbstractRateLimitException extends RuntimeException { + + public AbstractRateLimitException() { + super(); + } + + public AbstractRateLimitException(String message) { + super(message); + } + + public AbstractRateLimitException(String message, Throwable cause) { + super(message, cause); + } + + public AbstractRateLimitException(Throwable cause) { + super(cause); + } + + protected AbstractRateLimitException(String message, Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/exception/ApiUsageLimitsExceededException.java b/common/data/src/main/java/org/thingsboard/server/common/data/exception/ApiUsageLimitsExceededException.java index 2d24184a3f..aa9441c776 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/exception/ApiUsageLimitsExceededException.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/exception/ApiUsageLimitsExceededException.java @@ -15,7 +15,7 @@ */ package org.thingsboard.server.common.data.exception; -public class ApiUsageLimitsExceededException extends RuntimeException { +public class ApiUsageLimitsExceededException extends AbstractRateLimitException { public ApiUsageLimitsExceededException(String message) { super(message); } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimitsException.java b/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimitsException.java index dd5fda4dd5..5e63cb037e 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimitsException.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimitsException.java @@ -17,11 +17,12 @@ package org.thingsboard.server.common.msg.tools; import lombok.Getter; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.exception.AbstractRateLimitException; /** * Created by ashvayka on 22.10.18. */ -public class TbRateLimitsException extends RuntimeException { +public class TbRateLimitsException extends AbstractRateLimitException { @Getter private final EntityType entityType; From 825eaf640c2c3fa56d5cfdbb68fa5faf47ecf4b8 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 27 Jul 2023 15:21:58 +0200 Subject: [PATCH 09/19] RuleEngineException cause added to be able to analyse cause by rate limit exceptions --- .../actors/ruleChain/RuleChainActorMessageProcessor.java | 4 ++-- .../service/queue/DefaultTbRuleEngineConsumerService.java | 2 +- .../server/common/msg/queue/RuleEngineException.java | 5 +++++ .../server/queue/common/MultipleTbQueueCallbackWrapper.java | 2 +- .../queue/common/MultipleTbQueueTbMsgCallbackWrapper.java | 2 +- .../server/queue/common/TbQueueTbMsgCallbackWrapper.java | 2 +- .../rule/engine/transform/MultipleTbMsgsCallbackWrapper.java | 2 +- 7 files changed, 12 insertions(+), 7 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index 4bd082ee38..4b868dfcbf 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -232,7 +232,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor Date: Thu, 27 Jul 2023 15:25:56 +0200 Subject: [PATCH 10/19] TbMsgPackCallback: AbstractRateLimitException lookup in cause of RuleEngineException. Tests added --- .../service/queue/TbMsgPackCallback.java | 6 +- .../service/queue/TbMsgPackCallbackTest.java | 98 +++++++++++++++++++ 2 files changed, 101 insertions(+), 3 deletions(-) create mode 100644 application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackCallbackTest.java diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java index 66364406c2..c2171c2550 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java @@ -17,7 +17,8 @@ package org.thingsboard.server.service.queue; import io.micrometer.core.instrument.Timer; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.server.common.data.exception.ApiUsageLimitsExceededException; +import org.thingsboard.common.util.ExceptionUtil; +import org.thingsboard.server.common.data.exception.AbstractRateLimitException; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.RuleEngineException; @@ -71,8 +72,7 @@ public class TbMsgPackCallback implements TbMsgCallback { @Override public void onFailure(RuleEngineException e) { - Throwable cause = e.getCause(); - if (cause instanceof TbRateLimitsException || cause instanceof ApiUsageLimitsExceededException) { + if (ExceptionUtil.lookupExceptionInCause(e, AbstractRateLimitException.class) != null) { onRateLimit(e); return; } diff --git a/application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackCallbackTest.java b/application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackCallbackTest.java new file mode 100644 index 0000000000..731ca75f17 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackCallbackTest.java @@ -0,0 +1,98 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.queue; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.rule.RuleNode; +import org.thingsboard.server.common.msg.queue.RuleEngineException; +import org.thingsboard.server.common.msg.queue.RuleNodeException; +import org.thingsboard.server.common.msg.tools.TbRateLimitsException; + +import java.util.UUID; +import java.util.stream.Stream; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +class TbMsgPackCallbackTest { + + TenantId tenantId; + UUID msgId; + TbMsgPackProcessingContext ctx; + TbMsgPackCallback callback; + + @BeforeEach + void setUp() { + tenantId = TenantId.fromUUID(UUID.randomUUID()); + msgId = UUID.randomUUID(); + ctx = mock(TbMsgPackProcessingContext.class); + callback = spy(new TbMsgPackCallback(msgId, tenantId, ctx)); + } + + private static Stream testOnFailure_NotRateLimitException() { + return Stream.of( + Arguments.of(new RuleEngineException("rule engine no cause")), + Arguments.of(new RuleEngineException("rule engine caused 1 lvl", new RuntimeException())), + Arguments.of(new RuleEngineException("rule engine caused 2 lvl", new RuntimeException(new Exception()))), + Arguments.of(new RuleEngineException("rule engine caused 2 lvl Throwable", new RuntimeException(new Throwable()))), + Arguments.of(new RuleNodeException("rule node no cause", "RuleChain", new RuleNode())) + ); + } + + @ParameterizedTest + @MethodSource + void testOnFailure_NotRateLimitException(RuleEngineException ree) { + callback.onFailure(ree); + + verify(callback, never()).onRateLimit(any()); + verify(callback, never()).onSuccess(); + verify(ctx, never()).onSuccess(any()); + } + + private static Stream testOnFailure_RateLimitException() { + return Stream.of( + Arguments.of(new RuleEngineException("caused lvl 1", new TbRateLimitsException(EntityType.ASSET))), + Arguments.of(new RuleEngineException("caused lvl 2", new RuntimeException(new TbRateLimitsException(EntityType.ASSET)))), + Arguments.of( + new RuleEngineException("caused lvl 3", + new RuntimeException( + new Exception( + new TbRateLimitsException(EntityType.ASSET))))) + ); + } + + @ParameterizedTest + @MethodSource + void testOnFailure_RateLimitException(RuleEngineException ree) { + callback.onFailure(ree); + + verify(callback).onRateLimit(any()); + verify(callback).onFailure(any()); + verify(callback, never()).onSuccess(); + verify(ctx).onSuccess(msgId); + verify(ctx).onSuccess(any()); + verify(ctx, never()).onFailure(any(), any(), any()); + } + +} From e75307c2beb9237b6d36fa7a1fb986aaa1e81316 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 27 Jul 2023 18:52:04 +0200 Subject: [PATCH 11/19] ExceptionUtil.lookupExceptionInCause refactored from recursion to a loop. Tests added --- .../common/util/ExceptionUtil.java | 14 ++-- .../common/util/ExceptionUtilTest.java | 74 +++++++++++++++++++ 2 files changed, 81 insertions(+), 7 deletions(-) create mode 100644 common/util/src/test/java/org/thingsboard/common/util/ExceptionUtilTest.java diff --git a/common/util/src/main/java/org/thingsboard/common/util/ExceptionUtil.java b/common/util/src/main/java/org/thingsboard/common/util/ExceptionUtil.java index fb08e21791..2ec15f3724 100644 --- a/common/util/src/main/java/org/thingsboard/common/util/ExceptionUtil.java +++ b/common/util/src/main/java/org/thingsboard/common/util/ExceptionUtil.java @@ -38,15 +38,15 @@ public class ExceptionUtil { } public static Exception lookupExceptionInCause(Throwable source, Class... clazzes) { - if (source == null) { - return null; - } - for (Class clazz : clazzes) { - if (clazz.isAssignableFrom(source.getClass())) { - return (Exception) source; + while (source != null) { + for (Class clazz : clazzes) { + if (clazz.isAssignableFrom(source.getClass())) { + return (Exception) source; + } } + source = source.getCause(); } - return lookupExceptionInCause(source.getCause(), clazzes); + return null; } public static String toString(Exception e, EntityId componentId, boolean stackTraceEnabled) { diff --git a/common/util/src/test/java/org/thingsboard/common/util/ExceptionUtilTest.java b/common/util/src/test/java/org/thingsboard/common/util/ExceptionUtilTest.java new file mode 100644 index 0000000000..e589ae8e30 --- /dev/null +++ b/common/util/src/test/java/org/thingsboard/common/util/ExceptionUtilTest.java @@ -0,0 +1,74 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.common.util; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; + +class ExceptionUtilTest { + + final Exception cause = new RuntimeException(); + + @Test + void givenRootCause_whenLookupExceptionInCause_thenReturnRootCauseAndNoStackOverflow() { + Exception e = cause; + for (int i = 0; i <= 16384; i++) { + e = new Exception(e); + } + assertThat(ExceptionUtil.lookupExceptionInCause(e, RuntimeException.class)).isSameAs(cause); + } + + @Test + void givenCause_whenLookupExceptionInCause_thenReturnCause() { + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(cause), RuntimeException.class)).isSameAs(cause); + } + + @Test + void givenNoCauseAndExceptionIsWantedCauseClass_whenLookupExceptionInCause_thenReturnSelf() { + assertThat(ExceptionUtil.lookupExceptionInCause(cause, RuntimeException.class)).isSameAs(cause); + } + + @Test + void givenNoCause_whenLookupExceptionInCause_thenReturnNull() { + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(), RuntimeException.class)).isNull(); + } + + @Test + void givenNotWantedCause_whenLookupExceptionInCause_thenReturnNull() { + final Exception cause = new IOException(); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(cause), RuntimeException.class)).isNull(); + } + + @Test + void givenCause_whenLookupExceptionInCauseByMany_thenReturnFirstCause() { + final Exception causeIAE = new IllegalAccessException(); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE))).isNull(); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE), IOException.class, NoSuchFieldException.class)).isNull(); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE), IllegalAccessException.class, IOException.class, NoSuchFieldException.class)).isSameAs(causeIAE); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE), IOException.class, NoSuchFieldException.class, IllegalAccessException.class)).isSameAs(causeIAE); + + final Exception causeIOE = new IOException(causeIAE); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIOE))).isNull(); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE), ClassNotFoundException.class, NoSuchFieldException.class)).isNull(); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIOE), IOException.class, NoSuchFieldException.class)).isSameAs(causeIOE); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIOE), IllegalAccessException.class, IOException.class, NoSuchFieldException.class)).isSameAs(causeIOE); + assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIOE), IOException.class, NoSuchFieldException.class, IllegalAccessException.class)).isSameAs(causeIOE); + } + +} From 4f99d75e15fb5323d5d1e9e994159cd27379312e Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 27 Jul 2023 18:52:29 +0200 Subject: [PATCH 12/19] license:format for new classes --- .../server/service/queue/TbMsgPackCallbackTest.java | 8 ++++---- .../common/data/exception/AbstractRateLimitException.java | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackCallbackTest.java b/application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackCallbackTest.java index 731ca75f17..80b9535af3 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackCallbackTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackCallbackTest.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2023 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/exception/AbstractRateLimitException.java b/common/data/src/main/java/org/thingsboard/server/common/data/exception/AbstractRateLimitException.java index 6faf2d7d8c..1d1da75da3 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/exception/AbstractRateLimitException.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/exception/AbstractRateLimitException.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2023 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. From fe7846d1520ae4d06311ebcf6b54161a814f024b Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 19 Aug 2021 15:23:16 +0200 Subject: [PATCH 13/19] ui: event table: default interval is 15 minutes --- .../src/app/modules/home/components/event/event-table-config.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ui-ngx/src/app/modules/home/components/event/event-table-config.ts b/ui-ngx/src/app/modules/home/components/event/event-table-config.ts index a9816a130c..8b0ea62abb 100644 --- a/ui-ngx/src/app/modules/home/components/event/event-table-config.ts +++ b/ui-ngx/src/app/modules/home/components/event/event-table-config.ts @@ -40,6 +40,7 @@ import { EventContentDialogData } from '@home/components/event/event-content-dialog.component'; import { isEqual, sortObjectKeys } from '@core/utils'; +import {historyInterval, MINUTE} from '@shared/models/time/time.models'; import { ConnectedPosition, Overlay, OverlayConfig, OverlayRef } from '@angular/cdk/overlay'; import { ChangeDetectorRef, Injector, StaticProvider, ViewContainerRef } from '@angular/core'; import { ComponentPortal } from '@angular/cdk/portal'; @@ -89,6 +90,7 @@ export class EventTableConfig extends EntityTableConfig { this.loadDataOnInit = false; this.tableTitle = ''; this.useTimePageLink = true; + this.defaultTimewindowInterval = historyInterval(MINUTE * 15); this.detailsPanelEnabled = false; this.selectionEnabled = false; this.searchEnabled = false; From 33bab60954e67220e278bf4399daf440d464efca Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 19 Aug 2021 15:22:46 +0200 Subject: [PATCH 14/19] ui: event table: ts with ms --- .../src/app/modules/home/components/event/event-table-config.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ui-ngx/src/app/modules/home/components/event/event-table-config.ts b/ui-ngx/src/app/modules/home/components/event/event-table-config.ts index 8b0ea62abb..a07967f9c3 100644 --- a/ui-ngx/src/app/modules/home/components/event/event-table-config.ts +++ b/ui-ngx/src/app/modules/home/components/event/event-table-config.ts @@ -178,7 +178,7 @@ export class EventTableConfig extends EntityTableConfig { updateColumns(updateTableColumns: boolean = false): void { this.columns = []; this.columns.push( - new DateEntityTableColumn('createdTime', 'event.event-time', this.datePipe, '120px'), + new DateEntityTableColumn('createdTime', 'event.event-time', this.datePipe, '120px', 'yyyy-MM-dd HH:mm:ss.SSS'), new EntityTableColumn('server', 'event.server', '100px', (entity) => entity.body.server, entity => ({}), false)); switch (this.eventType) { From b7d522295810b59201614f3df28fc59eabc18e0d Mon Sep 17 00:00:00 2001 From: Vladyslav Date: Tue, 8 Aug 2023 17:16:02 +0300 Subject: [PATCH 15/19] UI: Updated code style --- .../src/app/modules/home/components/event/event-table-config.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ui-ngx/src/app/modules/home/components/event/event-table-config.ts b/ui-ngx/src/app/modules/home/components/event/event-table-config.ts index a07967f9c3..eb3edb0baf 100644 --- a/ui-ngx/src/app/modules/home/components/event/event-table-config.ts +++ b/ui-ngx/src/app/modules/home/components/event/event-table-config.ts @@ -40,7 +40,7 @@ import { EventContentDialogData } from '@home/components/event/event-content-dialog.component'; import { isEqual, sortObjectKeys } from '@core/utils'; -import {historyInterval, MINUTE} from '@shared/models/time/time.models'; +import { historyInterval, MINUTE } from '@shared/models/time/time.models'; import { ConnectedPosition, Overlay, OverlayConfig, OverlayRef } from '@angular/cdk/overlay'; import { ChangeDetectorRef, Injector, StaticProvider, ViewContainerRef } from '@angular/core'; import { ComponentPortal } from '@angular/cdk/portal'; From 141a7ff0e6be9bb132e788c870b6689f84fb8b5b Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 9 Aug 2023 21:21:15 +0200 Subject: [PATCH 16/19] changed recalculate_delay --- application/src/main/resources/thingsboard.yml | 5 ++++- .../server/queue/discovery/ZkDiscoveryService.java | 2 +- msa/vc-executor/src/main/resources/tb-vc-executor.yml | 2 +- transport/coap/src/main/resources/tb-coap-transport.yml | 2 +- transport/http/src/main/resources/tb-http-transport.yml | 2 +- transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml | 2 +- transport/mqtt/src/main/resources/tb-mqtt-transport.yml | 2 +- transport/snmp/src/main/resources/tb-snmp-transport.yml | 2 +- 8 files changed, 11 insertions(+), 8 deletions(-) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 1f16fbc414..5b76175fcd 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -96,7 +96,10 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" + # The recalculate_delay property recommended in a microservices architecture setup for rule-engine services. + # This property provides a pause to ensure that when a rule-engine service is restarted, other nodes don't immediately attempt to recalculate their partitions. + # The delay is recommended because the initialization of rule chain actors is time-consuming. Avoiding unnecessary recalculations during a restart can enhance system performance and stability. + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}" cluster: stats: diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java index 44999d016a..e99817de17 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java @@ -69,7 +69,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi private Integer zkSessionTimeout; @Value("${zk.zk_dir}") private String zkDir; - @Value("${zk.recalculate_delay:60000}") + @Value("${zk.recalculate_delay:0}") private Long recalculateDelay; protected final ConcurrentHashMap> delayedTasks; diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index 66c6b4d3da..9e57a35e20 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -41,7 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}" queue: type: "${TB_QUEUE_TYPE:kafka}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index f4b5e0bc94..a545759f38 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -41,7 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index f92da86b99..1f042fb131 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -68,7 +68,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index 05388473f0..ffe815d441 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -41,7 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index e131788929..f7d0209804 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -41,7 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index a7928eb49f..3ed46dde78 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -41,7 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}" cache: type: "${CACHE_TYPE:redis}" From 928962898e266cfabebd949bd2e1b7f106b84769 Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Fri, 11 Aug 2023 19:23:51 +0300 Subject: [PATCH 17/19] PROD-2339: fix getFeatureType method to handle RPC server-side response over DTLS --- .../transport/coap/CoapTransportResource.java | 11 +- .../coap/CoapTransportResourceTest.java | 352 ++++++++++++++++++ 2 files changed, 360 insertions(+), 3 deletions(-) create mode 100644 common/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapTransportResourceTest.java diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java index 7dde25bfd0..263df9e7a2 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java @@ -30,6 +30,7 @@ import org.thingsboard.server.coapserver.TbCoapDtlsSessionInfo; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceTransportType; +import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.TransportPayloadType; import org.thingsboard.server.common.data.security.DeviceTokenCredentials; import org.thingsboard.server.common.msg.session.FeatureType; @@ -380,12 +381,16 @@ public class CoapTransportResource extends AbstractCoapTransportResource { } } - private Optional getFeatureType(Request request) { + protected Optional getFeatureType(Request request) { List uriPath = request.getOptions().getUriPath(); try { - if (uriPath.size() >= FEATURE_TYPE_POSITION) { + int size = uriPath.size(); + if (size >= FEATURE_TYPE_POSITION) { + if (size == FEATURE_TYPE_POSITION && StringUtils.isNumeric(uriPath.get(size - 1))) { + return Optional.of(FeatureType.valueOf(uriPath.get(FEATURE_TYPE_POSITION - 2).toUpperCase())); + } return Optional.of(FeatureType.valueOf(uriPath.get(FEATURE_TYPE_POSITION - 1).toUpperCase())); - } else if (uriPath.size() >= FEATURE_TYPE_POSITION_CERTIFICATE_REQUEST) { + } else if (size == FEATURE_TYPE_POSITION_CERTIFICATE_REQUEST) { if (uriPath.contains(DataConstants.PROVISION)) { return Optional.of(FeatureType.valueOf(DataConstants.PROVISION.toUpperCase())); } diff --git a/common/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapTransportResourceTest.java b/common/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapTransportResourceTest.java new file mode 100644 index 0000000000..666f6c95df --- /dev/null +++ b/common/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapTransportResourceTest.java @@ -0,0 +1,352 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.coap; + +import org.eclipse.californium.core.coap.CoAP; +import org.eclipse.californium.core.coap.OptionSet; +import org.eclipse.californium.core.coap.Request; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.thingsboard.server.coapserver.CoapServerService; +import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.msg.session.FeatureType; +import org.thingsboard.server.common.transport.TransportService; +import org.thingsboard.server.queue.scheduler.SchedulerComponent; +import org.thingsboard.server.transport.coap.client.CoapClientContext; + +import java.util.Random; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class CoapTransportResourceTest { + + private static final String V1 = "v1"; + private static final String API = "api"; + private static final String TELEMETRY = "telemetry"; + private static final String ATTRIBUTES = "attributes"; + private static final String RPC = "rpc"; + private static final String CLAIM = "claim"; + private static final String PROVISION = "provision"; + private static final String GET_ATTRIBUTES_URI_QUERY = "clientKeys=attribute1,attribute2&sharedKeys=shared1,shared2"; + + private static final Random RANDOM = new Random(); + + private CoapTransportResource coapTransportResource; + + @BeforeEach + void setUp() { + + var ctxMock = mock(CoapTransportContext.class); + var coapServerServiceMock = mock(CoapServerService.class); + var transportServiceMock = mock(TransportService.class); + var clientContextMock = mock(CoapClientContext.class); + var schedulerComponentMock = mock(SchedulerComponent.class); + + when(ctxMock.getTransportService()).thenReturn(transportServiceMock); + when(ctxMock.getClientContext()).thenReturn(clientContextMock); + when(ctxMock.getSessionReportTimeout()).thenReturn(1L); + when(ctxMock.getScheduler()).thenReturn(schedulerComponentMock); + + coapTransportResource = new CoapTransportResource(ctxMock, coapServerServiceMock, V1); + } + + @AfterEach + void tearDown() { + } + + // accessToken based tests + + @Test + void givenPostTelemetryAccessTokenRequest_whenGetFeatureType_thenFeatureTypeTelemetry() { + // GIVEN + var request = toAccessTokenRequest(CoAP.Code.POST, StringUtils.randomAlphanumeric(20), TELEMETRY); + + // WHEN + var featureTypeOptional = coapTransportResource.getFeatureType(request); + + // THEN + assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); + assertEquals(FeatureType.TELEMETRY, featureTypeOptional.get(), "Feature type is invalid"); + } + + @Test + void givenPostAttributesAccessTokenRequest_whenGetFeatureType_thenFeatureTypeAttributes() { + // GIVEN + Request request = toAccessTokenRequest(CoAP.Code.POST, StringUtils.randomAlphanumeric(20), ATTRIBUTES); + + // WHEN + var featureTypeOptional = coapTransportResource.getFeatureType(request); + + // THEN + assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); + assertEquals(FeatureType.ATTRIBUTES, featureTypeOptional.get(), "Feature type is invalid"); + } + + @Test + void givenGetAttributesAccessTokenRequest_whenGetFeatureType_thenFeatureTypeAttributes() { + // GIVEN + Request request = toGetAttributesAccessTokenRequest(StringUtils.randomAlphanumeric(20)); + // WHEN + var featureTypeOptional = coapTransportResource.getFeatureType(request); + + // THEN + assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); + assertEquals(FeatureType.ATTRIBUTES, featureTypeOptional.get(), "Feature type is invalid"); + } + + @Test + void givenSubscribeForAttributesUpdatesAccessTokenRequest_whenGetFeatureType_thenFeatureTypeAttributes() { + // GIVEN + Request request = toAccessTokenRequest(CoAP.Code.GET, StringUtils.randomAlphanumeric(20), ATTRIBUTES); + // WHEN + var featureTypeOptional = coapTransportResource.getFeatureType(request); + + // THEN + assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); + assertEquals(FeatureType.ATTRIBUTES, featureTypeOptional.get(), "Feature type is invalid"); + } + + @Test + void givenSubscribeForRpcUpdatesAccessTokenRequest_whenGetFeatureType_thenFeatureTypeRpc() { + // GIVEN + Request request = toAccessTokenRequest(CoAP.Code.GET, StringUtils.randomAlphanumeric(20), RPC); + // WHEN + var featureTypeOptional = coapTransportResource.getFeatureType(request); + + // THEN + assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); + assertEquals(FeatureType.RPC, featureTypeOptional.get(), "Feature type is invalid"); + } + + @Test + void givenRpcResponseAccessTokenRequest_whenGetFeatureType_thenFeatureTypeRpc() { + // GIVEN + Request request = toRpcResponseAccessTokenRequest(StringUtils.randomAlphanumeric(20), RANDOM.nextInt(100)); + // WHEN + var featureTypeOptional = coapTransportResource.getFeatureType(request); + + // THEN + assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); + assertEquals(FeatureType.RPC, featureTypeOptional.get(), "Feature type is invalid"); + } + + @Test + void givenClientSideRpcAccessTokenRequest_whenGetFeatureType_thenFeatureTypeRpc() { + // GIVEN + Request request = toAccessTokenRequest(CoAP.Code.POST, StringUtils.randomAlphanumeric(20), RPC); + // WHEN + var featureTypeOptional = coapTransportResource.getFeatureType(request); + + // THEN + assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); + assertEquals(FeatureType.RPC, featureTypeOptional.get(), "Feature type is invalid"); + } + + @Test + void givenClaimingAccessTokenRequest_whenGetFeatureType_thenFeatureTypeClaim() { + // GIVEN + Request request = toAccessTokenRequest(CoAP.Code.POST, StringUtils.randomAlphanumeric(20), CLAIM); + // WHEN + var featureTypeOptional = coapTransportResource.getFeatureType(request); + + // THEN + assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); + assertEquals(FeatureType.CLAIM, featureTypeOptional.get(), "Feature type is invalid"); + } + + // certificate based tests + + @Test + void givenPostTelemetryCertificateRequest_whenGetFeatureType_thenFeatureTypeTelemetry() { + // GIVEN + var request = toCertificateRequest(CoAP.Code.POST, TELEMETRY); + + // WHEN + var featureTypeOptional = coapTransportResource.getFeatureType(request); + + // THEN + assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); + assertEquals(FeatureType.TELEMETRY, featureTypeOptional.get(), "Feature type is invalid"); + } + + @Test + void givenPostAttributesCertificateRequest_whenGetFeatureType_thenFeatureTypeAttributes() { + // GIVEN + var request = toCertificateRequest(CoAP.Code.POST, ATTRIBUTES); + + // WHEN + var featureTypeOptional = coapTransportResource.getFeatureType(request); + + // THEN + assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); + assertEquals(FeatureType.ATTRIBUTES, featureTypeOptional.get(), "Feature type is invalid"); + } + + @Test + void givenGetAttributesCertificateRequest_whenGetFeatureType_thenFeatureTypeAttributes() { + // GIVEN + var request = toGetAttributesCertificateRequest(); + + // WHEN + var featureTypeOptional = coapTransportResource.getFeatureType(request); + + // THEN + assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); + assertEquals(FeatureType.ATTRIBUTES, featureTypeOptional.get(), "Feature type is invalid"); + } + + @Test + void givenSubscribeForAttributesUpdatesCertificateRequest_whenGetFeatureType_thenFeatureTypeAttributes() { + // GIVEN + var request = toCertificateRequest(CoAP.Code.GET, ATTRIBUTES); + + // WHEN + var featureTypeOptional = coapTransportResource.getFeatureType(request); + + // THEN + assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); + assertEquals(FeatureType.ATTRIBUTES, featureTypeOptional.get(), "Feature type is invalid"); + } + + @Test + void givenSubscribeForRpcUpdatesCertificateRequest_whenGetFeatureType_thenFeatureTypeRpc() { + // GIVEN + var request = toCertificateRequest(CoAP.Code.GET, RPC); + + // WHEN + var featureTypeOptional = coapTransportResource.getFeatureType(request); + + // THEN + assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); + assertEquals(FeatureType.RPC, featureTypeOptional.get(), "Feature type is invalid"); + } + + @Test + void givenRpcResponseCertificateRequest_whenGetFeatureType_thenFeatureTypeRpc() { + // GIVEN + Request request = toRpcResponseCertificateRequest(RANDOM.nextInt(100)); + + // WHEN + var featureTypeOptional = coapTransportResource.getFeatureType(request); + + // THEN + assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); + assertEquals(FeatureType.RPC, featureTypeOptional.get(), "Feature type is invalid"); + } + + @Test + void givenClientSideRpcCertificateRequest_whenGetFeatureType_thenFeatureTypeRpc() { + // GIVEN + Request request = toCertificateRequest(CoAP.Code.POST, RPC); + + // WHEN + var featureTypeOptional = coapTransportResource.getFeatureType(request); + + // THEN + assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); + assertEquals(FeatureType.RPC, featureTypeOptional.get(), "Feature type is invalid"); + } + + @Test + void givenClaimingCertificateRequest_whenGetFeatureType_thenFeatureTypeClaim() { + // GIVEN + Request request = toCertificateRequest(CoAP.Code.POST, CLAIM); + + // WHEN + var featureTypeOptional = coapTransportResource.getFeatureType(request); + + // THEN + assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); + assertEquals(FeatureType.CLAIM, featureTypeOptional.get(), "Feature type is invalid"); + } + + // provision request + + @Test + void givenProvisionRequest_whenGetFeatureType_thenFeatureTypeProvision() { + // GIVEN + Request request = toCertificateRequest(CoAP.Code.POST, PROVISION); + // WHEN + var featureTypeOptional = coapTransportResource.getFeatureType(request); + + // THEN + assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); + assertEquals(FeatureType.PROVISION, featureTypeOptional.get(), "Feature type is invalid"); + } + + private Request toAccessTokenRequest(CoAP.Code method, String accessToken, String featureType) { + return getAccessTokenRequest(method, accessToken, featureType, null, null); + } + + private Request toGetAttributesAccessTokenRequest(String accessToken) { + return getAccessTokenRequest(CoAP.Code.GET, accessToken, CoapTransportResourceTest.ATTRIBUTES, null, CoapTransportResourceTest.GET_ATTRIBUTES_URI_QUERY); + } + + private Request toRpcResponseAccessTokenRequest(String accessToken, Integer requestId) { + return getAccessTokenRequest(CoAP.Code.POST, accessToken, CoapTransportResourceTest.RPC, requestId, null); + } + + private Request toCertificateRequest(CoAP.Code method, String featureType) { + return getCertificateRequest(method, featureType, null, null); + } + + private Request toGetAttributesCertificateRequest() { + return getCertificateRequest(CoAP.Code.GET, CoapTransportResourceTest.ATTRIBUTES, null, CoapTransportResourceTest.GET_ATTRIBUTES_URI_QUERY); + } + + private Request toRpcResponseCertificateRequest(Integer requestId) { + return getCertificateRequest(CoAP.Code.POST, CoapTransportResourceTest.RPC, requestId, null); + } + + private Request getAccessTokenRequest(CoAP.Code method, String accessToken, String featureType, Integer requestId, String uriQuery) { + var request = new Request(method); + var options = new OptionSet(); + options.addUriPath(API); + options.addUriPath(V1); + options.addUriPath(accessToken); + options.addUriPath(featureType); + if (requestId != null) { + options.addUriPath(String.valueOf(requestId)); + } + if (uriQuery != null) { + options.setUriQuery(uriQuery); + } + request.setOptions(options); + return request; + } + + private Request getCertificateRequest(CoAP.Code method, String featureType, Integer requestId, String uriQuery) { + var request = new Request(method); + var options = new OptionSet(); + options.addUriPath(API); + options.addUriPath(V1); + options.addUriPath(featureType); + if (requestId != null) { + options.addUriPath(String.valueOf(requestId)); + } + if (uriQuery != null) { + options.setUriQuery(uriQuery); + } + request.setOptions(options); + return request; + } + + +} From 14ad1df873200ef8d69b82f98bab9cd6f8416d39 Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Sat, 12 Aug 2023 09:38:49 +0300 Subject: [PATCH 18/19] refactoring of test base --- .../coap/CoapTransportResourceTest.java | 71 ++++++++----------- 1 file changed, 31 insertions(+), 40 deletions(-) diff --git a/common/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapTransportResourceTest.java b/common/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapTransportResourceTest.java index 666f6c95df..2e5b367e4c 100644 --- a/common/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapTransportResourceTest.java +++ b/common/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapTransportResourceTest.java @@ -18,7 +18,9 @@ package org.thingsboard.server.transport.coap; import org.eclipse.californium.core.coap.CoAP; import org.eclipse.californium.core.coap.OptionSet; import org.eclipse.californium.core.coap.Request; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.thingsboard.server.coapserver.CoapServerService; @@ -48,10 +50,10 @@ class CoapTransportResourceTest { private static final Random RANDOM = new Random(); - private CoapTransportResource coapTransportResource; + private static CoapTransportResource coapTransportResource; - @BeforeEach - void setUp() { + @BeforeAll + static void setUp() { var ctxMock = mock(CoapTransportContext.class); var coapServerServiceMock = mock(CoapServerService.class); @@ -67,16 +69,12 @@ class CoapTransportResourceTest { coapTransportResource = new CoapTransportResource(ctxMock, coapServerServiceMock, V1); } - @AfterEach - void tearDown() { - } - // accessToken based tests @Test void givenPostTelemetryAccessTokenRequest_whenGetFeatureType_thenFeatureTypeTelemetry() { // GIVEN - var request = toAccessTokenRequest(CoAP.Code.POST, StringUtils.randomAlphanumeric(20), TELEMETRY); + var request = toAccessTokenRequest(CoAP.Code.POST, TELEMETRY); // WHEN var featureTypeOptional = coapTransportResource.getFeatureType(request); @@ -89,7 +87,7 @@ class CoapTransportResourceTest { @Test void givenPostAttributesAccessTokenRequest_whenGetFeatureType_thenFeatureTypeAttributes() { // GIVEN - Request request = toAccessTokenRequest(CoAP.Code.POST, StringUtils.randomAlphanumeric(20), ATTRIBUTES); + Request request = toAccessTokenRequest(CoAP.Code.POST, ATTRIBUTES); // WHEN var featureTypeOptional = coapTransportResource.getFeatureType(request); @@ -102,7 +100,7 @@ class CoapTransportResourceTest { @Test void givenGetAttributesAccessTokenRequest_whenGetFeatureType_thenFeatureTypeAttributes() { // GIVEN - Request request = toGetAttributesAccessTokenRequest(StringUtils.randomAlphanumeric(20)); + Request request = toGetAttributesAccessTokenRequest(); // WHEN var featureTypeOptional = coapTransportResource.getFeatureType(request); @@ -114,7 +112,7 @@ class CoapTransportResourceTest { @Test void givenSubscribeForAttributesUpdatesAccessTokenRequest_whenGetFeatureType_thenFeatureTypeAttributes() { // GIVEN - Request request = toAccessTokenRequest(CoAP.Code.GET, StringUtils.randomAlphanumeric(20), ATTRIBUTES); + Request request = toAccessTokenRequest(CoAP.Code.GET, ATTRIBUTES); // WHEN var featureTypeOptional = coapTransportResource.getFeatureType(request); @@ -126,7 +124,7 @@ class CoapTransportResourceTest { @Test void givenSubscribeForRpcUpdatesAccessTokenRequest_whenGetFeatureType_thenFeatureTypeRpc() { // GIVEN - Request request = toAccessTokenRequest(CoAP.Code.GET, StringUtils.randomAlphanumeric(20), RPC); + Request request = toAccessTokenRequest(CoAP.Code.GET, RPC); // WHEN var featureTypeOptional = coapTransportResource.getFeatureType(request); @@ -138,7 +136,7 @@ class CoapTransportResourceTest { @Test void givenRpcResponseAccessTokenRequest_whenGetFeatureType_thenFeatureTypeRpc() { // GIVEN - Request request = toRpcResponseAccessTokenRequest(StringUtils.randomAlphanumeric(20), RANDOM.nextInt(100)); + Request request = toRpcResponseAccessTokenRequest(); // WHEN var featureTypeOptional = coapTransportResource.getFeatureType(request); @@ -150,7 +148,7 @@ class CoapTransportResourceTest { @Test void givenClientSideRpcAccessTokenRequest_whenGetFeatureType_thenFeatureTypeRpc() { // GIVEN - Request request = toAccessTokenRequest(CoAP.Code.POST, StringUtils.randomAlphanumeric(20), RPC); + Request request = toAccessTokenRequest(CoAP.Code.POST, RPC); // WHEN var featureTypeOptional = coapTransportResource.getFeatureType(request); @@ -162,7 +160,7 @@ class CoapTransportResourceTest { @Test void givenClaimingAccessTokenRequest_whenGetFeatureType_thenFeatureTypeClaim() { // GIVEN - Request request = toAccessTokenRequest(CoAP.Code.POST, StringUtils.randomAlphanumeric(20), CLAIM); + Request request = toAccessTokenRequest(CoAP.Code.POST, CLAIM); // WHEN var featureTypeOptional = coapTransportResource.getFeatureType(request); @@ -241,7 +239,7 @@ class CoapTransportResourceTest { @Test void givenRpcResponseCertificateRequest_whenGetFeatureType_thenFeatureTypeRpc() { // GIVEN - Request request = toRpcResponseCertificateRequest(RANDOM.nextInt(100)); + Request request = toRpcResponseCertificateRequest(); // WHEN var featureTypeOptional = coapTransportResource.getFeatureType(request); @@ -291,16 +289,16 @@ class CoapTransportResourceTest { assertEquals(FeatureType.PROVISION, featureTypeOptional.get(), "Feature type is invalid"); } - private Request toAccessTokenRequest(CoAP.Code method, String accessToken, String featureType) { - return getAccessTokenRequest(method, accessToken, featureType, null, null); + private Request toAccessTokenRequest(CoAP.Code method, String featureType) { + return getAccessTokenRequest(method, featureType, null, null); } - private Request toGetAttributesAccessTokenRequest(String accessToken) { - return getAccessTokenRequest(CoAP.Code.GET, accessToken, CoapTransportResourceTest.ATTRIBUTES, null, CoapTransportResourceTest.GET_ATTRIBUTES_URI_QUERY); + private Request toGetAttributesAccessTokenRequest() { + return getAccessTokenRequest(CoAP.Code.GET, CoapTransportResourceTest.ATTRIBUTES, null, CoapTransportResourceTest.GET_ATTRIBUTES_URI_QUERY); } - private Request toRpcResponseAccessTokenRequest(String accessToken, Integer requestId) { - return getAccessTokenRequest(CoAP.Code.POST, accessToken, CoapTransportResourceTest.RPC, requestId, null); + private Request toRpcResponseAccessTokenRequest() { + return getAccessTokenRequest(CoAP.Code.POST, CoapTransportResourceTest.RPC, RANDOM.nextInt(100), null); } private Request toCertificateRequest(CoAP.Code method, String featureType) { @@ -311,32 +309,26 @@ class CoapTransportResourceTest { return getCertificateRequest(CoAP.Code.GET, CoapTransportResourceTest.ATTRIBUTES, null, CoapTransportResourceTest.GET_ATTRIBUTES_URI_QUERY); } - private Request toRpcResponseCertificateRequest(Integer requestId) { - return getCertificateRequest(CoAP.Code.POST, CoapTransportResourceTest.RPC, requestId, null); + private Request toRpcResponseCertificateRequest() { + return getCertificateRequest(CoAP.Code.POST, CoapTransportResourceTest.RPC, RANDOM.nextInt(100), null); } - private Request getAccessTokenRequest(CoAP.Code method, String accessToken, String featureType, Integer requestId, String uriQuery) { - var request = new Request(method); - var options = new OptionSet(); - options.addUriPath(API); - options.addUriPath(V1); - options.addUriPath(accessToken); - options.addUriPath(featureType); - if (requestId != null) { - options.addUriPath(String.valueOf(requestId)); - } - if (uriQuery != null) { - options.setUriQuery(uriQuery); - } - request.setOptions(options); - return request; + private Request getAccessTokenRequest(CoAP.Code method, String featureType, Integer requestId, String uriQuery) { + return getRequest(method, featureType, false, requestId, uriQuery); } private Request getCertificateRequest(CoAP.Code method, String featureType, Integer requestId, String uriQuery) { + return getRequest(method, featureType, true, requestId, uriQuery); + } + + private Request getRequest(CoAP.Code method, String featureType, boolean dtls, Integer requestId, String uriQuery) { var request = new Request(method); var options = new OptionSet(); options.addUriPath(API); options.addUriPath(V1); + if (!dtls) { + options.addUriPath(StringUtils.randomAlphanumeric(20)); + } options.addUriPath(featureType); if (requestId != null) { options.addUriPath(String.valueOf(requestId)); @@ -348,5 +340,4 @@ class CoapTransportResourceTest { return request; } - } From a90653d6606eee59345943e41d7ec2b27fceb266 Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Sat, 12 Aug 2023 09:52:36 +0300 Subject: [PATCH 19/19] refactored to parameterized test --- .../coap/CoapTransportResourceTest.java | 270 +++--------------- 1 file changed, 44 insertions(+), 226 deletions(-) diff --git a/common/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapTransportResourceTest.java b/common/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapTransportResourceTest.java index 2e5b367e4c..c7f33e3694 100644 --- a/common/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapTransportResourceTest.java +++ b/common/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapTransportResourceTest.java @@ -18,11 +18,10 @@ package org.thingsboard.server.transport.coap; import org.eclipse.californium.core.coap.CoAP; import org.eclipse.californium.core.coap.OptionSet; import org.eclipse.californium.core.coap.Request; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.thingsboard.server.coapserver.CoapServerService; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.msg.session.FeatureType; @@ -31,6 +30,7 @@ import org.thingsboard.server.queue.scheduler.SchedulerComponent; import org.thingsboard.server.transport.coap.client.CoapClientContext; import java.util.Random; +import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -69,259 +69,77 @@ class CoapTransportResourceTest { coapTransportResource = new CoapTransportResource(ctxMock, coapServerServiceMock, V1); } - // accessToken based tests - - @Test - void givenPostTelemetryAccessTokenRequest_whenGetFeatureType_thenFeatureTypeTelemetry() { - // GIVEN - var request = toAccessTokenRequest(CoAP.Code.POST, TELEMETRY); - - // WHEN + @ParameterizedTest + @MethodSource("provideRequestAndFeatureType") + void givenRequest_whenGetFeatureType_thenReturnedExpectedFeatureType(Request request, FeatureType expectedFeatureType) { var featureTypeOptional = coapTransportResource.getFeatureType(request); - // THEN assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); - assertEquals(FeatureType.TELEMETRY, featureTypeOptional.get(), "Feature type is invalid"); + assertEquals(expectedFeatureType, featureTypeOptional.get(), "Feature type is invalid"); } - @Test - void givenPostAttributesAccessTokenRequest_whenGetFeatureType_thenFeatureTypeAttributes() { - // GIVEN - Request request = toAccessTokenRequest(CoAP.Code.POST, ATTRIBUTES); - - // WHEN - var featureTypeOptional = coapTransportResource.getFeatureType(request); - - // THEN - assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); - assertEquals(FeatureType.ATTRIBUTES, featureTypeOptional.get(), "Feature type is invalid"); + static Stream provideRequestAndFeatureType() { + return Stream.of( + // accessToken based tests + Arguments.of(toAccessTokenRequest(CoAP.Code.POST, TELEMETRY), FeatureType.TELEMETRY), + Arguments.of(toAccessTokenRequest(CoAP.Code.POST, ATTRIBUTES), FeatureType.ATTRIBUTES), + Arguments.of(toGetAttributesAccessTokenRequest(), FeatureType.ATTRIBUTES), + Arguments.of(toAccessTokenRequest(CoAP.Code.GET, ATTRIBUTES), FeatureType.ATTRIBUTES), + Arguments.of(toAccessTokenRequest(CoAP.Code.GET, RPC), FeatureType.RPC), + Arguments.of(toRpcResponseAccessTokenRequest(), FeatureType.RPC), + Arguments.of(toAccessTokenRequest(CoAP.Code.POST, RPC), FeatureType.RPC), + Arguments.of(toAccessTokenRequest(CoAP.Code.POST, CLAIM), FeatureType.CLAIM), + // certificate based tests + Arguments.of(toCertificateRequest(CoAP.Code.POST, TELEMETRY), FeatureType.TELEMETRY), + Arguments.of(toCertificateRequest(CoAP.Code.POST, ATTRIBUTES), FeatureType.ATTRIBUTES), + Arguments.of(toGetAttributesCertificateRequest(), FeatureType.ATTRIBUTES), + Arguments.of(toCertificateRequest(CoAP.Code.GET, ATTRIBUTES), FeatureType.ATTRIBUTES), + Arguments.of(toCertificateRequest(CoAP.Code.GET, RPC), FeatureType.RPC), + Arguments.of(toRpcResponseCertificateRequest(), FeatureType.RPC), + Arguments.of(toCertificateRequest(CoAP.Code.POST, RPC), FeatureType.RPC), + Arguments.of(toCertificateRequest(CoAP.Code.POST, CLAIM), FeatureType.CLAIM), + // provision request + Arguments.of(toProvisionRequest(), FeatureType.PROVISION) + ); } - @Test - void givenGetAttributesAccessTokenRequest_whenGetFeatureType_thenFeatureTypeAttributes() { - // GIVEN - Request request = toGetAttributesAccessTokenRequest(); - // WHEN - var featureTypeOptional = coapTransportResource.getFeatureType(request); - - // THEN - assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); - assertEquals(FeatureType.ATTRIBUTES, featureTypeOptional.get(), "Feature type is invalid"); - } - - @Test - void givenSubscribeForAttributesUpdatesAccessTokenRequest_whenGetFeatureType_thenFeatureTypeAttributes() { - // GIVEN - Request request = toAccessTokenRequest(CoAP.Code.GET, ATTRIBUTES); - // WHEN - var featureTypeOptional = coapTransportResource.getFeatureType(request); - - // THEN - assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); - assertEquals(FeatureType.ATTRIBUTES, featureTypeOptional.get(), "Feature type is invalid"); - } - - @Test - void givenSubscribeForRpcUpdatesAccessTokenRequest_whenGetFeatureType_thenFeatureTypeRpc() { - // GIVEN - Request request = toAccessTokenRequest(CoAP.Code.GET, RPC); - // WHEN - var featureTypeOptional = coapTransportResource.getFeatureType(request); - - // THEN - assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); - assertEquals(FeatureType.RPC, featureTypeOptional.get(), "Feature type is invalid"); - } - - @Test - void givenRpcResponseAccessTokenRequest_whenGetFeatureType_thenFeatureTypeRpc() { - // GIVEN - Request request = toRpcResponseAccessTokenRequest(); - // WHEN - var featureTypeOptional = coapTransportResource.getFeatureType(request); - - // THEN - assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); - assertEquals(FeatureType.RPC, featureTypeOptional.get(), "Feature type is invalid"); - } - - @Test - void givenClientSideRpcAccessTokenRequest_whenGetFeatureType_thenFeatureTypeRpc() { - // GIVEN - Request request = toAccessTokenRequest(CoAP.Code.POST, RPC); - // WHEN - var featureTypeOptional = coapTransportResource.getFeatureType(request); - - // THEN - assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); - assertEquals(FeatureType.RPC, featureTypeOptional.get(), "Feature type is invalid"); - } - - @Test - void givenClaimingAccessTokenRequest_whenGetFeatureType_thenFeatureTypeClaim() { - // GIVEN - Request request = toAccessTokenRequest(CoAP.Code.POST, CLAIM); - // WHEN - var featureTypeOptional = coapTransportResource.getFeatureType(request); - - // THEN - assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); - assertEquals(FeatureType.CLAIM, featureTypeOptional.get(), "Feature type is invalid"); - } - - // certificate based tests - - @Test - void givenPostTelemetryCertificateRequest_whenGetFeatureType_thenFeatureTypeTelemetry() { - // GIVEN - var request = toCertificateRequest(CoAP.Code.POST, TELEMETRY); - - // WHEN - var featureTypeOptional = coapTransportResource.getFeatureType(request); - - // THEN - assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); - assertEquals(FeatureType.TELEMETRY, featureTypeOptional.get(), "Feature type is invalid"); - } - - @Test - void givenPostAttributesCertificateRequest_whenGetFeatureType_thenFeatureTypeAttributes() { - // GIVEN - var request = toCertificateRequest(CoAP.Code.POST, ATTRIBUTES); - - // WHEN - var featureTypeOptional = coapTransportResource.getFeatureType(request); - - // THEN - assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); - assertEquals(FeatureType.ATTRIBUTES, featureTypeOptional.get(), "Feature type is invalid"); - } - - @Test - void givenGetAttributesCertificateRequest_whenGetFeatureType_thenFeatureTypeAttributes() { - // GIVEN - var request = toGetAttributesCertificateRequest(); - - // WHEN - var featureTypeOptional = coapTransportResource.getFeatureType(request); - - // THEN - assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); - assertEquals(FeatureType.ATTRIBUTES, featureTypeOptional.get(), "Feature type is invalid"); - } - - @Test - void givenSubscribeForAttributesUpdatesCertificateRequest_whenGetFeatureType_thenFeatureTypeAttributes() { - // GIVEN - var request = toCertificateRequest(CoAP.Code.GET, ATTRIBUTES); - - // WHEN - var featureTypeOptional = coapTransportResource.getFeatureType(request); - - // THEN - assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); - assertEquals(FeatureType.ATTRIBUTES, featureTypeOptional.get(), "Feature type is invalid"); - } - - @Test - void givenSubscribeForRpcUpdatesCertificateRequest_whenGetFeatureType_thenFeatureTypeRpc() { - // GIVEN - var request = toCertificateRequest(CoAP.Code.GET, RPC); - - // WHEN - var featureTypeOptional = coapTransportResource.getFeatureType(request); - - // THEN - assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); - assertEquals(FeatureType.RPC, featureTypeOptional.get(), "Feature type is invalid"); - } - - @Test - void givenRpcResponseCertificateRequest_whenGetFeatureType_thenFeatureTypeRpc() { - // GIVEN - Request request = toRpcResponseCertificateRequest(); - - // WHEN - var featureTypeOptional = coapTransportResource.getFeatureType(request); - - // THEN - assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); - assertEquals(FeatureType.RPC, featureTypeOptional.get(), "Feature type is invalid"); - } - - @Test - void givenClientSideRpcCertificateRequest_whenGetFeatureType_thenFeatureTypeRpc() { - // GIVEN - Request request = toCertificateRequest(CoAP.Code.POST, RPC); - - // WHEN - var featureTypeOptional = coapTransportResource.getFeatureType(request); - - // THEN - assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); - assertEquals(FeatureType.RPC, featureTypeOptional.get(), "Feature type is invalid"); - } - - @Test - void givenClaimingCertificateRequest_whenGetFeatureType_thenFeatureTypeClaim() { - // GIVEN - Request request = toCertificateRequest(CoAP.Code.POST, CLAIM); - - // WHEN - var featureTypeOptional = coapTransportResource.getFeatureType(request); - - // THEN - assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); - assertEquals(FeatureType.CLAIM, featureTypeOptional.get(), "Feature type is invalid"); - } - - // provision request - - @Test - void givenProvisionRequest_whenGetFeatureType_thenFeatureTypeProvision() { - // GIVEN - Request request = toCertificateRequest(CoAP.Code.POST, PROVISION); - // WHEN - var featureTypeOptional = coapTransportResource.getFeatureType(request); - - // THEN - assertTrue(featureTypeOptional.isPresent(), "Optional is empty"); - assertEquals(FeatureType.PROVISION, featureTypeOptional.get(), "Feature type is invalid"); - } - - private Request toAccessTokenRequest(CoAP.Code method, String featureType) { + private static Request toAccessTokenRequest(CoAP.Code method, String featureType) { return getAccessTokenRequest(method, featureType, null, null); } - private Request toGetAttributesAccessTokenRequest() { + private static Request toGetAttributesAccessTokenRequest() { return getAccessTokenRequest(CoAP.Code.GET, CoapTransportResourceTest.ATTRIBUTES, null, CoapTransportResourceTest.GET_ATTRIBUTES_URI_QUERY); } - private Request toRpcResponseAccessTokenRequest() { + private static Request toRpcResponseAccessTokenRequest() { return getAccessTokenRequest(CoAP.Code.POST, CoapTransportResourceTest.RPC, RANDOM.nextInt(100), null); } - private Request toCertificateRequest(CoAP.Code method, String featureType) { + private static Request toCertificateRequest(CoAP.Code method, String featureType) { return getCertificateRequest(method, featureType, null, null); } - private Request toGetAttributesCertificateRequest() { + private static Request toGetAttributesCertificateRequest() { return getCertificateRequest(CoAP.Code.GET, CoapTransportResourceTest.ATTRIBUTES, null, CoapTransportResourceTest.GET_ATTRIBUTES_URI_QUERY); } - private Request toRpcResponseCertificateRequest() { + private static Request toRpcResponseCertificateRequest() { return getCertificateRequest(CoAP.Code.POST, CoapTransportResourceTest.RPC, RANDOM.nextInt(100), null); } - private Request getAccessTokenRequest(CoAP.Code method, String featureType, Integer requestId, String uriQuery) { + private static Request getAccessTokenRequest(CoAP.Code method, String featureType, Integer requestId, String uriQuery) { return getRequest(method, featureType, false, requestId, uriQuery); } - private Request getCertificateRequest(CoAP.Code method, String featureType, Integer requestId, String uriQuery) { + private static Request getCertificateRequest(CoAP.Code method, String featureType, Integer requestId, String uriQuery) { return getRequest(method, featureType, true, requestId, uriQuery); } - private Request getRequest(CoAP.Code method, String featureType, boolean dtls, Integer requestId, String uriQuery) { + private static Request toProvisionRequest() { + return getRequest(CoAP.Code.POST, PROVISION, true, null, null); + } + + private static Request getRequest(CoAP.Code method, String featureType, boolean dtls, Integer requestId, String uriQuery) { var request = new Request(method); var options = new OptionSet(); options.addUriPath(API);