Performance improvements
This commit is contained in:
		
							parent
							
								
									d4b88f802a
								
							
						
					
					
						commit
						595e1cc2c1
					
				@ -29,9 +29,9 @@ app-dispatcher {
 | 
			
		||||
  executor = "fork-join-executor"
 | 
			
		||||
  fork-join-executor {
 | 
			
		||||
      # Min number of threads to cap factor-based parallelism number to
 | 
			
		||||
      parallelism-min = 2
 | 
			
		||||
      parallelism-min = 1
 | 
			
		||||
      # Max number of threads to cap factor-based parallelism number to
 | 
			
		||||
      parallelism-max = 12
 | 
			
		||||
      parallelism-max = 1
 | 
			
		||||
      
 | 
			
		||||
      # The parallelism factor is used to determine thread pool size using the
 | 
			
		||||
      # following formula: ceil(available processors * factor). Resulting size
 | 
			
		||||
@ -54,7 +54,7 @@ rpc-dispatcher {
 | 
			
		||||
      # Min number of threads to cap factor-based parallelism number to
 | 
			
		||||
      parallelism-min = 2
 | 
			
		||||
      # Max number of threads to cap factor-based parallelism number to
 | 
			
		||||
      parallelism-max = 12
 | 
			
		||||
      parallelism-max = 8
 | 
			
		||||
 | 
			
		||||
      # The parallelism factor is used to determine thread pool size using the
 | 
			
		||||
      # following formula: ceil(available processors * factor). Resulting size
 | 
			
		||||
@ -82,7 +82,7 @@ core-dispatcher {
 | 
			
		||||
      # The parallelism factor is used to determine thread pool size using the
 | 
			
		||||
      # following formula: ceil(available processors * factor). Resulting size
 | 
			
		||||
      # is then bounded by the parallelism-min and parallelism-max values.
 | 
			
		||||
      parallelism-factor = 1.0
 | 
			
		||||
      parallelism-factor = 0.25
 | 
			
		||||
  }
 | 
			
		||||
  # How long time the dispatcher will wait for new actors until it shuts down
 | 
			
		||||
  shutdown-timeout = 1s
 | 
			
		||||
@ -105,7 +105,7 @@ rule-dispatcher {
 | 
			
		||||
      # The parallelism factor is used to determine thread pool size using the
 | 
			
		||||
      # following formula: ceil(available processors * factor). Resulting size
 | 
			
		||||
      # is then bounded by the parallelism-min and parallelism-max values.
 | 
			
		||||
      parallelism-factor = 1.0
 | 
			
		||||
      parallelism-factor = 0.25
 | 
			
		||||
  }
 | 
			
		||||
  # How long time the dispatcher will wait for new actors until it shuts down
 | 
			
		||||
  shutdown-timeout = 1s
 | 
			
		||||
@ -128,7 +128,7 @@ plugin-dispatcher {
 | 
			
		||||
      # The parallelism factor is used to determine thread pool size using the
 | 
			
		||||
      # following formula: ceil(available processors * factor). Resulting size
 | 
			
		||||
      # is then bounded by the parallelism-min and parallelism-max values.
 | 
			
		||||
      parallelism-factor = 1.0
 | 
			
		||||
      parallelism-factor = 0.25
 | 
			
		||||
  }
 | 
			
		||||
  # How long time the dispatcher will wait for new actors until it shuts down
 | 
			
		||||
  shutdown-timeout = 1s
 | 
			
		||||
@ -152,7 +152,7 @@ session-dispatcher {
 | 
			
		||||
      # The parallelism factor is used to determine thread pool size using the
 | 
			
		||||
      # following formula: ceil(available processors * factor). Resulting size
 | 
			
		||||
      # is then bounded by the parallelism-min and parallelism-max values.
 | 
			
		||||
      parallelism-factor = 1.0
 | 
			
		||||
      parallelism-factor = 0.25
 | 
			
		||||
  }
 | 
			
		||||
  # How long time the dispatcher will wait for new actors until it shuts down
 | 
			
		||||
  shutdown-timeout = 1s
 | 
			
		||||
 | 
			
		||||
@ -167,7 +167,7 @@ actors:
 | 
			
		||||
# Cache parameters
 | 
			
		||||
cache:
 | 
			
		||||
  # Enable/disable cache functionality.
 | 
			
		||||
  enabled: "${CACHE_ENABLED:true}"
 | 
			
		||||
  enabled: "${CACHE_ENABLED:false}"
 | 
			
		||||
  device_credentials:
 | 
			
		||||
    # Default time to store device credentials in cache, in seconds
 | 
			
		||||
    time_to_live: "${CACHE_DEVICE_CREDENTIAL_TTL:3600}"
 | 
			
		||||
 | 
			
		||||
@ -25,9 +25,9 @@ import java.util.concurrent.TimeUnit;
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class TestParams {
 | 
			
		||||
    static final String TEST_PROPERTIES = "test.properties";
 | 
			
		||||
    static final long DEFAULT_TEST_DURATION = TimeUnit.MINUTES.toMillis(1);
 | 
			
		||||
    static final long DEFAULT_TEST_DURATION = TimeUnit.SECONDS.toMillis(1);
 | 
			
		||||
    static final long DEFAULT_TEST_INTERVAL = TimeUnit.MILLISECONDS.toMillis(100);
 | 
			
		||||
    static final int DEFAULT_DEVICE_COUNT = 25;
 | 
			
		||||
    static final int DEFAULT_DEVICE_COUNT = 2000;
 | 
			
		||||
    static final String DEFAULT_REST_URL = "http://localhost:8080";
 | 
			
		||||
    static final String DEFAULT_MQTT_URLS = "tcp://localhost:1883";
 | 
			
		||||
    static final String DEFAULT_USERNAME = "tenant@thingsboard.org";
 | 
			
		||||
 | 
			
		||||
@ -25,6 +25,7 @@
 | 
			
		||||
        </encoder>
 | 
			
		||||
    </appender>
 | 
			
		||||
 | 
			
		||||
    <logger name="io.gatling.core.action.Pause" level="WARN"/>
 | 
			
		||||
    <logger name="org.thingsboard" level="INFO" />
 | 
			
		||||
 | 
			
		||||
    <root level="INFO">
 | 
			
		||||
 | 
			
		||||
@ -45,18 +45,18 @@ class MqttSimulation extends Simulation {
 | 
			
		||||
  val connect = exec(mqtt("connect")
 | 
			
		||||
      .connect())
 | 
			
		||||
 | 
			
		||||
  val publish = repeat(400) {
 | 
			
		||||
  val publish = repeat(600) {
 | 
			
		||||
    exec(mqtt("publish")
 | 
			
		||||
      .publish("v1/devices/me/telemetry", "{\"key1\":\"value1\", \"key2\":\"value2\"}", QoS.AT_LEAST_ONCE, retain = false))
 | 
			
		||||
      .publish("v1/devices/me/telemetry", "{\"temperature\":\"42\"}", QoS.AT_LEAST_ONCE, retain = false)).pause(100 milliseconds)
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  val scn = scenario("Scenario Name")
 | 
			
		||||
    .feed(csv("/tmp/mqtt.csv").circular)
 | 
			
		||||
    .feed(csv("/tmp/mqtt.csv"))
 | 
			
		||||
    .exec(connect, publish)
 | 
			
		||||
 | 
			
		||||
  setUp(
 | 
			
		||||
      scn
 | 
			
		||||
        .inject(constantUsersPerSec(25) during (1 seconds))
 | 
			
		||||
        .inject(constantUsersPerSec(1500) during (1 seconds))
 | 
			
		||||
  ).protocols(mqttConf)
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -76,7 +76,6 @@ public class MqttTransportService {
 | 
			
		||||
 | 
			
		||||
    @PostConstruct
 | 
			
		||||
    public void init() throws Exception {
 | 
			
		||||
        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
 | 
			
		||||
        log.info("Starting MQTT transport...");
 | 
			
		||||
        log.info("Lookup MQTT transport adaptor {}", adaptorName);
 | 
			
		||||
        this.adaptor = (MqttTransportAdaptor) appContext.getBean(adaptorName);
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user