diff --git a/tools/pom.xml b/tools/pom.xml index d78c4cad85..d96eb5382f 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -78,6 +78,43 @@ mockito-all test - + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + package + + shade + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + org.thingsboard.client.tools.MqttStressTestTool + + + + + + + + + diff --git a/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestClient.java b/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestClient.java index b0ddf73952..5805cadb82 100644 --- a/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestClient.java +++ b/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestClient.java @@ -40,10 +40,10 @@ public class MqttStressTestClient { this.client = new MqttAsyncClient(brokerUri, clientId, persistence); } - public void connect() throws MqttException { + public IMqttToken connect() throws MqttException { MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(deviceToken); - client.connect(options, null, new IMqttActionListener() { + return client.connect(options, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken iMqttToken) { log.info("OnSuccess"); @@ -60,6 +60,22 @@ public class MqttStressTestClient { client.disconnect(); } + + + public void warmUp(byte[] data) throws MqttException { + MqttMessage msg = new MqttMessage(data); + client.publish("v1/devices/me/telemetry", msg, null, new IMqttActionListener() { + @Override + public void onSuccess(IMqttToken asyncActionToken) { + } + + @Override + public void onFailure(IMqttToken asyncActionToken, Throwable exception) { + } + }).waitForCompletion(); + } + + public void publishTelemetry(byte[] data) throws MqttException { long sendTime = System.currentTimeMillis(); MqttMessage msg = new MqttMessage(data); @@ -67,14 +83,12 @@ public class MqttStressTestClient { @Override public void onSuccess(IMqttToken asyncActionToken) { long ackTime = System.currentTimeMillis(); -// log.info("Delivery time: {}", ackTime - sendTime); results.onResult(true, ackTime - sendTime); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { long failTime = System.currentTimeMillis(); -// log.info("Failure time: {}", failTime - sendTime); results.onResult(false, failTime - sendTime); } }); diff --git a/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestTool.java b/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestTool.java index 900d81fa7d..ed6f42be02 100644 --- a/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestTool.java +++ b/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestTool.java @@ -1,4 +1,4 @@ -package org.thingsboard.client.tools; /** +/** * Copyright © 2016 The Thingsboard Authors * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -13,14 +13,32 @@ package org.thingsboard.client.tools; /** * See the License for the specific language governing permissions and * limitations under the License. */ +package org.thingsboard.client.tools; /** + * Copyright © 2016 The Thingsboard Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.IMqttToken; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.security.DeviceCredentials; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; +import java.util.UUID; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; /** @@ -29,60 +47,83 @@ import java.util.concurrent.atomic.AtomicLong; @Slf4j public class MqttStressTestTool { - private static final long TEST_DURATION = TimeUnit.MINUTES.toMillis(1); - private static final long TEST_ITERATION = TimeUnit.MILLISECONDS.toMillis(100); - private static final long TEST_SUB_ITERATION = TimeUnit.MILLISECONDS.toMillis(2); - private static final int DEVICE_COUNT = 100; - private static final String BASE_URL = "http://localhost:8080"; - private static final String[] MQTT_URLS = {"tcp://localhost:1883"}; -// private static final String[] MQTT_URLS = {"tcp://localhost:1883", "tcp://localhost:1884", "tcp://localhost:1885"}; - private static final String USERNAME = "tenant@thingsboard.org"; - private static final String PASSWORD = "tenant"; - - public static void main(String[] args) throws Exception { + TestParams params = new TestParams(); + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10); + + + if (params.getDuration() % params.getIterationInterval() != 0) { + throw new IllegalArgumentException("Test Duration % Iteration Interval != 0"); + } + + if ((params.getIterationInterval() * 1000) % params.getDeviceCount() != 0) { + throw new IllegalArgumentException("Iteration Interval % Device Count != 0"); + } + ResultAccumulator results = new ResultAccumulator(); AtomicLong value = new AtomicLong(Long.MAX_VALUE); log.info("value: {} ", value.incrementAndGet()); - RestClient restClient = new RestClient(BASE_URL); - restClient.login(USERNAME, PASSWORD); + RestClient restClient = new RestClient(params.getRestApiUrl()); + restClient.login(params.getUsername(), params.getPassword()); List clients = new ArrayList<>(); - for (int i = 0; i < DEVICE_COUNT; i++) { - Device device = restClient.createDevice("Device " + i); + List connectTokens = new ArrayList<>(); + for (int i = 0; i < params.getDeviceCount(); i++) { + Device device = restClient.createDevice("Device " + UUID.randomUUID()); DeviceCredentials credentials = restClient.getCredentials(device.getId()); - String mqttURL = MQTT_URLS[i % MQTT_URLS.length]; + String[] mqttUrls = params.getMqttUrls(); + String mqttURL = mqttUrls[i % mqttUrls.length]; MqttStressTestClient client = new MqttStressTestClient(results, mqttURL, credentials.getCredentialsId()); - client.connect(); + connectTokens.add(client.connect()); clients.add(client); } - Thread.sleep(1000); + for (IMqttToken tokens : connectTokens) { + tokens.waitForCompletion(); + } byte[] data = "{\"longKey\":73}".getBytes(StandardCharsets.UTF_8); - long startTime = System.currentTimeMillis(); - int iterationsCount = (int) (TEST_DURATION / TEST_ITERATION); - int subIterationsCount = (int) (TEST_ITERATION / TEST_SUB_ITERATION); - if (clients.size() % subIterationsCount != 0) { - throw new IllegalArgumentException("Invalid parameter exception!"); - } - for (int i = 0; i < iterationsCount; i++) { - for (int j = 0; j < subIterationsCount; j++) { - int packSize = clients.size() / subIterationsCount; - for (int k = 0; k < packSize; k++) { - int clientIndex = packSize * j + k; - clients.get(clientIndex).publishTelemetry(data); - } - Thread.sleep(TEST_SUB_ITERATION); - } + + for (MqttStressTestClient client : clients) { + client.warmUp(data); } + Thread.sleep(1000); + + long startTime = System.currentTimeMillis(); + int iterationsCount = (int) (params.getDuration() / params.getIterationInterval()); + int subIterationMicroSeconds = (int) ((params.getIterationInterval() * 1000) / params.getDeviceCount()); + + List> iterationFutures = new ArrayList<>(); + for (int i = 0; i < iterationsCount; i++) { + long delay = i * params.getIterationInterval(); + iterationFutures.add(scheduler.schedule((Callable) () -> { + long sleepMicroSeconds = 0L; + for (MqttStressTestClient client : clients) { + client.publishTelemetry(data); + sleepMicroSeconds += subIterationMicroSeconds; + if (sleepMicroSeconds > 1000) { + Thread.sleep(sleepMicroSeconds / 1000); + sleepMicroSeconds = sleepMicroSeconds % 1000; + } + } + return null; + }, delay, TimeUnit.MILLISECONDS)); + } + + for (ScheduledFuture future : iterationFutures) { + future.get(); + } + + Thread.sleep(1000); + for (MqttStressTestClient client : clients) { client.disconnect(); } log.info("Results: {} took {}ms", results, System.currentTimeMillis() - startTime); + scheduler.shutdownNow(); } } diff --git a/tools/src/main/java/org/thingsboard/client/tools/ResultAccumulator.java b/tools/src/main/java/org/thingsboard/client/tools/ResultAccumulator.java index 5bba82a8e6..1364fc156b 100644 --- a/tools/src/main/java/org/thingsboard/client/tools/ResultAccumulator.java +++ b/tools/src/main/java/org/thingsboard/client/tools/ResultAccumulator.java @@ -73,7 +73,7 @@ public class ResultAccumulator { @Override public String toString() { - return "org.thingsboard.client.tools.ResultAccumulator{" + + return "Result {" + "successCount=" + getSuccessCount() + ", errorCount=" + getErrorCount() + ", totalTime=" + getTimeSpent() + diff --git a/tools/src/main/java/org/thingsboard/client/tools/TestParams.java b/tools/src/main/java/org/thingsboard/client/tools/TestParams.java new file mode 100644 index 0000000000..eb1328b61e --- /dev/null +++ b/tools/src/main/java/org/thingsboard/client/tools/TestParams.java @@ -0,0 +1,73 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.client.tools; + +import lombok.extern.slf4j.Slf4j; + +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class TestParams { + static final String TEST_PROPERTIES = "test.properties"; + static final long DEFAULT_TEST_DURATION = TimeUnit.MINUTES.toMillis(1); + static final long DEFAULT_TEST_INTERVAL = TimeUnit.MILLISECONDS.toMillis(100); + static final int DEFAULT_DEVICE_COUNT = 100; + static final String DEFAULT_REST_URL = "http://localhost:8080"; + static final String DEFAULT_MQTT_URLS = "tcp://localhost:1883"; + static final String DEFAULT_USERNAME = "tenant@thingsboard.org"; + static final String DEFAULT_PASSWORD = "tenant"; + + private Properties params = new Properties(); + + public TestParams() throws IOException { + try { + params.load(new FileInputStream(TEST_PROPERTIES)); + } catch (Exception e) { + log.warn("Failed to read " + TEST_PROPERTIES); + } + } + + public long getDuration() { + return Long.valueOf(params.getProperty("durationMs", Long.toString(DEFAULT_TEST_DURATION))); + } + + public long getIterationInterval() { + return Long.valueOf(params.getProperty("iterationIntervalMs", Long.toString(DEFAULT_TEST_INTERVAL))); + } + + public int getDeviceCount() { + return Integer.valueOf(params.getProperty("deviceCount", Integer.toString(DEFAULT_DEVICE_COUNT))); + } + + public String getRestApiUrl() { + return params.getProperty("restUrl", DEFAULT_REST_URL); + } + + public String[] getMqttUrls() { + return params.getProperty("mqttUrls", DEFAULT_MQTT_URLS).split(","); + } + + public String getUsername() { + return params.getProperty("username", DEFAULT_USERNAME); + } + + public String getPassword() { + return params.getProperty("password", DEFAULT_PASSWORD); + } +} diff --git a/tools/src/main/resources/logback.xml b/tools/src/main/resources/logback.xml new file mode 100644 index 0000000000..11973fa644 --- /dev/null +++ b/tools/src/main/resources/logback.xml @@ -0,0 +1,34 @@ + + + + + + + + %d{ISO8601} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + \ No newline at end of file diff --git a/tools/test.properties b/tools/test.properties new file mode 100644 index 0000000000..93efc60698 --- /dev/null +++ b/tools/test.properties @@ -0,0 +1,3 @@ +deviceCount=1000 +durationMs=5000 +iterationIntervalMs=250