Merge pull request #34 from volodymyr-babak/master

Moved stress test to different repo
This commit is contained in:
Andrew Shvayka 2017-01-11 21:45:05 +02:00 committed by GitHub
commit e25bd4f0dc
12 changed files with 12 additions and 511 deletions

View File

@ -29,9 +29,9 @@ app-dispatcher {
executor = "fork-join-executor"
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
parallelism-min = 1
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 12
parallelism-max = 1
# The parallelism factor is used to determine thread pool size using the
# following formula: ceil(available processors * factor). Resulting size
@ -54,7 +54,7 @@ rpc-dispatcher {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 12
parallelism-max = 8
# The parallelism factor is used to determine thread pool size using the
# following formula: ceil(available processors * factor). Resulting size
@ -82,7 +82,7 @@ core-dispatcher {
# The parallelism factor is used to determine thread pool size using the
# following formula: ceil(available processors * factor). Resulting size
# is then bounded by the parallelism-min and parallelism-max values.
parallelism-factor = 1.0
parallelism-factor = 0.25
}
# How long time the dispatcher will wait for new actors until it shuts down
shutdown-timeout = 1s
@ -105,7 +105,7 @@ rule-dispatcher {
# The parallelism factor is used to determine thread pool size using the
# following formula: ceil(available processors * factor). Resulting size
# is then bounded by the parallelism-min and parallelism-max values.
parallelism-factor = 1.0
parallelism-factor = 0.25
}
# How long time the dispatcher will wait for new actors until it shuts down
shutdown-timeout = 1s
@ -128,7 +128,7 @@ plugin-dispatcher {
# The parallelism factor is used to determine thread pool size using the
# following formula: ceil(available processors * factor). Resulting size
# is then bounded by the parallelism-min and parallelism-max values.
parallelism-factor = 1.0
parallelism-factor = 0.25
}
# How long time the dispatcher will wait for new actors until it shuts down
shutdown-timeout = 1s
@ -152,7 +152,7 @@ session-dispatcher {
# The parallelism factor is used to determine thread pool size using the
# following formula: ceil(available processors * factor). Resulting size
# is then bounded by the parallelism-min and parallelism-max values.
parallelism-factor = 1.0
parallelism-factor = 0.25
}
# How long time the dispatcher will wait for new actors until it shuts down
shutdown-timeout = 1s

View File

@ -167,7 +167,7 @@ actors:
# Cache parameters
cache:
# Enable/disable cache functionality.
enabled: "${CACHE_ENABLED:true}"
enabled: "${CACHE_ENABLED:false}"
device_credentials:
# Default time to store device credentials in cache, in seconds
time_to_live: "${CACHE_DEVICE_CREDENTIAL_TTL:3600}"

View File

@ -32,7 +32,7 @@ do
done
# Copying env variables into conf files
printenv | while read x; do echo export $x; done >> /usr/share/thingsboard/conf/thingsboard.conf
printenv | awk -F "=" '{print "export " $1 "='\''" $2 "'\''"}' >> /usr/share/thingsboard/conf/thingsboard.conf
cat /usr/share/thingsboard/conf/thingsboard.conf

View File

@ -48,39 +48,8 @@
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>

View File

@ -1,111 +0,0 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.client.tools; /**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
* @author Andrew Shvayka
*/
@Slf4j
public class MqttStressTestClient {
@Getter
private final String deviceToken;
@Getter
private final String clientId;
private final MqttClientPersistence persistence;
private final MqttAsyncClient client;
private final ResultAccumulator results;
public MqttStressTestClient(ResultAccumulator results, String brokerUri, String deviceToken) throws MqttException {
this.results = results;
this.clientId = MqttAsyncClient.generateClientId();
this.deviceToken = deviceToken;
this.persistence = new MemoryPersistence();
this.client = new MqttAsyncClient(brokerUri, clientId, persistence);
}
public IMqttToken connect() throws MqttException {
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(deviceToken);
return client.connect(options, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken iMqttToken) {
log.info("OnSuccess");
}
@Override
public void onFailure(IMqttToken iMqttToken, Throwable e) {
log.info("OnFailure", e);
}
});
}
public void disconnect() throws MqttException {
client.disconnect();
}
public void warmUp(byte[] data) throws MqttException {
MqttMessage msg = new MqttMessage(data);
client.publish("v1/devices/me/telemetry", msg, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
}
}).waitForCompletion();
}
public void publishTelemetry(byte[] data) throws MqttException {
long sendTime = System.currentTimeMillis();
MqttMessage msg = new MqttMessage(data);
client.publish("v1/devices/me/telemetry", msg, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
long ackTime = System.currentTimeMillis();
results.onResult(true, ackTime - sendTime);
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
long failTime = System.currentTimeMillis();
results.onResult(false, failTime - sendTime);
}
});
}
}

