diff --git a/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestTool.java b/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestTool.java index 05075ba9de..ce637fdea0 100644 --- a/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestTool.java +++ b/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestTool.java @@ -22,16 +22,15 @@ import org.eclipse.paho.client.mqttv3.IMqttToken; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.security.DeviceCredentials; -import java.io.BufferedWriter; -import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; import java.util.UUID; -import java.util.concurrent.*; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** @@ -40,11 +39,15 @@ import java.util.concurrent.atomic.AtomicLong; @Slf4j public class MqttStressTestTool { + private static byte[] data = "{\"longKey\":73}".getBytes(StandardCharsets.UTF_8); + private static ResultAccumulator results = new ResultAccumulator(); + private static List clients = new ArrayList<>(); + private static List connectTokens = new ArrayList<>(); + public static void main(String[] args) throws Exception { TestParams params = new TestParams(); ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10); - if (params.getDuration() % params.getIterationInterval() != 0) { throw new IllegalArgumentException("Test Duration % Iteration Interval != 0"); } @@ -53,41 +56,7 @@ public class MqttStressTestTool { throw new IllegalArgumentException("Iteration Interval % Device Count != 0"); } - ResultAccumulator results = new ResultAccumulator(); - - AtomicLong value = new AtomicLong(Long.MAX_VALUE); - log.info("value: {} ", value.incrementAndGet()); - - RestClient restClient = new RestClient(params.getRestApiUrl()); - restClient.login(params.getUsername(), params.getPassword()); - - List clients = new ArrayList<>(); - List connectTokens = new ArrayList<>(); - List deviceCredentialsIds = new ArrayList<>(); - for (int i = 0; i < params.getDeviceCount(); i++) { - Device device = restClient.createDevice("Device " + UUID.randomUUID()); - DeviceCredentials credentials = restClient.getCredentials(device.getId()); - String[] mqttUrls = params.getMqttUrls(); - String mqttURL = mqttUrls[i % mqttUrls.length]; - MqttStressTestClient client = new MqttStressTestClient(results, mqttURL, credentials.getCredentialsId()); - deviceCredentialsIds.add(credentials.getCredentialsId()); - connectTokens.add(client.connect()); - clients.add(client); - } - - dumpDeviceCredentialsIdsToTmpFile(deviceCredentialsIds); - - for (IMqttToken tokens : connectTokens) { - tokens.waitForCompletion(); - } - - byte[] data = "{\"longKey\":73}".getBytes(StandardCharsets.UTF_8); - - for (MqttStressTestClient client : clients) { - client.warmUp(data); - } - - Thread.sleep(1000); + createDevices(params); long startTime = System.currentTimeMillis(); int iterationsCount = (int) (params.getDuration() / params.getIterationInterval()); @@ -123,20 +92,44 @@ public class MqttStressTestTool { scheduler.shutdownNow(); } - private static void dumpDeviceCredentialsIdsToTmpFile(List deviceCredentialsIds) throws IOException { - Path path = Paths.get("/tmp/mqtt.csv"); - try (BufferedWriter writer = Files.newBufferedWriter(path)) { - writer.write("deviceCredentialsId"); - writer.write('\n'); - deviceCredentialsIds.forEach((deviceCredentialsId) -> { - try { - writer.write(deviceCredentialsId); - writer.write('\n'); - } catch (IOException e) { - e.printStackTrace(); - } - }); - } - } + /** + * Returns list of device credential IDs + * + * @param params + * @return + * @throws Exception + */ + public static List createDevices(TestParams params) throws Exception { + AtomicLong value = new AtomicLong(Long.MAX_VALUE); + log.info("value: {} ", value.incrementAndGet()); + RestClient restClient = new RestClient(params.getRestApiUrl()); + restClient.login(params.getUsername(), params.getPassword()); + + List deviceCredentialsIds = new ArrayList<>(); + for (int i = 0; i < params.getDeviceCount(); i++) { + Device device = restClient.createDevice("Device " + UUID.randomUUID()); + DeviceCredentials credentials = restClient.getCredentials(device.getId()); + String[] mqttUrls = params.getMqttUrls(); + String mqttURL = mqttUrls[i % mqttUrls.length]; + MqttStressTestClient client = new MqttStressTestClient(results, mqttURL, credentials.getCredentialsId()); + + deviceCredentialsIds.add(credentials.getCredentialsId()); + + connectTokens.add(client.connect()); + clients.add(client); + } + + for (IMqttToken tokens : connectTokens) { + tokens.waitForCompletion(); + } + + for (MqttStressTestClient client : clients) { + client.warmUp(data); + } + + Thread.sleep(1000); + + return deviceCredentialsIds; + } } diff --git a/tools/src/main/java/org/thingsboard/client/tools/TestParams.java b/tools/src/main/java/org/thingsboard/client/tools/TestParams.java index 1286f5efd9..88618f518e 100644 --- a/tools/src/main/java/org/thingsboard/client/tools/TestParams.java +++ b/tools/src/main/java/org/thingsboard/client/tools/TestParams.java @@ -19,6 +19,7 @@ import lombok.extern.slf4j.Slf4j; import java.io.FileInputStream; import java.io.IOException; +import java.net.URL; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -36,8 +37,9 @@ public class TestParams { private Properties params = new Properties(); public TestParams() throws IOException { + URL location = TestParams.class.getProtectionDomain().getCodeSource().getLocation(); try { - params.load(new FileInputStream(TEST_PROPERTIES)); + params.load(new FileInputStream(location.getFile() + TEST_PROPERTIES)); } catch (Exception e) { log.warn("Failed to read " + TEST_PROPERTIES); } diff --git a/tools/src/main/resources/test.properties b/tools/src/main/resources/test.properties index 6e9ed89cf9..d55c1fc205 100644 --- a/tools/src/main/resources/test.properties +++ b/tools/src/main/resources/test.properties @@ -1,5 +1,5 @@ restUrl=http://localhost:8080 mqttUrls=tcp://localhost:1883 -deviceCount=1 +deviceCount=100 durationMs=60000 -iterationIntervalMs=1000 +iterationIntervalMs=1000 \ No newline at end of file diff --git a/tools/src/test/scala/org/thingsboard/client/tools/MqttSimulation.scala b/tools/src/test/scala/org/thingsboard/client/tools/MqttSimulation.scala index 6002ede4bc..0d92b74f10 100644 --- a/tools/src/test/scala/org/thingsboard/client/tools/MqttSimulation.scala +++ b/tools/src/test/scala/org/thingsboard/client/tools/MqttSimulation.scala @@ -13,50 +13,43 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -/** - * Copyright © 2016 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package org.thingsboard.client.tools +import com.github.mnogu.gatling.mqtt.Predef._ import io.gatling.core.Predef._ import org.fusesource.mqtt.client.QoS +import scala.collection.JavaConverters._ + import scala.concurrent.duration._ -import com.github.mnogu.gatling.mqtt.Predef._ - class MqttSimulation extends Simulation { + val testParams = new TestParams() + + val deviceCredentialsIds: Array[String] = MqttStressTestTool.createDevices(testParams).asScala.toArray + val mqttConf = mqtt .host("tcp://localhost:1883") .userName("${deviceCredentialsId}") val connect = exec(mqtt("connect") - .connect()) + .connect()) - val publish = repeat(400) { + val publish = repeat(100) { exec(mqtt("publish") .publish("v1/devices/me/telemetry", "{\"key1\":\"value1\", \"key2\":\"value2\"}", QoS.AT_LEAST_ONCE, retain = false)) + .pause(100 milliseconds) } + val deviceCredentialsIdsFeeder = deviceCredentialsIds.map( x => {Map("deviceCredentialsId" -> x)}) + val scn = scenario("Scenario Name") - .feed(csv("/tmp/mqtt.csv").circular) + .feed(deviceCredentialsIdsFeeder) .exec(connect, publish) setUp( - scn - .inject(constantUsersPerSec(25) during (1 seconds)) + scn + .inject(constantUsersPerSec(testParams.getDeviceCount) during (1 seconds)) ).protocols(mqttConf) } \ No newline at end of file