hereticles

hereticles

heresy, ticles, and

21 Jun 2026

Testing SSE in zig When EVERYTHING Blocks

I rely on TDD heavily and really don’t like manual testing. So when I started building SSE support into mantel , I wanted a proper e2e test — connect a client, read the events, assert on them.

Except, I ran into a problem. http.zig ’s test helpers don’t work with SSE - they expect a request/response cycle that completes. And zig 0.16’s new Io framework supports neither cancellation nor non-blocking reads .

So: everything blocks, and there’s no way to stop it.

If I wasn’t building this because it’s fun, I’d probably skip it. Actually, I’d just use go where testing this is easy.

I am not doing this for work, and this kind of “threading the needle” solution finding is something I enjoy. Plus, I am writing zig because I love it.

I’d actually decided to let it be and moved on, but in my sleep, a solution came to me, and it worked.

If We Could Cancel

The normal testing pattern for this, for example in Go, would be something like:

- Set up the server
- Connect the Client with a timeout
- Read from the client

Here is a version of it from henge .

1
2
3
4
5
6
7
8
9
testServer := httptest.NewServer(router)
defer testServer.Close()

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
scanner := connectToDevice(ctx, testServer)
// we don't care about the initial event
scanner.Scan() // blocks until first event arrives
// ...

You could do something similar if it didn’t support timeout, but did support non-blocking reads. You just wait until your timeout and see if the data was there.

Unfortunately, on zig, std.Io.Evented — the non-blocking backend — is still a work in progress."

(Un)Blocking

The idea that came to me was simply that we could close the server stream before we read as the client. In other words:

- Start the Server in a new thread
- Connect Client, but don't read
- Close the server
- Read all bytes from client
  - Safe now because stream has been closed

Server.zig

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
test "e2e test" {
    const io = std.testing.io;
    const allocator = std.testing.allocator;

    var server = try init(io, allocator);
    defer server.deinit(allocator);

    var router = try server.https_srv.router(.{});
    router.get("/feed", handleFeed, .{});

    // start server async
    const thread = try server.https_srv.listenInNewThread();
    // The order of the following two lines is crucial
    defer thread.join();
    defer server.https_srv.stop();

    const conf_address = server.https_srv.config.address.ip.ip4;
    try httpz.testing.waitForPort(conf_address.port);

    const address = std.Io.net.IpAddress.parse("127.0.0.1", conf_address.port) catch unreachable;
    const stream = try address.connect(io, .{ .mode = .stream });
    defer stream.close(io);

    // send HTTP request first
    var write_buf: [256]u8 = undefined;
    var writer = stream.writer(io, &write_buf);
    const w = &writer.interface;
    w.writeAll("GET /feed HTTP/1.1\r\nHost: localhost\r\n\r\n") catch unreachable;
    w.flush() catch unreachable;

    // wait for feed to start
    while (!server.ctx.feed.active) {
        std.Io.sleep(io, std.Io.Duration.fromMilliseconds(100), std.Io.Clock.real) catch unreachable;
    }

    try server.ctx.feed.stopFeed();

    var read_buf: [1024]u8 = undefined;
    var reader = stream.reader(io, &read_buf);

    // drain the stream
    var body: std.ArrayList(u8) = .empty;
    defer body.deinit(allocator);
    try reader.interface.appendRemaining(allocator, &body, .unlimited);

    try std.testing.expect(std.mem.indexOf(u8, body.items, "a message") != null);
}

And in Feed.zig , we need to ensure the stream is closed on de-activating the feed:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
fn handle(feed: *Feed, stream: std.Io.net.Stream) void {
    var buf: [4096]u8 = undefined;
    var writer = stream.writer(feed.io, &buf);
    const w = &writer.interface;
    w.writeAll("a message\n") catch unreachable;
    w.flush() catch unreachable;

    while (feed.active) {
        std.Io.sleep(feed.io, std.Io.Duration.fromMilliseconds(100), std.Io.Clock.real) catch unreachable;
    }

    stream.close(feed.io);
}

Feed.zig also contains details about the active flag and how it’s managed

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
pub fn startFeed(feed: *Feed, res: *httpz.Response) !void {
    try feed.mutex.lock(feed.io);
    defer feed.mutex.unlock(feed.io);

    if (!feed.active) {
        try res.startEventStream(feed, Feed.handle);
        feed.active = true;
    }
}

pub fn stopFeed(feed: *Feed) !void {
    try feed.mutex.lock(feed.io);
    defer feed.mutex.unlock(feed.io);

    feed.active = false;
}

In this test, we are just verifying we get back “a message” which is just a placeholder. The intent is to prove the mechanism. The next step is to write an actual event and verify it.

Gotchas

There are many gotchas in this solution, particularly with ordering.

The server has to be closed before we join the thread, otherwise, it’ll just block.

We have to read all the content from the stream in one go with appendRemaining, otherwise, it’ll block. We could read in parts if we know there is a minimum of that much content left in the stream — but that’s tricky at best.

We should wait until the Feed is active before de-activating it, or you’ll end up with race conditions.

You must drain the stream fully, otherwise you’ll get errors.

Why Not Just Mock It?

I could mock it, and I will add some mocked tests later. I wanted some confidence of the end to end journey working well without having to run it and test with curl manually.

Once I replace the placeholder with actual events, I’ll not feel compelled to test it manually each time to ensure it still works.