Mqtt stress test tool

This commit is contained in:
Andrew Shvayka 2016-12-15 13:52:44 +02:00
parent 5471325f60
commit baaa26ff13
7 changed files with 243 additions and 41 deletions

View File

@ -78,6 +78,43 @@
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<manifestEntries>
<Main-Class>org.thingsboard.client.tools.MqttStressTestTool</Main-Class>
</manifestEntries>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -40,10 +40,10 @@ public class MqttStressTestClient {
this.client = new MqttAsyncClient(brokerUri, clientId, persistence);
}
public void connect() throws MqttException {
public IMqttToken connect() throws MqttException {
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(deviceToken);
client.connect(options, null, new IMqttActionListener() {
return client.connect(options, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken iMqttToken) {
log.info("OnSuccess");
@ -60,6 +60,22 @@ public class MqttStressTestClient {
client.disconnect();
}
public void warmUp(byte[] data) throws MqttException {
MqttMessage msg = new MqttMessage(data);
client.publish("v1/devices/me/telemetry", msg, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
}
}).waitForCompletion();
}
public void publishTelemetry(byte[] data) throws MqttException {
long sendTime = System.currentTimeMillis();
MqttMessage msg = new MqttMessage(data);
@ -67,14 +83,12 @@ public class MqttStressTestClient {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
long ackTime = System.currentTimeMillis();
// log.info("Delivery time: {}", ackTime - sendTime);
results.onResult(true, ackTime - sendTime);
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
long failTime = System.currentTimeMillis();
// log.info("Failure time: {}", failTime - sendTime);
results.onResult(false, failTime - sendTime);
}
});

View File

@ -1,4 +1,4 @@
package org.thingsboard.client.tools; /**
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
@ -13,14 +13,32 @@ package org.thingsboard.client.tools; /**
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.client.tools; /**
* Copyright © 2016 The Thingsboard Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
@ -29,60 +47,83 @@ import java.util.concurrent.atomic.AtomicLong;
@Slf4j
public class MqttStressTestTool {
private static final long TEST_DURATION = TimeUnit.MINUTES.toMillis(1);
private static final long TEST_ITERATION = TimeUnit.MILLISECONDS.toMillis(100);
private static final long TEST_SUB_ITERATION = TimeUnit.MILLISECONDS.toMillis(2);
private static final int DEVICE_COUNT = 100;
private static final String BASE_URL = "http://localhost:8080";
private static final String[] MQTT_URLS = {"tcp://localhost:1883"};
// private static final String[] MQTT_URLS = {"tcp://localhost:1883", "tcp://localhost:1884", "tcp://localhost:1885"};
private static final String USERNAME = "tenant@thingsboard.org";
private static final String PASSWORD = "tenant";
public static void main(String[] args) throws Exception {
TestParams params = new TestParams();
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);
if (params.getDuration() % params.getIterationInterval() != 0) {
throw new IllegalArgumentException("Test Duration % Iteration Interval != 0");
}
if ((params.getIterationInterval() * 1000) % params.getDeviceCount() != 0) {
throw new IllegalArgumentException("Iteration Interval % Device Count != 0");
}
ResultAccumulator results = new ResultAccumulator();
AtomicLong value = new AtomicLong(Long.MAX_VALUE);
log.info("value: {} ", value.incrementAndGet());
RestClient restClient = new RestClient(BASE_URL);
restClient.login(USERNAME, PASSWORD);
RestClient restClient = new RestClient(params.getRestApiUrl());
restClient.login(params.getUsername(), params.getPassword());
List<MqttStressTestClient> clients = new ArrayList<>();
for (int i = 0; i < DEVICE_COUNT; i++) {
Device device = restClient.createDevice("Device " + i);
List<IMqttToken> connectTokens = new ArrayList<>();
for (int i = 0; i < params.getDeviceCount(); i++) {
Device device = restClient.createDevice("Device " + UUID.randomUUID());
DeviceCredentials credentials = restClient.getCredentials(device.getId());
String mqttURL = MQTT_URLS[i % MQTT_URLS.length];
String[] mqttUrls = params.getMqttUrls();
String mqttURL = mqttUrls[i % mqttUrls.length];
MqttStressTestClient client = new MqttStressTestClient(results, mqttURL, credentials.getCredentialsId());
client.connect();
connectTokens.add(client.connect());
clients.add(client);
}
Thread.sleep(1000);
for (IMqttToken tokens : connectTokens) {
tokens.waitForCompletion();
}
byte[] data = "{\"longKey\":73}".getBytes(StandardCharsets.UTF_8);
long startTime = System.currentTimeMillis();
int iterationsCount = (int) (TEST_DURATION / TEST_ITERATION);
int subIterationsCount = (int) (TEST_ITERATION / TEST_SUB_ITERATION);
if (clients.size() % subIterationsCount != 0) {
throw new IllegalArgumentException("Invalid parameter exception!");
}
for (int i = 0; i < iterationsCount; i++) {
for (int j = 0; j < subIterationsCount; j++) {
int packSize = clients.size() / subIterationsCount;
for (int k = 0; k < packSize; k++) {
int clientIndex = packSize * j + k;
clients.get(clientIndex).publishTelemetry(data);
}
Thread.sleep(TEST_SUB_ITERATION);
}
for (MqttStressTestClient client : clients) {
client.warmUp(data);
}
Thread.sleep(1000);
long startTime = System.currentTimeMillis();
int iterationsCount = (int) (params.getDuration() / params.getIterationInterval());
int subIterationMicroSeconds = (int) ((params.getIterationInterval() * 1000) / params.getDeviceCount());
List<ScheduledFuture<Void>> iterationFutures = new ArrayList<>();
for (int i = 0; i < iterationsCount; i++) {
long delay = i * params.getIterationInterval();
iterationFutures.add(scheduler.schedule((Callable<Void>) () -> {
long sleepMicroSeconds = 0L;
for (MqttStressTestClient client : clients) {
client.publishTelemetry(data);
sleepMicroSeconds += subIterationMicroSeconds;
if (sleepMicroSeconds > 1000) {
Thread.sleep(sleepMicroSeconds / 1000);
sleepMicroSeconds = sleepMicroSeconds % 1000;
}
}
return null;
}, delay, TimeUnit.MILLISECONDS));
}
for (ScheduledFuture<Void> future : iterationFutures) {
future.get();
}
Thread.sleep(1000);
for (MqttStressTestClient client : clients) {
client.disconnect();
}
log.info("Results: {} took {}ms", results, System.currentTimeMillis() - startTime);
scheduler.shutdownNow();
}
}

View File

@ -73,7 +73,7 @@ public class ResultAccumulator {
@Override
public String toString() {
return "org.thingsboard.client.tools.ResultAccumulator{" +
return "Result {" +
"successCount=" + getSuccessCount() +
", errorCount=" + getErrorCount() +
", totalTime=" + getTimeSpent() +

View File

@ -0,0 +1,73 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.client.tools;
import lombok.extern.slf4j.Slf4j;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@Slf4j
public class TestParams {
static final String TEST_PROPERTIES = "test.properties";
static final long DEFAULT_TEST_DURATION = TimeUnit.MINUTES.toMillis(1);
static final long DEFAULT_TEST_INTERVAL = TimeUnit.MILLISECONDS.toMillis(100);
static final int DEFAULT_DEVICE_COUNT = 100;
static final String DEFAULT_REST_URL = "http://localhost:8080";
static final String DEFAULT_MQTT_URLS = "tcp://localhost:1883";
static final String DEFAULT_USERNAME = "tenant@thingsboard.org";
static final String DEFAULT_PASSWORD = "tenant";
private Properties params = new Properties();
public TestParams() throws IOException {
try {
params.load(new FileInputStream(TEST_PROPERTIES));
} catch (Exception e) {
log.warn("Failed to read " + TEST_PROPERTIES);
}
}
public long getDuration() {
return Long.valueOf(params.getProperty("durationMs", Long.toString(DEFAULT_TEST_DURATION)));
}
public long getIterationInterval() {
return Long.valueOf(params.getProperty("iterationIntervalMs", Long.toString(DEFAULT_TEST_INTERVAL)));
}
public int getDeviceCount() {
return Integer.valueOf(params.getProperty("deviceCount", Integer.toString(DEFAULT_DEVICE_COUNT)));
}
public String getRestApiUrl() {
return params.getProperty("restUrl", DEFAULT_REST_URL);
}
public String[] getMqttUrls() {
return params.getProperty("mqttUrls", DEFAULT_MQTT_URLS).split(",");
}
public String getUsername() {
return params.getProperty("username", DEFAULT_USERNAME);
}
public String getPassword() {
return params.getProperty("password", DEFAULT_PASSWORD);
}
}

View File

@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
Copyright © 2016 The Thingsboard Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!DOCTYPE configuration>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<logger name="org.thingsboard" level="INFO" />
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>

3
tools/test.properties Normal file
View File

@ -0,0 +1,3 @@
deviceCount=1000
durationMs=5000
iterationIntervalMs=250