MQTT Protocol

How to use the MQTT support in Gatling to connect to a broker and perform checks against inbound messages.

Prerequisites

Gatling Enterprise MQTT DSL is not imported by default.

You have to manually add the following imports:

     
import io.gatling.javaapi.mqtt.*;

import static io.gatling.javaapi.mqtt.MqttDsl.*;
import io.gatling.javaapi.mqtt.MqttDsl.*
import io.gatling.mqtt.Predef._

MQTT protocol

Use the mqtt object in order to create a MQTT protocol.

     
MqttProtocolBuilder mqttProtocol = mqtt
  // enable protocol version 3.1 (default: false)
  .mqttVersion_3_1()
  // enable protocol version 3.1.1 (default: true)
  .mqttVersion_3_1_1()
  // broker address (default: localhost:1883)
  .broker("hostname", 1883)
  // if TLS should be enabled (default: false)
  .useTls(true)
  // Used to specify KeyManagerFactory for each individual virtual user. Input is the 0-based incremental id of the virtual user.
  .perUserKeyManagerFactory(userId -> (javax.net.ssl.KeyManagerFactory) null)
  // clientIdentifier sent in the connect payload (of not set, Gatling will generate a random one)
  .clientId("#{id}")
  // if session should be cleaned during connect (default: true)
  .cleanSession(true)
  // optional credentials for connecting
  .credentials("#{userName}", "#{password}")
  // connections keep alive timeout
  .keepAlive(30)
  // use at-most-once QoS (default: true)
  .qosAtMostOnce()
  // use at-least-once QoS (default: false)
  .qosAtLeastOnce()
  // use exactly-once QoS (default: false)
  .qosExactlyOnce()
  // enable retain (default: false)
  .retain(false)
  // send last will, possibly with specific QoS and retain
  .lastWill(
    LastWill("#{willTopic}", StringBody("#{willMessage}"))
    .qosAtLeastOnce()
    .retain(true)
  )
  // max number of reconnects after connection crash (default: 3)
  .reconnectAttemptsMax(1)
  // reconnect delay after connection crash in millis (default: 100)
  .reconnectDelay(1)
  // reconnect delay exponential backoff (default: 1.5)
  .reconnectBackoffMultiplier(1.5F)
  //  resend delay after send failure in millis (default: 5000)
  .resendDelay(1000)
  // resend delay exponential backoff (default: 1.0)
  .resendBackoffMultiplier(2.0F)
  // interval for timeout checker (default: 1 second)
  .timeoutCheckInterval(1)
  // check for pairing messages sent and messages received
  .correlateBy((CheckBuilder) null)
  // enable unmatched MQTT inbound messages buffering,
  // with a max buffer size of 5
  .unmatchedInboundMessageBufferSize(5);
val mqttProtocol = mqtt
  // enable protocol version 3.1 (default: false)
  .mqttVersion_3_1()
  // enable protocol version 3.1.1 (default: true)
  .mqttVersion_3_1_1()
  // broker address (default: localhost:1883)
  .broker("hostname", 1883)
  // if TLS should be enabled (default: false)
  .useTls(true)
  // Used to specify KeyManagerFactory for each individual virtual user. Input is the 0-based incremental id of the virtual user.
  .perUserKeyManagerFactory { userId -> null as KeyManagerFactory? }
  // clientIdentifier sent in the connect payload (of not set, Gatling will generate a random one)
  .clientId("#{id}")
  // if session should be cleaned during connect (default: true)
  .cleanSession(true)
  // optional credentials for connecting
  .credentials("#{userName}", "#{password}")
  // connections keep alive timeout
  .keepAlive(30)
  // use at-most-once QoS (default: true)
  .qosAtMostOnce()
  // use at-least-once QoS (default: false)
  .qosAtLeastOnce()
  // use exactly-once QoS (default: false)
  .qosExactlyOnce()
  // enable retain (default: false)
  .retain(false)
  // send last will, possibly with specific QoS and retain
  .lastWill(
    LastWill("#{willTopic}", StringBody("#{willMessage}"))
      .qosAtLeastOnce()
      .retain(true)
  )
  // max number of reconnects after connection crash (default: 3)
  .reconnectAttemptsMax(1)
  // reconnect delay after connection crash in millis (default: 100)
  .reconnectDelay(1)
  // reconnect delay exponential backoff (default: 1.5)
  .reconnectBackoffMultiplier(1.5f)
  //  resend delay after send failure in millis (default: 5000)
  .resendDelay(1000)
  // resend delay exponential backoff (default: 1.0)
  .resendBackoffMultiplier(2.0f)
  // interval for timeout checker (default: 1 second)
  .timeoutCheckInterval(1)
  // check for pairing messages sent and messages received
  .correlateBy(null as CheckBuilder)