View File

@ -1,129 +0,0 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.client.tools; /**
* Copyright © 2016 The Thingsboard Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author Andrew Shvayka
*/
@Slf4j
public class MqttStressTestTool {
public static void main(String[] args) throws Exception {
TestParams params = new TestParams();
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);
if (params.getDuration() % params.getIterationInterval() != 0) {
throw new IllegalArgumentException("Test Duration % Iteration Interval != 0");
}
if ((params.getIterationInterval() * 1000) % params.getDeviceCount() != 0) {
throw new IllegalArgumentException("Iteration Interval % Device Count != 0");
}
ResultAccumulator results = new ResultAccumulator();
AtomicLong value = new AtomicLong(Long.MAX_VALUE);
log.info("value: {} ", value.incrementAndGet());
RestClient restClient = new RestClient(params.getRestApiUrl());
restClient.login(params.getUsername(), params.getPassword());
List<MqttStressTestClient> clients = new ArrayList<>();
List<IMqttToken> connectTokens = new ArrayList<>();
for (int i = 0; i < params.getDeviceCount(); i++) {
Device device = restClient.createDevice("Device " + UUID.randomUUID());
DeviceCredentials credentials = restClient.getCredentials(device.getId());
String[] mqttUrls = params.getMqttUrls();
String mqttURL = mqttUrls[i % mqttUrls.length];
MqttStressTestClient client = new MqttStressTestClient(results, mqttURL, credentials.getCredentialsId());
connectTokens.add(client.connect());
clients.add(client);
}
for (IMqttToken tokens : connectTokens) {
tokens.waitForCompletion();
}
byte[] data = "{\"longKey\":73}".getBytes(StandardCharsets.UTF_8);
for (MqttStressTestClient client : clients) {
client.warmUp(data);
}
Thread.sleep(1000);
long startTime = System.currentTimeMillis();
int iterationsCount = (int) (params.getDuration() / params.getIterationInterval());
int subIterationMicroSeconds = (int) ((params.getIterationInterval() * 1000) / params.getDeviceCount());
List<ScheduledFuture<Void>> iterationFutures = new ArrayList<>();
for (int i = 0; i < iterationsCount; i++) {
long delay = i * params.getIterationInterval();
iterationFutures.add(scheduler.schedule((Callable<Void>) () -> {
long sleepMicroSeconds = 0L;
for (MqttStressTestClient client : clients) {
client.publishTelemetry(data);
sleepMicroSeconds += subIterationMicroSeconds;
if (sleepMicroSeconds > 1000) {
Thread.sleep(sleepMicroSeconds / 1000);
sleepMicroSeconds = sleepMicroSeconds % 1000;
}
}
return null;
}, delay, TimeUnit.MILLISECONDS));
}
for (ScheduledFuture<Void> future : iterationFutures) {
future.get();
}
Thread.sleep(1000);
for (MqttStressTestClient client : clients) {
client.disconnect();
}
log.info("Results: {} took {}ms", results, System.currentTimeMillis() - startTime);
scheduler.shutdownNow();
}
}

View File

