SSE (Server-Sent Events)
Learn the possible SSE operations with Gatling: connect, close.
SSE support is an extension to the HTTP DSL, whose entry point is the sse(requestName: Expression[String]) method.
sseName
If you want to deal with several SSE streams per virtual user, you have to give each of them a name and pass this name on every SSE operation. For example:
sse("Sse").sseName("myCustomName");
sse("Sse").sseName("myCustomName")
sse("Sse").sseName("myCustomName")
Of course, this step is not required if you deal with one single SSE stream per virtual user.
Connecting
The first thing is to connect the stream:
Gatling supports GET and POST requests:
exec(sse("Connect").get("/stocks/prices"));
exec(sse("Connect").post("/stocks/prices").body(StringBody("{\"foo\": \"bar\"}")));
exec(sse("Connect").get("/stocks/prices"))
exec(sse("Connect").post("/stocks/prices").body(StringBody("{\"foo\": \"bar\"}")))
exec(sse("Connect").get("/stocks/prices"))
exec(sse("Connect").post("/stocks/prices").body(StringBody("""{"foo": "bar"}""")))
Gatling automatically sets the Accept header to text/event-stream and Cache-Control to no-cache.
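To make the two header values mentioned above concrete, here is a minimal sketch that builds (but never sends) an equivalent request with the JDK's own HTTP client. It is not Gatling code; the class name, the method name, and the localhost URL are placeholders chosen for illustration.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Illustration only: plain JDK code, not part of the Gatling DSL.
final class SseHeadersSketch {

    // Builds a GET request carrying the two SSE-related headers.
    // The URL passed in is a placeholder; nothing is sent over the network.
    static HttpRequest sseRequest(String url) {
        return HttpRequest.newBuilder(URI.create(url))
            .header("Accept", "text/event-stream") // ask the server for an event stream
            .header("Cache-Control", "no-cache")   // avoid cached responses
            .GET()
            .build();
    }

    public static void main(String[] args) {
        HttpRequest request = sseRequest("http://localhost:8080/stocks/prices");
        // Print each header name with its values.
        request.headers().map()
            .forEach((name, values) -> System.out.println(name + ": " + values));
    }
}
```

With Gatling you do not need to set these headers yourself; the sketch only shows what a hand-written SSE client would have to send.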
close
Once you’re done with an SSE stream, you can close it.
exec(sse("Close").close());
exec(sse("Close").close())
exec(sse("Close").close)
Checks
You deal with incoming messages with checks.
Be careful not to miss messages that are received before the check is set.
Gatling currently only supports blocking checks, which wait until the expected message is received or the timeout expires.
Set a check
You can set a check right after connecting:
exec(sse("Connect").get("/stocks/prices")
.await(5).on(sseCheck));
exec(sse("Connect").get("/stocks/prices")
.await(5).on(sseCheck))
exec(sse("Connect").get("/stocks/prices")
.await(5)(sseCheck))
Or you can set a check from main flow:
exec(sse("SetCheck").setCheck()
.await(30).on(sseCheck));
exec(sse("SetCheck").setCheck()
.await(30).on(sseCheck))
exec(sse("SetCheck").setCheck
.await(30)(sseCheck))
You can set multiple checks sequentially. Each one will expect one single frame.
You can configure multiple checks in a single sequence:
// expecting 2 messages
// 1st message will be validated against sseCheck1
// 2nd message will be validated against sseCheck2
// whole sequence must complete within 30 seconds
exec(sse("SetCheck").setCheck()
.await(30).on(sseCheck1, sseCheck2));
// expecting 2 messages
// 1st message will be validated against sseCheck1
// 2nd message will be validated against sseCheck2
// whole sequence must complete within 30 seconds
exec(sse("SetCheck").setCheck()
.await(30).on(sseCheck1, sseCheck2))
// expecting 2 messages
// 1st message will be validated against sseCheck1
// 2nd message will be validated against sseCheck2
// whole sequence must complete within 30 seconds
exec(sse("SetCheck").setCheck
.await(30)(sseCheck1, sseCheck2))
You can also configure multiple check sequences with different timeouts:
// expecting 2 messages
// 1st message will be validated against sseCheck1
// 2nd message will be validated against sseCheck2
// each sequence must complete within 15 seconds
// 2nd sequence will start after 1st one completes
exec(sse("SetCheck").setCheck()
.await(15).on(sseCheck1)
.await(15).on(sseCheck2));
// expecting 2 messages
// 1st message will be validated against sseCheck1
// 2nd message will be validated against sseCheck2
// each sequence must complete within 15 seconds
// 2nd sequence will start after 1st one completes
exec(sse("SetCheck").setCheck()
.await(15).on(sseCheck1)
.await(15).on(sseCheck2))
// expecting 2 messages
// 1st message will be validated against sseCheck1
// 2nd message will be validated against sseCheck2
// each sequence must complete within 15 seconds
// 2nd sequence will start after 1st one completes
exec(sse("SetCheck").setCheck
.await(15)(sseCheck1)
.await(15)(sseCheck2))
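The sequencing rules above (one message per check, one deadline per sequence, sequences running one after another) can be modeled with plain JDK code. This is a conceptual sketch only, not how Gatling is implemented: the class and method names are hypothetical, checks are reduced to simple predicates, and a message that fails its check is assumed to fail the whole sequence.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Conceptual model only; this is not Gatling code.
final class AwaitSequenceSketch {

    // Runs one check sequence: each check consumes exactly one message,
    // and all checks in the sequence share a single deadline.
    static boolean await(BlockingQueue<String> inbound, long timeoutMillis,
                         List<Predicate<String>> checks) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            for (Predicate<String> check : checks) {
                long remaining = Math.max(deadline - System.nanoTime(), 0L);
                // Block until a message arrives or the shared deadline expires.
                String message = inbound.poll(remaining, TimeUnit.NANOSECONDS);
                if (message == null || !check.test(message)) {
                    return false; // timed out, or (assumption) the message failed its check
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> inbound =
            new LinkedBlockingQueue<>(List.of("event: snapshot", "event: update"));
        Predicate<String> sseCheck1 = m -> m.contains("snapshot");
        Predicate<String> sseCheck2 = m -> m.contains("update");
        // Two sequences with their own timeouts; the 2nd starts after the 1st completes.
        boolean ok = await(inbound, 200, List.of(sseCheck1))
            && await(inbound, 200, List.of(sseCheck2));
        System.out.println(ok);
    }
}
```

Passing both predicates in a single await call instead would model the single-sequence form, where the two checks share one timeout.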
Create a check
You can create checks for server events with checkMessage.
You can use almost all the same check criteria as for HTTP requests.
SseMessageCheck sseCheck = sse.checkMessage("checkName")
.check(regex("event: snapshot(.*)"));
val sseCheck = sse.checkMessage("checkName")
.check(regex("event: snapshot(.*)"))
val sseCheck = sse.checkMessage("checkName")
.check(regex("event: snapshot(.*)"))
You can have multiple criteria for a given message:
sse.checkMessage("checkName")
.check(
regex("event: event1(.*)"),
regex("event: event2(.*)")
);
sse.checkMessage("checkName")
.check(
regex("event: event1(.*)"),
regex("event: event2(.*)")
)
sse.checkMessage("checkName")
.check(
regex("event: event1(.*)"),
regex("event: event2(.*)")
)
Matching messages
You can define matching criteria to filter the messages you want to check.
A matching criterion is a standard check, except that it doesn’t take saveAs.
Non-matching messages will be ignored.
exec(sse("SetCheck").setCheck()
.await(1).on(
sse.checkMessage("checkName")
.matching(substring("event"))
.check(regex("event: snapshot(.*)"))
));
exec(sse("SetCheck").setCheck()
.await(1).on(
sse.checkMessage("checkName")
.matching(substring("event"))
.check(regex("event: snapshot(.*)"))
))
exec(sse("SetCheck").setCheck
.await(1)(
sse.checkMessage("checkName")
.matching(substring("event"))
.check(regex("event: snapshot(.*)"))
))
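As a rough illustration of the filter-then-check behavior described above, the following sketch skips messages that do not pass the matching criterion and applies the regex only to the first one that does. It is a simplified model, not Gatling's implementation: the class name, the method name, and the one-line message strings are hypothetical, and the textual form of real SSE messages may differ.

```java
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Conceptual model only; this is not Gatling code.
final class MatchingSketch {

    private static final Pattern SNAPSHOT = Pattern.compile("event: snapshot(.*)");

    // Returns the regex capture from the first message that passes the
    // matching filter. Empty if no message passes the filter, or if the
    // first one that does fails the regex check.
    static Optional<String> checkFirstMatching(List<String> messages) {
        for (String message : messages) {
            if (!message.contains("event")) {
                continue; // non-matching message: ignored
            }
            Matcher matcher = SNAPSHOT.matcher(message);
            return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Hypothetical message texts, for illustration only.
        List<String> messages = List.of("keep-alive", "event: snapshot AAPL=190");
        System.out.println(checkFirstMatching(messages));
    }
}
```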
Processing unmatched messages
You can use processUnmatchedMessages to process inbound messages that haven’t been matched with a check and have been buffered.
By default, unmatched inbound messages are not buffered. You must enable this feature by setting the size of the buffer on the protocol with .sseUnmatchedInboundMessageBufferSize(maxSize).
The buffer is reset when:
- sending an outbound message
- calling processUnmatchedMessages, so we don’t present the same message twice
You can then pass your processing logic as a function.
The list of messages passed to this function is sorted by ascending timestamp (older messages first).
It contains instances of type io.gatling.http.action.sse.SseInboundMessage.
exec(
// store the unmatched messages in the Session
sse.processUnmatchedMessages((messages, session) -> session.set("messages", messages))
);
exec(
// collect the last message and store it in the Session
sse.processUnmatchedMessages(
(messages, session) ->
!messages.isEmpty()
? session.set("lastMessage", messages.get(messages.size() - 1).message())
: session)
);
exec(
// store the unmatched messages in the Session
sse.processUnmatchedMessages { messages, session -> session.set("messages", messages) }
)
exec(
// collect the last message and store it in the Session
sse.processUnmatchedMessages { messages, session ->
if (messages.isNotEmpty()) session.set("lastMessage", messages.last().message())
else session
}
)
exec(
// store the unmatched messages in the Session
sse.processUnmatchedMessages((messages, session) => session.set("messages", messages))
)
exec(
// collect the last message and store it in the Session
sse.processUnmatchedMessages { (messages, session) =>
messages
.lastOption
.fold(session)(m => session.set("lastMessage", m.message))
}
)
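The buffering rules described in this section can be pictured with a small bounded queue. The sketch below is a conceptual model, not Gatling's implementation: the class and method names are hypothetical, messages are plain strings, and dropping the oldest message when the buffer is full is an assumption, since the text does not say what happens on overflow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Conceptual model only; this is not Gatling code.
final class UnmatchedBufferSketch {

    private final int maxSize;
    private final Deque<String> buffer = new ArrayDeque<>();

    // A maxSize of 0 models the default: buffering disabled.
    UnmatchedBufferSketch(int maxSize) {
        this.maxSize = maxSize;
    }

    // Called for each inbound message that no check has matched.
    void onUnmatched(String message) {
        if (maxSize <= 0) {
            return; // buffering disabled
        }
        if (buffer.size() == maxSize) {
            buffer.removeFirst(); // ASSUMPTION: the oldest message is dropped on overflow
        }
        buffer.addLast(message);
    }

    // Returns the buffered messages, oldest first, and resets the buffer
    // so the same message is never presented twice.
    List<String> drain() {
        List<String> snapshot = new ArrayList<>(buffer);
        buffer.clear();
        return snapshot;
    }

    public static void main(String[] args) {
        UnmatchedBufferSketch sketch = new UnmatchedBufferSketch(5);
        sketch.onUnmatched("first");
        sketch.onUnmatched("second");
        System.out.println(sketch.drain()); // oldest message first
        System.out.println(sketch.drain()); // empty: the buffer was reset
    }
}
```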
Configuration
SSE support introduces new HttpProtocol parameters:
http
// enable unmatched SSE inbound messages buffering,
// with a max buffer size of 5
.sseUnmatchedInboundMessageBufferSize(5);
http
// enable unmatched SSE inbound messages buffering,
// with a max buffer size of 5
.sseUnmatchedInboundMessageBufferSize(5)
http
// enable unmatched SSE inbound messages buffering,
// with a max buffer size of 5
.sseUnmatchedInboundMessageBufferSize(5)
Debugging
You can inspect streams if you add the following logger to your logback configuration:
<logger name="io.gatling.http.action.sse.fsm" level="DEBUG" />
Example
Here’s an example that runs against a stock market sample:
ScenarioBuilder scn = scenario("ServerSentEvents")
.exec(
sse("Stocks").get("/stocks/prices")
.await(10).on(
sse.checkMessage("checkName").check(regex("event: snapshot(.*)"))
),
pause(15),
sse("Close").close()
);
val scn = scenario("ServerSentEvents")
.exec(
sse("Stocks").get("/stocks/prices")
.await(10).on(
sse.checkMessage("checkName").check(regex("event: snapshot(.*)"))
),
pause(15),
sse("Close").close()
)
val scn = scenario("ServerSentEvents")
.exec(
sse("Stocks").get("/stocks/prices")
.await(10)(
sse.checkMessage("checkName").check(regex("event: snapshot(.*)"))
),
pause(15),
sse("Close").close
)