val mqttProtocol = mqtt
  // enable protocol version 3.1 (default: false)
  .mqttVersion_3_1
  // enable protocol version 3.1.1 (default: true)
  .mqttVersion_3_1_1
  // broker address (default: localhost:1883)
  .broker("hostname", 1883)
  // if TLS should be enabled (default: false)
  .useTls(true)
  // Used to specify KeyManagerFactory for each individual virtual user. Input is the 0-based incremental id of the virtual user.
  .perUserKeyManagerFactory(userId => null.asInstanceOf[javax.net.ssl.KeyManagerFactory])
  // clientIdentifier sent in the connect payload (of not set, Gatling will generate a random one)
  .clientId("#{id}")
  // if session should be cleaned during connect (default: true)
  .cleanSession(true)
  // optional credentials for connecting
  .credentials("#{userName}", "#{password}")
  // connections keep alive timeout
  .keepAlive(30)
  // use at-most-once QoS (default: true)
  .qosAtMostOnce
  // use at-least-once QoS (default: false)
  .qosAtLeastOnce
  // use exactly-once QoS (default: false)
  .qosExactlyOnce
  // enable retain (default: false)
  .retain(false)
  // send last will, possibly with specific QoS and retain
  .lastWill(
    LastWill("#{willTopic}", StringBody("#{willMessage}"))
      .qosAtLeastOnce
      .retain(true)
  )
  // max number of reconnects after connection crash (default: 3)
  .reconnectAttemptsMax(1)
  // reconnect delay after connection crash in millis (default: 100)
  .reconnectDelay(1)
  // reconnect delay exponential backoff (default: 1.5)
  .reconnectBackoffMultiplier(1.5F)
  //  resend delay after send failure in millis (default: 5000)
  .resendDelay(1000)
  // resend delay exponential backoff (default: 1.0)
  .resendBackoffMultiplier(2.0F)
  // interval for timeout checker (default: 1 second)
  .timeoutCheckInterval(1)
  // check for pairing messages sent and messages received
  .correlateBy(null)

Request

Use the mqtt("requestName") method in order to create a MQTT request.

connect

Your virtual users first have to establish a connection.

     
mqtt("Connecting").connect();
mqtt("Connecting").connect()
mqtt("Connecting").connect

subscribe

Use the subscribe method to subscribe to an MQTT topic:

     
mqtt("Subscribing")
  .subscribe("#{myTopic}")
  // optional, override default QoS
  .qosAtMostOnce();
mqtt("Subscribing")
  .subscribe("#{myTopic}") // optional, override default QoS
  .qosAtMostOnce()
mqtt("Subscribing")
  .subscribe("#{myTopic}")
  // optional, override default QoS
  .qosAtMostOnce

publish

Use the publish method to publish a message. You can use the same Body API as for HTTP request bodies:

     
mqtt("Publishing")
  .publish("#{myTopic}")
  .message(StringBody("#{myTextPayload}"));
mqtt("Publishing")
  .publish("#{myTopic}")
  .message(StringBody("#{myTextPayload}"))
mqtt("Publishing")
  .publish("#{myTopic}")
  .message(StringBody("#{myTextPayload}"))

MQTT checks

You can define blocking checks with await and non-blocking checks with expect. Those can be set right after subscribing, or after publishing:

     
// subscribe and expect to receive a message within 100ms, without blocking flow
mqtt("Subscribing").subscribe("#{myTopic2}")
  .expect(Duration.ofMillis(100));

// publish and await (block) until it receives a message withing 100ms
mqtt("Publishing").publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(Duration.ofMillis(100));

// optionally, define in which topic the expected message will be received
mqtt("Publishing").publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(Duration.ofMillis(100), "repub/#{myTopic}");

// optionally define check criteria to be applied on the matching received message
mqtt("Publishing")
  .publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(Duration.ofMillis(100)).check(jsonPath("$.error").notExists());
// subscribe and expect to receive a message within 100ms, without blocking flow
mqtt("Subscribing").subscribe("#{myTopic2}")
  .expect(Duration.ofMillis(100))

// publish and wait (block) until it receives a message withing 100ms
mqtt("Publishing").publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(Duration.ofMillis(100))

// optionally, define in which topic the expected message will be received
mqtt("Publishing").publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(Duration.ofMillis(100), "repub/#{myTopic}")

// optionally define check criteria to be applied on the matching received message
mqtt("Publishing")
  .publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(Duration.ofMillis(100)).check(jsonPath("$.error").notExists())
// subscribe and expect to receive a message within 100ms, without blocking flow
mqtt("Subscribing").subscribe("#{myTopic2}")
  .expect(100.milliseconds)

// publish and await (block) until it receives a message withing 100ms
mqtt("Publishing").publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(100.milliseconds)

// optionally, define in which topic the expected message will be received
mqtt("Publishing").publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(100.milliseconds, "repub/#{myTopic}")