@ -13,25 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.client.tools; /**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.client.tools;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpRequest;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.ClientHttpRequestExecution;

View File

@ -1,100 +0,0 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.client.tools; /**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author Andrew Shvayka
*/
@Slf4j
public class ResultAccumulator {
private AtomicLong minTime = new AtomicLong(Long.MAX_VALUE);
private AtomicLong maxTime = new AtomicLong(Long.MIN_VALUE);
private AtomicLong timeSpentCount = new AtomicLong();
private AtomicInteger successCount = new AtomicInteger();
private AtomicInteger errorCount = new AtomicInteger();
public void onResult(boolean success, long timeSpent) {
if (success) {
successCount.incrementAndGet();
} else {
errorCount.incrementAndGet();
}
timeSpentCount.addAndGet(timeSpent);
while (!setMax(timeSpent)) ;
while (!setMin(timeSpent)) ;
}
private boolean setMax(long timeSpent) {
long curMax = maxTime.get();
long newMax = Math.max(curMax, timeSpent);
return maxTime.compareAndSet(curMax, newMax);
}
private boolean setMin(long timeSpent) {
long curMin = minTime.get();
long newMin = Math.min(curMin, timeSpent);
return minTime.compareAndSet(curMin, newMin);
}
public int getSuccessCount() {
return successCount.get();
}
public int getErrorCount() {
return errorCount.get();
}
public long getTimeSpent() {
return timeSpentCount.get();
}
public double getAvgTimeSpent() {
return ((double) getTimeSpent()) / (getSuccessCount() + getErrorCount());
}
@Override
public String toString() {
return "Result {" +
"successCount=" + getSuccessCount() +
", errorCount=" + getErrorCount() +
", totalTime=" + getTimeSpent() +
", avgTime=" + getAvgTimeSpent() +
", minTime=" + minTime.get() +
", maxTime=" + maxTime.get() +
'}';
}
}

View File

@ -1,73 +0,0 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.client.tools;
import lombok.extern.slf4j.Slf4j;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@Slf4j
public class TestParams {
static final String TEST_PROPERTIES = "test.properties";
static final long DEFAULT_TEST_DURATION = TimeUnit.MINUTES.toMillis(1);
static final long DEFAULT_TEST_INTERVAL = TimeUnit.MILLISECONDS.toMillis(100);
static final int DEFAULT_DEVICE_COUNT = 100;
static final String DEFAULT_REST_URL = "http://localhost:8080";
static final String DEFAULT_MQTT_URLS = "tcp://localhost:1883";
static final String DEFAULT_USERNAME = "tenant@thingsboard.org";
static final String DEFAULT_PASSWORD = "tenant";
private Properties params = new Properties();
public TestParams() throws IOException {
try {
params.load(new FileInputStream(TEST_PROPERTIES));
} catch (Exception e) {
log.warn("Failed to read " + TEST_PROPERTIES);
}
}
public long getDuration() {
return Long.valueOf(params.getProperty("durationMs", Long.toString(DEFAULT_TEST_DURATION)));
}
public long getIterationInterval() {
return Long.valueOf(params.getProperty("iterationIntervalMs", Long.toString(DEFAULT_TEST_INTERVAL)));
}
public int getDeviceCount() {
return Integer.valueOf(params.getProperty("deviceCount", Integer.toString(DEFAULT_DEVICE_COUNT)));
}
public String getRestApiUrl() {
return params.getProperty("restUrl", DEFAULT_REST_URL);
}
public String[] getMqttUrls() {
return params.getProperty("mqttUrls", DEFAULT_MQTT_URLS).split(",");
}
public String getUsername() {
return params.getProperty("username", DEFAULT_USERNAME);
}
public String getPassword() {
return params.getProperty("password", DEFAULT_PASSWORD);
}
}

View File

@ -1,34 +0,0 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
Copyright © 2016-2017 The Thingsboard Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!DOCTYPE configuration>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<logger name="org.thingsboard" level="INFO" />
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>

View File

@ -1,5 +0,0 @@
restUrl=http://localhost:8080
mqttUrls=tcp://localhost:1883
deviceCount=1
durationMs=60000
iterationIntervalMs=1000

View File

@ -76,7 +76,6 @@ public class MqttTransportService {
@PostConstruct
public void init() throws Exception {
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
log.info("Starting MQTT transport...");
log.info("Lookup MQTT transport adaptor {}", adaptorName);
this.adaptor = (MqttTransportAdaptor) appContext.getBean(adaptorName);