MQTT Protocol

How to use the MQTT support in Gatling to connect to a broker and perform checks against inbound messages.

Prerequisites

Gatling Enterprise MQTT DSL is not imported by default.

You have to manually add the following imports:

     
import io.gatling.javaapi.mqtt.*;

import static io.gatling.javaapi.mqtt.MqttDsl.*;
import io.gatling.javaapi.mqtt.MqttDsl.*
import io.gatling.mqtt.Predef._

MQTT Protocol

Use the mqtt object in order to create a MQTT protocol.

     
/*
 * Copyright 2011-2024 GatlingCorp (https://gatling.io)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

//#imprts
import io.gatling.javaapi.mqtt.*;

import static io.gatling.javaapi.mqtt.MqttDsl.*;
//#imprts
import java.time.Duration;
import io.gatling.javaapi.core.*;
import static io.gatling.javaapi.core.CoreDsl.*;

class MqttProtocolSampleJava {
//#protocol
MqttProtocolBuilder mqttProtocol = mqtt
  // enable protocol version 3.1 (default: false)
  .mqttVersion_3_1()
  // enable protocol version 3.1.1 (default: true)
  .mqttVersion_3_1_1()
  // broker address (default: localhost:1883)
  .broker("hostname", 1883)
  // if TLS should be enabled (default: false)
  .useTls(true)
  // Used to specify KeyManagerFactory for each individual virtual user. Input is the 0-based incremental id of the virtual user.
  .perUserKeyManagerFactory(userId -> (javax.net.ssl.KeyManagerFactory) null)
  // clientIdentifier sent in the connect payload (of not set, Gatling will generate a random one)
  .clientId("#{id}")
  // if session should be cleaned during connect (default: true)
  .cleanSession(true)
  // optional credentials for connecting
  .credentials("#{userName}", "#{password}")
  // connections keep alive timeout
  .keepAlive(30)
  // use at-most-once QoS (default: true)
  .qosAtMostOnce()
  // use at-least-once QoS (default: false)
  .qosAtLeastOnce()
  // use exactly-once QoS (default: false)
  .qosExactlyOnce()
  // enable retain (default: false)
  .retain(false)
  // send last will, possibly with specific QoS and retain
  .lastWill(
    LastWill("#{willTopic}", StringBody("#{willMessage}"))
    .qosAtLeastOnce()
    .retain(true)
  )
  // max number of reconnects after connection crash (default: 3)
  .reconnectAttemptsMax(1)
  // reconnect delay after connection crash in millis (default: 100)
  .reconnectDelay(1)
  // reconnect delay exponential backoff (default: 1.5)
  .reconnectBackoffMultiplier(1.5F)
  //  resend delay after send failure in millis (default: 5000)
  .resendDelay(1000)
  // resend delay exponential backoff (default: 1.0)
  .resendBackoffMultiplier(2.0F)
  // interval for timeout checker (default: 1 second)
  .timeoutCheckInterval(1)
  // check for pairing messages sent and messages received
  .correlateBy((CheckBuilder) null);
//#protocol

  {
//#connect
mqtt("Connecting").connect();
//#connect

//#subscribe
mqtt("Subscribing")
  .subscribe("#{myTopic}")
  // optional, override default QoS
  .qosAtMostOnce();
//#subscribe

//#publish
mqtt("Publishing")
  .publish("#{myTopic}")
  .message(StringBody("#{myTextPayload}"));
//#publish

//#check
// subscribe and expect to receive a message within 100ms, without blocking flow
mqtt("Subscribing").subscribe("#{myTopic2}")
  .expect(Duration.ofMillis(100));

// publish and await (block) until it receives a message withing 100ms
mqtt("Publishing").publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(Duration.ofMillis(100));

// optionally, define in which topic the expected message will be received
mqtt("Publishing").publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(Duration.ofMillis(100), "repub/#{myTopic}");

// optionally define check criteria to be applied on the matching received message
mqtt("Publishing")
  .publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(Duration.ofMillis(100)).check(jsonPath("$.error").notExists());
//#check

//#waitForMessages
exec(waitForMessages().timeout(Duration.ofMillis(100)));
//#waitForMessages
  }

//#example
public class MqttSample extends Simulation {
  MqttProtocolBuilder mqttProtocol = mqtt
    .broker("localhost", 1883)
    .correlateBy(jsonPath("$.correlationId"));

  ScenarioBuilder scn = scenario("MQTT Test")
    .feed(csv("topics-and-payloads.csv"))
    .exec(mqtt("Connecting").connect())
    .exec(mqtt("Subscribing").subscribe("#{myTopic}"))
    .exec(mqtt("Publishing").publish("#{myTopic}")
      .message(StringBody("#{myTextPayload}"))
      .expect(Duration.ofMillis(100)).check(jsonPath("$.error").notExists()));

  {
    setUp(scn.injectOpen(rampUsersPerSec(10).to(1000).during(60)))
      .protocols(mqttProtocol);
  }
}
//#example
}
import io.gatling.javaapi.core.*
import io.gatling.javaapi.core.CoreDsl.*

//#imprts

import io.gatling.javaapi.mqtt.MqttDsl.*
//#imprts
import java.time.Duration
import javax.net.ssl.KeyManagerFactory

class MqttProtocolSampleKotlin {
  //#protocol
  val mqttProtocol = mqtt
    // enable protocol version 3.1 (default: false)
    .mqttVersion_3_1()
    // enable protocol version 3.1.1 (default: true)
    .mqttVersion_3_1_1()
    // broker address (default: localhost:1883)
    .broker("hostname", 1883)
    // if TLS should be enabled (default: false)
    .useTls(true)
    // Used to specify KeyManagerFactory for each individual virtual user. Input is the 0-based incremental id of the virtual user.
    .perUserKeyManagerFactory { userId -> null as KeyManagerFactory? }
    // clientIdentifier sent in the connect payload (of not set, Gatling will generate a random one)
    .clientId("#{id}")
    // if session should be cleaned during connect (default: true)
    .cleanSession(true)
    // optional credentials for connecting
    .credentials("#{userName}", "#{password}")
    // connections keep alive timeout
    .keepAlive(30)
    // use at-most-once QoS (default: true)
    .qosAtMostOnce()
    // use at-least-once QoS (default: false)
    .qosAtLeastOnce()
    // use exactly-once QoS (default: false)
    .qosExactlyOnce()
    // enable retain (default: false)
    .retain(false)
    // send last will, possibly with specific QoS and retain
    .lastWill(
      LastWill("#{willTopic}", StringBody("#{willMessage}"))
        .qosAtLeastOnce()
        .retain(true)
    )
    // max number of reconnects after connection crash (default: 3)
    .reconnectAttemptsMax(1)
    // reconnect delay after connection crash in millis (default: 100)
    .reconnectDelay(1)
    // reconnect delay exponential backoff (default: 1.5)
    .reconnectBackoffMultiplier(1.5f)
    //  resend delay after send failure in millis (default: 5000)
    .resendDelay(1000)
    // resend delay exponential backoff (default: 1.0)
    .resendBackoffMultiplier(2.0f)
    // interval for timeout checker (default: 1 second)
    .timeoutCheckInterval(1)
    // check for pairing messages sent and messages received
    .correlateBy(null as CheckBuilder)
//#protocol

  init {
//#connect
mqtt("Connecting").connect()
//#connect

//#subscribe
mqtt("Subscribing")
  .subscribe("#{myTopic}") // optional, override default QoS
  .qosAtMostOnce()
//#subscribe

//#publish
mqtt("Publishing")
  .publish("#{myTopic}")
  .message(StringBody("#{myTextPayload}"))
//#publish

//#check
// subscribe and expect to receive a message within 100ms, without blocking flow
mqtt("Subscribing").subscribe("#{myTopic2}")
  .expect(Duration.ofMillis(100))

// publish and wait (block) until it receives a message withing 100ms
mqtt("Publishing").publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(Duration.ofMillis(100))

// optionally, define in which topic the expected message will be received
mqtt("Publishing").publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(Duration.ofMillis(100), "repub/#{myTopic}")

// optionally define check criteria to be applied on the matching received message
mqtt("Publishing")
  .publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(Duration.ofMillis(100)).check(jsonPath("$.error").notExists())
//#check

//#waitForMessages
exec(waitForMessages().timeout(Duration.ofMillis(100)))
//#waitForMessages
  }

//#example
class MqttSample : Simulation() {
  val mqttProtocol = mqtt
    .broker("localhost", 1883)
    .correlateBy(jsonPath("$.correlationId"))

  val scn = scenario("MQTT Test")
    .feed(csv("topics-and-payloads.csv"))
    .exec(mqtt("Connecting").connect())
    .exec(mqtt("Subscribing").subscribe("#{myTopic}"))
    .exec(mqtt("Publishing").publish("#{myTopic}")
      .message(StringBody("#{myTextPayload}"))
      .expect(Duration.ofMillis(100)).check(jsonPath("$.error").notExists()))

  init {
    setUp(scn.injectOpen(rampUsersPerSec(10.0).to(1000.0).during(60)))
      .protocols(mqttProtocol)
  }
}
//#example
}
/*
 * Copyright 2011-2024 GatlingCorp (https://gatling.io)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import io.gatling.core.Predef._
import scala.concurrent.duration._
//#imprts
import io.gatling.mqtt.Predef._
//#imprts

class MqttProtocolSampleScala {
//#protocol
val mqttProtocol = mqtt
  // enable protocol version 3.1 (default: false)
  .mqttVersion_3_1
  // enable protocol version 3.1.1 (default: true)
  .mqttVersion_3_1_1
  // broker address (default: localhost:1883)
  .broker("hostname", 1883)
  // if TLS should be enabled (default: false)
  .useTls(true)
  // Used to specify KeyManagerFactory for each individual virtual user. Input is the 0-based incremental id of the virtual user.
  .perUserKeyManagerFactory(userId => null.asInstanceOf[javax.net.ssl.KeyManagerFactory])
  // clientIdentifier sent in the connect payload (of not set, Gatling will generate a random one)
  .clientId("#{id}")
  // if session should be cleaned during connect (default: true)
  .cleanSession(true)
  // optional credentials for connecting
  .credentials("#{userName}", "#{password}")
  // connections keep alive timeout
  .keepAlive(30)
  // use at-most-once QoS (default: true)
  .qosAtMostOnce
  // use at-least-once QoS (default: false)
  .qosAtLeastOnce
  // use exactly-once QoS (default: false)
  .qosExactlyOnce
  // enable retain (default: false)
  .retain(false)
  // send last will, possibly with specific QoS and retain
  .lastWill(
    LastWill("#{willTopic}", StringBody("#{willMessage}"))
      .qosAtLeastOnce
      .retain(true)
  )
  // max number of reconnects after connection crash (default: 3)
  .reconnectAttemptsMax(1)
  // reconnect delay after connection crash in millis (default: 100)
  .reconnectDelay(1)
  // reconnect delay exponential backoff (default: 1.5)
  .reconnectBackoffMultiplier(1.5F)
  //  resend delay after send failure in millis (default: 5000)
  .resendDelay(1000)
  // resend delay exponential backoff (default: 1.0)
  .resendBackoffMultiplier(2.0F)
  // interval for timeout checker (default: 1 second)
  .timeoutCheckInterval(1)
  // check for pairing messages sent and messages received
  .correlateBy(null)
//#protocol

//#connect
mqtt("Connecting").connect
//#connect

//#subscribe
mqtt("Subscribing")
  .subscribe("#{myTopic}")
  // optional, override default QoS
  .qosAtMostOnce
//#subscribe

//#publish
mqtt("Publishing")
  .publish("#{myTopic}")
  .message(StringBody("#{myTextPayload}"))
//#publish

//#check
// subscribe and expect to receive a message within 100ms, without blocking flow
mqtt("Subscribing").subscribe("#{myTopic2}")
  .expect(100.milliseconds)

// publish and await (block) until it receives a message withing 100ms
mqtt("Publishing").publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(100.milliseconds)

// optionally, define in which topic the expected message will be received
mqtt("Publishing").publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(100.milliseconds, "repub/#{myTopic}")

// optionally define check criteria to be applied on the matching received message
mqtt("Publishing")
  .publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(100.milliseconds).check(jsonPath("$.error").notExists)
//#check

//#waitForMessages
exec(waitForMessages.timeout(100.milliseconds))
//#waitForMessages

//#example
class MqttSample extends Simulation {
  val mqttProtocol = mqtt
    .broker("localhost", 1883)
    .correlateBy(jsonPath("$.correlationId"))

  val scn = scenario("MQTT Test")
    .feed(csv("topics-and-payloads.csv"))
    .exec(mqtt("Connecting").connect)
    .exec(mqtt("Subscribing").subscribe("#{myTopic}"))
    .exec(mqtt("Publishing").publish("#{myTopic}")
      .message(StringBody("#{myTextPayload}"))
      .expect(100.milliseconds).check(jsonPath("$.error").notExists))

  setUp(scn.inject(rampUsersPerSec(10) to 1000 during (60)))
    .protocols(mqttProtocol)
}
//#example
}

Request

Use the mqtt("requestName") method in order to create a MQTT request.

connect

Your virtual users first have to establish a connection.

     
mqtt("Connecting").connect();
mqtt("Connecting").connect()
mqtt("Connecting").connect

subscribe

Use the subscribe method to subscribe to an MQTT topic:

     
mqtt("Subscribing")
  .subscribe("#{myTopic}")
  // optional, override default QoS
  .qosAtMostOnce();
mqtt("Subscribing")
  .subscribe("#{myTopic}") // optional, override default QoS
  .qosAtMostOnce()
mqtt("Subscribing")
  .subscribe("#{myTopic}")
  // optional, override default QoS
  .qosAtMostOnce

publish

Use the publish method to publish a message. You can use the same Body API as for HTTP request bodies:

     
mqtt("Publishing")
  .publish("#{myTopic}")
  .message(StringBody("#{myTextPayload}"));
mqtt("Publishing")
  .publish("#{myTopic}")
  .message(StringBody("#{myTextPayload}"))
mqtt("Publishing")
  .publish("#{myTopic}")
  .message(StringBody("#{myTextPayload}"))

MQTT Checks

You can define blocking checks with await and non-blocking checks with expect. Those can be set right after subscribing, or after publishing:

     
// subscribe and expect to receive a message within 100ms, without blocking flow
mqtt("Subscribing").subscribe("#{myTopic2}")
  .expect(Duration.ofMillis(100));

// publish and await (block) until it receives a message withing 100ms
mqtt("Publishing").publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(Duration.ofMillis(100));

// optionally, define in which topic the expected message will be received
mqtt("Publishing").publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(Duration.ofMillis(100), "repub/#{myTopic}");

// optionally define check criteria to be applied on the matching received message
mqtt("Publishing")
  .publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(Duration.ofMillis(100)).check(jsonPath("$.error").notExists());
// subscribe and expect to receive a message within 100ms, without blocking flow
mqtt("Subscribing").subscribe("#{myTopic2}")
  .expect(Duration.ofMillis(100))

// publish and wait (block) until it receives a message withing 100ms
mqtt("Publishing").publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(Duration.ofMillis(100))

// optionally, define in which topic the expected message will be received
mqtt("Publishing").publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(Duration.ofMillis(100), "repub/#{myTopic}")

// optionally define check criteria to be applied on the matching received message
mqtt("Publishing")
  .publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(Duration.ofMillis(100)).check(jsonPath("$.error").notExists())
// subscribe and expect to receive a message within 100ms, without blocking flow
mqtt("Subscribing").subscribe("#{myTopic2}")
  .expect(100.milliseconds)

// publish and await (block) until it receives a message withing 100ms
mqtt("Publishing").publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(100.milliseconds)

// optionally, define in which topic the expected message will be received
mqtt("Publishing").publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(100.milliseconds, "repub/#{myTopic}")

// optionally define check criteria to be applied on the matching received message
mqtt("Publishing")
  .publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(100.milliseconds).check(jsonPath("$.error").notExists)

You can optionally define in which topic the expected message will be received:

You can optionally define check criteria to be applied on the matching received message:

You can use waitForMessages and block for all pending non-blocking checks:

     
exec(waitForMessages().timeout(Duration.ofMillis(100)));
exec(waitForMessages().timeout(Duration.ofMillis(100)))
exec(waitForMessages.timeout(100.milliseconds))

MQTT configuration

MQTT support honors the ssl and netty configurations from gatling.conf.

Example

     
public class MqttSample extends Simulation {
  MqttProtocolBuilder mqttProtocol = mqtt
    .broker("localhost", 1883)
    .correlateBy(jsonPath("$.correlationId"));

  ScenarioBuilder scn = scenario("MQTT Test")
    .feed(csv("topics-and-payloads.csv"))
    .exec(mqtt("Connecting").connect())
    .exec(mqtt("Subscribing").subscribe("#{myTopic}"))
    .exec(mqtt("Publishing").publish("#{myTopic}")
      .message(StringBody("#{myTextPayload}"))
      .expect(Duration.ofMillis(100)).check(jsonPath("$.error").notExists()));

  {
    setUp(scn.injectOpen(rampUsersPerSec(10).to(1000).during(60)))
      .protocols(mqttProtocol);
  }
}
class MqttSample : Simulation() {
  val mqttProtocol = mqtt
    .broker("localhost", 1883)
    .correlateBy(jsonPath("$.correlationId"))

  val scn = scenario("MQTT Test")
    .feed(csv("topics-and-payloads.csv"))
    .exec(mqtt("Connecting").connect())
    .exec(mqtt("Subscribing").subscribe("#{myTopic}"))
    .exec(mqtt("Publishing").publish("#{myTopic}")
      .message(StringBody("#{myTextPayload}"))
      .expect(Duration.ofMillis(100)).check(jsonPath("$.error").notExists()))

  init {
    setUp(scn.injectOpen(rampUsersPerSec(10.0).to(1000.0).during(60)))
      .protocols(mqttProtocol)
  }
}
class MqttSample extends Simulation {
  val mqttProtocol = mqtt
    .broker("localhost", 1883)
    .correlateBy(jsonPath("$.correlationId"))

  val scn = scenario("MQTT Test")
    .feed(csv("topics-and-payloads.csv"))
    .exec(mqtt("Connecting").connect)
    .exec(mqtt("Subscribing").subscribe("#{myTopic}"))
    .exec(mqtt("Publishing").publish("#{myTopic}")
      .message(StringBody("#{myTextPayload}"))
      .expect(100.milliseconds).check(jsonPath("$.error").notExists))

  setUp(scn.inject(rampUsersPerSec(10) to 1000 during (60)))
    .protocols(mqttProtocol)
}

Edit this page on GitHub