// optionally define check criteria to be applied on the matching received message
mqtt("Publishing")
  .publish("#{myTopic}").message(StringBody("#{myPayload}"))
  .await(100.milliseconds).check(jsonPath("$.error").notExists)

You can optionally define in which topic the expected message will be received:

You can optionally define check criteria to be applied on the matching received message:

You can use waitForMessages and block for all pending non-blocking checks:

     
waitForMessages().timeout(Duration.ofMillis(100));
waitForMessages().timeout(Duration.ofMillis(100))
waitForMessages.timeout(100.milliseconds)

Processing unmatched messages

You can use processUnmatchedMessages to process inbound messages that haven’t been matched with a check and have been buffered. By default, unmatched inbound messages are not buffered, you must enable this feature by setting the size of the buffer on the protocol with .unmatchedInboundMessageQueueSize(maxSize). The buffer is reset when:

  • sending an outbound message
  • calling processUnmatchedMessages so we don’t present the same message twice

You can then pass your processing logic as a function. The list of messages passed to this function is sorted in timestamp ascending (meaning older messages first). It contains instances of type io.gatling.mqtt.action.MqttInboundMessage.

     
// store the unmatched messages in the Session
processUnmatchedMessages("#{myTopic}", (messages, session) -> session.set("messages", messages));

// collect the last text message and store it in the Session
processUnmatchedMessages(
    "#{myTopic}",
    (messages, session) -> {
      Collections.reverse(messages);
      String lastTextMessage =
          messages.stream()
              .map(m -> m.payloadUtf8String())
              .findFirst()
              .orElse(null);
      return lastTextMessage != null ?
          session.set("lastTextMessage", lastTextMessage) :
          session;
    });
// store the unmatched messages in the Session
processUnmatchedMessages("#{myTopic}") { messages, session -> session.set("messages", messages) }

// collect the last text message and store it in the Session
processUnmatchedMessages("#{myTopic}") { messages, session ->
  messages
    .map { m -> m.payloadUtf8String() }
    .takeLast(1)
    .fold(session) { _, lastTextMessage ->
      session.set("lastTextMessage", lastTextMessage)
    }
}
// store the unmatched messages in the Session
processUnmatchedMessages("#{myTopic}") {
  (messages, session) => session.set("messages", messages)
}

// collect the last text message and store it in the Session
processUnmatchedMessages("#{myTopic}") {
  (messages, session) => {
    val lastMessage = messages.reverseIterator.nextOption().map(_.payloadUtf8String)
    lastMessage.fold(session)(m => session.set("lastTextMessage", m))
  }
}

MQTT configuration

MQTT support honors the ssl and netty configurations from gatling.conf.

Example

     
public class MqttSample extends Simulation {
  MqttProtocolBuilder mqttProtocol = mqtt
    .broker("localhost", 1883)
    .correlateBy(jsonPath("$.correlationId"));

  ScenarioBuilder scn = scenario("MQTT Test")
    .feed(csv("topics-and-payloads.csv"))
    .exec(mqtt("Connecting").connect())
    .exec(mqtt("Subscribing").subscribe("#{myTopic}"))
    .exec(mqtt("Publishing").publish("#{myTopic}")
      .message(StringBody("#{myTextPayload}"))
      .expect(Duration.ofMillis(100)).check(jsonPath("$.error").notExists()));

  {
    setUp(scn.injectOpen(rampUsersPerSec(10).to(1000).during(60)))
      .protocols(mqttProtocol);
  }
}
class MqttSample : Simulation() {
  val mqttProtocol = mqtt
    .broker("localhost", 1883)
    .correlateBy(jsonPath("$.correlationId"))

  val scn = scenario("MQTT Test")
    .feed(csv("topics-and-payloads.csv"))
    .exec(mqtt("Connecting").connect())
    .exec(mqtt("Subscribing").subscribe("#{myTopic}"))
    .exec(mqtt("Publishing").publish("#{myTopic}")
      .message(StringBody("#{myTextPayload}"))
      .expect(Duration.ofMillis(100)).check(jsonPath("$.error").notExists()))

  init {
    setUp(scn.injectOpen(rampUsersPerSec(10.0).to(1000.0).during(60)))
      .protocols(mqttProtocol)
  }
}
class MqttSample extends Simulation {
  val mqttProtocol = mqtt
    .broker("localhost", 1883)
    .correlateBy(jsonPath("$.correlationId"))

  val scn = scenario("MQTT Test")
    .feed(csv("topics-and-payloads.csv"))
    .exec(mqtt("Connecting").connect)
    .exec(mqtt("Subscribing").subscribe("#{myTopic}"))
    .exec(mqtt("Publishing").publish("#{myTopic}")
      .message(StringBody("#{myTextPayload}"))
      .expect(100.milliseconds).check(jsonPath("$.error").notExists))

  setUp(scn.inject(rampUsersPerSec(10) to 1000 during (60)))
    .protocols(mqttProtocol)
}

Edit this page on GitHub