MQTT Protocol
How to use the MQTT support in Gatling to connect to a broker and perform checks against inbound messages.
Prerequisites
Gatling Enterprise MQTT DSL is not imported by default.
You have to manually add the following imports:
import io.gatling.javaapi.mqtt.*;
import static io.gatling.javaapi.mqtt.MqttDsl.*;
import io.gatling.javaapi.mqtt.MqttDsl.*
import io.gatling.mqtt.Predef._
MQTT protocol
Use the mqtt
object in order to create a MQTT protocol.
MqttProtocolBuilder mqttProtocol = mqtt
// enable protocol version 3.1 (default: false)
.mqttVersion_3_1()
// enable protocol version 3.1.1 (default: true)
.mqttVersion_3_1_1()
// broker address (default: localhost:1883)
.broker("hostname", 1883)
// if TLS should be enabled (default: false)
.useTls(true)
// Used to specify KeyManagerFactory for each individual virtual user. Input is the 0-based incremental id of the virtual user.
.perUserKeyManagerFactory(userId -> (javax.net.ssl.KeyManagerFactory) null)
// clientIdentifier sent in the connect payload (of not set, Gatling will generate a random one)
.clientId("#{id}")
// if session should be cleaned during connect (default: true)
.cleanSession(true)
// optional credentials for connecting
.credentials("#{userName}", "#{password}")
// connections keep alive timeout
.keepAlive(30)
// use at-most-once QoS (default: true)
.qosAtMostOnce()
// use at-least-once QoS (default: false)
.qosAtLeastOnce()
// use exactly-once QoS (default: false)
.qosExactlyOnce()
// enable retain (default: false)
.retain(false)
// send last will, possibly with specific QoS and retain
.lastWill(
LastWill("#{willTopic}", StringBody("#{willMessage}"))
.qosAtLeastOnce()
.retain(true)
)
// max number of reconnects after connection crash (default: 3)
.reconnectAttemptsMax(1)
// reconnect delay after connection crash in millis (default: 100)
.reconnectDelay(1)
// reconnect delay exponential backoff (default: 1.5)
.reconnectBackoffMultiplier(1.5F)
// resend delay after send failure in millis (default: 5000)
.resendDelay(1000)
// resend delay exponential backoff (default: 1.0)
.resendBackoffMultiplier(2.0F)
// interval for timeout checker (default: 1 second)
.timeoutCheckInterval(1)
// check for pairing messages sent and messages received
.correlateBy((CheckBuilder) null)
// enable unmatched MQTT inbound messages buffering,
// with a max buffer size of 5
.unmatchedInboundMessageBufferSize(5);
val mqttProtocol = mqtt
// enable protocol version 3.1 (default: false)
.mqttVersion_3_1()
// enable protocol version 3.1.1 (default: true)
.mqttVersion_3_1_1()
// broker address (default: localhost:1883)
.broker("hostname", 1883)
// if TLS should be enabled (default: false)
.useTls(true)
// Used to specify KeyManagerFactory for each individual virtual user. Input is the 0-based incremental id of the virtual user.
.perUserKeyManagerFactory { userId -> null as KeyManagerFactory? }
// clientIdentifier sent in the connect payload (of not set, Gatling will generate a random one)
.clientId("#{id}")
// if session should be cleaned during connect (default: true)
.cleanSession(true)
// optional credentials for connecting
.credentials("#{userName}", "#{password}")
// connections keep alive timeout
.keepAlive(30)
// use at-most-once QoS (default: true)
.qosAtMostOnce()
// use at-least-once QoS (default: false)
.qosAtLeastOnce()
// use exactly-once QoS (default: false)
.qosExactlyOnce()
// enable retain (default: false)
.retain(false)
// send last will, possibly with specific QoS and retain
.lastWill(
LastWill("#{willTopic}", StringBody("#{willMessage}"))
.qosAtLeastOnce()
.retain(true)
)
// max number of reconnects after connection crash (default: 3)
.reconnectAttemptsMax(1)
// reconnect delay after connection crash in millis (default: 100)
.reconnectDelay(1)
// reconnect delay exponential backoff (default: 1.5)
.reconnectBackoffMultiplier(1.5f)
// resend delay after send failure in millis (default: 5000)
.resendDelay(1000)
// resend delay exponential backoff (default: 1.0)
.resendBackoffMultiplier(2.0f)
// interval for timeout checker (default: 1 second)
.timeoutCheckInterval(1)
// check for pairing messages sent and messages received
.correlateBy(null as CheckBuilder)
val mqttProtocol = mqtt
// enable protocol version 3.1 (default: false)
.mqttVersion_3_1
// enable protocol version 3.1.1 (default: true)
.mqttVersion_3_1_1
// broker address (default: localhost:1883)
.broker("hostname", 1883)
// if TLS should be enabled (default: false)
.useTls(true)
// Used to specify KeyManagerFactory for each individual virtual user. Input is the 0-based incremental id of the virtual user.
.perUserKeyManagerFactory(userId => null.asInstanceOf[javax.net.ssl.KeyManagerFactory])
// clientIdentifier sent in the connect payload (of not set, Gatling will generate a random one)
.clientId("#{id}")
// if session should be cleaned during connect (default: true)
.cleanSession(true)
// optional credentials for connecting
.credentials("#{userName}", "#{password}")
// connections keep alive timeout
.keepAlive(30)
// use at-most-once QoS (default: true)
.qosAtMostOnce
// use at-least-once QoS (default: false)
.qosAtLeastOnce
// use exactly-once QoS (default: false)
.qosExactlyOnce
// enable retain (default: false)
.retain(false)
// send last will, possibly with specific QoS and retain
.lastWill(
LastWill("#{willTopic}", StringBody("#{willMessage}"))
.qosAtLeastOnce
.retain(true)
)
// max number of reconnects after connection crash (default: 3)
.reconnectAttemptsMax(1)
// reconnect delay after connection crash in millis (default: 100)
.reconnectDelay(1)
// reconnect delay exponential backoff (default: 1.5)
.reconnectBackoffMultiplier(1.5F)
// resend delay after send failure in millis (default: 5000)
.resendDelay(1000)
// resend delay exponential backoff (default: 1.0)
.resendBackoffMultiplier(2.0F)
// interval for timeout checker (default: 1 second)
.timeoutCheckInterval(1)
// check for pairing messages sent and messages received
.correlateBy(null)
Request
Use the mqtt("requestName")
method in order to create a MQTT request.
connect
Your virtual users first have to establish a connection.
mqtt("Connecting").connect();
mqtt("Connecting").connect()
mqtt("Connecting").connect
subscribe
Use the subscribe
method to subscribe to an MQTT topic:
mqtt("Subscribing")
.subscribe("#{myTopic}")
// optional, override default QoS
.qosAtMostOnce();
mqtt("Subscribing")
.subscribe("#{myTopic}") // optional, override default QoS
.qosAtMostOnce()
mqtt("Subscribing")
.subscribe("#{myTopic}")
// optional, override default QoS
.qosAtMostOnce
publish
Use the publish
method to publish a message. You can use the same Body
API as for HTTP request bodies:
mqtt("Publishing")
.publish("#{myTopic}")
.message(StringBody("#{myTextPayload}"));
mqtt("Publishing")
.publish("#{myTopic}")
.message(StringBody("#{myTextPayload}"))
mqtt("Publishing")
.publish("#{myTopic}")
.message(StringBody("#{myTextPayload}"))
MQTT checks
You can define blocking checks with await
and non-blocking checks with expect
.
Those can be set right after subscribing, or after publishing:
// subscribe and expect to receive a message within 100ms, without blocking flow
mqtt("Subscribing").subscribe("#{myTopic2}")
.expect(Duration.ofMillis(100));
// publish and await (block) until it receives a message withing 100ms
mqtt("Publishing").publish("#{myTopic}").message(StringBody("#{myPayload}"))
.await(Duration.ofMillis(100));
// optionally, define in which topic the expected message will be received
mqtt("Publishing").publish("#{myTopic}").message(StringBody("#{myPayload}"))
.await(Duration.ofMillis(100), "repub/#{myTopic}");
// optionally define check criteria to be applied on the matching received message
mqtt("Publishing")
.publish("#{myTopic}").message(StringBody("#{myPayload}"))
.await(Duration.ofMillis(100)).check(jsonPath("$.error").notExists());
// subscribe and expect to receive a message within 100ms, without blocking flow
mqtt("Subscribing").subscribe("#{myTopic2}")
.expect(Duration.ofMillis(100))
// publish and wait (block) until it receives a message withing 100ms
mqtt("Publishing").publish("#{myTopic}").message(StringBody("#{myPayload}"))
.await(Duration.ofMillis(100))
// optionally, define in which topic the expected message will be received
mqtt("Publishing").publish("#{myTopic}").message(StringBody("#{myPayload}"))
.await(Duration.ofMillis(100), "repub/#{myTopic}")
// optionally define check criteria to be applied on the matching received message
mqtt("Publishing")
.publish("#{myTopic}").message(StringBody("#{myPayload}"))
.await(Duration.ofMillis(100)).check(jsonPath("$.error").notExists())
// subscribe and expect to receive a message within 100ms, without blocking flow
mqtt("Subscribing").subscribe("#{myTopic2}")
.expect(100.milliseconds)
// publish and await (block) until it receives a message withing 100ms
mqtt("Publishing").publish("#{myTopic}").message(StringBody("#{myPayload}"))
.await(100.milliseconds)
// optionally, define in which topic the expected message will be received
mqtt("Publishing").publish("#{myTopic}").message(StringBody("#{myPayload}"))
.await(100.milliseconds, "repub/#{myTopic}")
// optionally define check criteria to be applied on the matching received message
mqtt("Publishing")
.publish("#{myTopic}").message(StringBody("#{myPayload}"))
.await(100.milliseconds).check(jsonPath("$.error").notExists)
You can optionally define in which topic the expected message will be received:
You can optionally define check criteria to be applied on the matching received message:
You can use waitForMessages
and block for all pending non-blocking checks:
waitForMessages().timeout(Duration.ofMillis(100));
waitForMessages().timeout(Duration.ofMillis(100))
waitForMessages.timeout(100.milliseconds)
Processing unmatched messages
You can use processUnmatchedMessages
to process inbound messages that haven’t been matched with a check and have been buffered.
By default, unmatched inbound messages are not buffered, you must enable this feature by setting the size of the buffer on the protocol with .unmatchedInboundMessageQueueSize(maxSize)
.
The buffer is reset when:
- sending an outbound message
- calling
processUnmatchedMessages
so we don’t present the same message twice
You can then pass your processing logic as a function.
The list of messages passed to this function is sorted in timestamp ascending (meaning older messages first).
It contains instances of type io.gatling.mqtt.action.MqttInboundMessage
.
// store the unmatched messages in the Session
processUnmatchedMessages("#{myTopic}", (messages, session) -> session.set("messages", messages));
// collect the last text message and store it in the Session
processUnmatchedMessages(
"#{myTopic}",
(messages, session) -> {
Collections.reverse(messages);
String lastTextMessage =
messages.stream()
.map(m -> m.payloadUtf8String())
.findFirst()
.orElse(null);
return lastTextMessage != null ?
session.set("lastTextMessage", lastTextMessage) :
session;
});
// store the unmatched messages in the Session
processUnmatchedMessages("#{myTopic}") { messages, session -> session.set("messages", messages) }
// collect the last text message and store it in the Session
processUnmatchedMessages("#{myTopic}") { messages, session ->
messages
.map { m -> m.payloadUtf8String() }
.takeLast(1)
.fold(session) { _, lastTextMessage ->
session.set("lastTextMessage", lastTextMessage)
}
}
// store the unmatched messages in the Session
processUnmatchedMessages("#{myTopic}") {
(messages, session) => session.set("messages", messages)
}
// collect the last text message and store it in the Session
processUnmatchedMessages("#{myTopic}") {
(messages, session) => {
val lastMessage = messages.reverseIterator.nextOption().map(_.payloadUtf8String)
lastMessage.fold(session)(m => session.set("lastTextMessage", m))
}
}
MQTT configuration
MQTT support honors the ssl and netty configurations from gatling.conf
.
Example
public class MqttSample extends Simulation {
MqttProtocolBuilder mqttProtocol = mqtt
.broker("localhost", 1883)
.correlateBy(jsonPath("$.correlationId"));
ScenarioBuilder scn = scenario("MQTT Test")
.feed(csv("topics-and-payloads.csv"))
.exec(mqtt("Connecting").connect())
.exec(mqtt("Subscribing").subscribe("#{myTopic}"))
.exec(mqtt("Publishing").publish("#{myTopic}")
.message(StringBody("#{myTextPayload}"))
.expect(Duration.ofMillis(100)).check(jsonPath("$.error").notExists()));
{
setUp(scn.injectOpen(rampUsersPerSec(10).to(1000).during(60)))
.protocols(mqttProtocol);
}
}
class MqttSample : Simulation() {
val mqttProtocol = mqtt
.broker("localhost", 1883)
.correlateBy(jsonPath("$.correlationId"))
val scn = scenario("MQTT Test")
.feed(csv("topics-and-payloads.csv"))
.exec(mqtt("Connecting").connect())
.exec(mqtt("Subscribing").subscribe("#{myTopic}"))
.exec(mqtt("Publishing").publish("#{myTopic}")
.message(StringBody("#{myTextPayload}"))
.expect(Duration.ofMillis(100)).check(jsonPath("$.error").notExists()))
init {
setUp(scn.injectOpen(rampUsersPerSec(10.0).to(1000.0).during(60)))
.protocols(mqttProtocol)
}
}
class MqttSample extends Simulation {
val mqttProtocol = mqtt
.broker("localhost", 1883)
.correlateBy(jsonPath("$.correlationId"))
val scn = scenario("MQTT Test")
.feed(csv("topics-and-payloads.csv"))
.exec(mqtt("Connecting").connect)
.exec(mqtt("Subscribing").subscribe("#{myTopic}"))
.exec(mqtt("Publishing").publish("#{myTopic}")
.message(StringBody("#{myTextPayload}"))
.expect(100.milliseconds).check(jsonPath("$.error").notExists))
setUp(scn.inject(rampUsersPerSec(10) to 1000 during (60)))
.protocols(mqttProtocol)
}