Stream API

Subscribe to wide events flowing through evlog, in-process, with createStreamDrain — sync listeners, async iterators, and a ring buffer.

createStreamDrain() exposes the events flowing through evlog as an in-process pub/sub. It's the primitive any local consumer can subscribe to without re-implementing a drain.

Scope: the stream lives inside a single Node / Bun / Deno process and sees only events emitted from that process. That means it works during local development, on long-lived self-hosted servers, and inside containers. On serverless platforms (Vercel Functions, Cloudflare Workers, AWS Lambda…), each invocation may run in a separate isolate, so a subscriber in one invocation will not reliably see events emitted from another. Use a real broker for cross-instance fan-out.

import { createStreamDrain } from 'evlog/stream'

const stream = createStreamDrain({ buffer: 200 })

// Register as a normal evlog drain (Nitro hook or plugin runner):
nitroApp.hooks.hook('evlog:drain', stream.drain)

Subscribing

Two consumption styles are supported.

Sync listener

const unsubscribe = stream.subscribe((event) => {
  if (event.level === 'error') notifyOps(event)
})

// Later:
unsubscribe()

Listener errors are caught and logged — they never affect other subscribers or the drain.
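
The isolation guarantee above can be pictured with a small standalone sketch. It does not use evlog; `WideEvent`, `Listener`, and `fanOut` are hypothetical names chosen for illustration, and the real implementation may differ. The idea is simply that each listener call is wrapped in its own try/catch, so one failure cannot stop delivery to the rest.

```typescript
// Hypothetical stand-in for a wide event; evlog's real event type may differ.
type WideEvent = { level: string; message?: string }
type Listener = (event: WideEvent) => void

// Deliver one event to every listener, isolating failures per listener.
function fanOut(
  listeners: Set<Listener>,
  event: WideEvent,
  onError: (err: unknown) => void,
): void {
  // Copy first so a listener that unsubscribes mid-delivery does not disturb iteration.
  for (const listener of [...listeners]) {
    try {
      listener(event)
    } catch (err) {
      onError(err) // report the failure and continue with the next listener
    }
  }
}

const received: string[] = []
const errors: unknown[] = []
const listeners = new Set<Listener>([
  () => { throw new Error('listener failed') },
  (event) => { received.push(event.level) },
])

fanOut(listeners, { level: 'error' }, (err) => { errors.push(err) })
```

Here the second listener still receives the event even though the first one throws, and the error is routed to a separate handler rather than propagating to the caller.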

Async iterator

for await (const event of stream.events()) {
  console.log(event.timestamp, event.action ?? event.message)
  if (shouldStop(event)) break  // breaking out of the loop unsubscribes cleanly
}

Each call to events() returns a fresh independent iterator. Past buffered events are not replayed; pair with stream.recent() to seed history.

Replay buffer

stream.recent() returns a defensive copy of the most recent events (oldest first). The default buffer holds 500 events; pass buffer: 0 to disable, or set it explicitly:

const stream = createStreamDrain({ buffer: 1000 })

const initial = stream.recent()
for (const past of initial) seedDashboard(past)

stream.subscribe(liveEvent => updateDashboard(liveEvent))
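
To make the buffer semantics concrete (fixed capacity, oldest evicted first, copy returned on read), here is a minimal ring buffer sketch. It is an illustration under stated assumptions, not evlog's implementation: the `RingBuffer` class and its method names are hypothetical.

```typescript
// Minimal fixed-capacity ring buffer; an illustration, not evlog's implementation.
class RingBuffer<T> {
  private readonly slots: (T | undefined)[]
  private head = 0 // index of the next write
  private size = 0 // number of slots currently filled

  constructor(capacity: number) {
    this.slots = new Array<T | undefined>(Math.max(0, capacity))
  }

  push(item: T): void {
    if (this.slots.length === 0) return // capacity 0 disables buffering
    this.slots[this.head] = item // overwrites the oldest entry once full
    this.head = (this.head + 1) % this.slots.length
    this.size = Math.min(this.size + 1, this.slots.length)
  }

  // Return a fresh array, oldest first, so callers cannot mutate internal state.
  recent(): T[] {
    const out: T[] = []
    if (this.size === 0) return out
    const start = (this.head - this.size + this.slots.length) % this.slots.length
    for (let i = 0; i < this.size; i++) {
      out.push(this.slots[(start + i) % this.slots.length] as T)
    }
    return out
  }
}

const ring = new RingBuffer<number>(3)
for (const n of [1, 2, 3, 4, 5]) ring.push(n)

const snapshot = ring.recent()
snapshot.push(99) // mutating the copy leaves the buffer untouched
```

With a capacity of 3 and five pushes, only the last three values remain, and changes to the returned array do not leak back into the buffer.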

Backpressure

A slow async-iterator consumer never blocks the drain. Each iterator has a per-subscriber queue (default 1000); when it overflows, the oldest queued events are dropped and stream.droppedCount increments.
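
The drop-oldest policy can be sketched as a bounded queue whose `push` never waits. This is a simplified model, not evlog's code: `BoundedQueue` is a hypothetical name, and the counter lives on the queue here, whereas the text above describes a count exposed on the stream.

```typescript
// Bounded FIFO queue that drops the oldest entries on overflow.
// Hypothetical sketch; the per-queue `dropped` counter is an assumption for illustration.
class BoundedQueue<T> {
  private readonly items: T[] = []
  private readonly maxSize: number
  dropped = 0

  constructor(maxSize: number) {
    this.maxSize = maxSize
  }

  // Synchronous and non-blocking: the producer never waits for the consumer.
  push(item: T): void {
    this.items.push(item)
    while (this.items.length > this.maxSize) {
      this.items.shift() // discard the oldest queued item
      this.dropped++
    }
  }

  // Take the next item for the consumer, or undefined when the queue is empty.
  take(): T | undefined {
    return this.items.shift()
  }
}

const queue = new BoundedQueue<string>(2)
for (const id of ['a', 'b', 'c', 'd']) queue.push(id)
const next = queue.take()
```

A consumer that falls behind therefore loses the oldest events rather than slowing the producer, and the drop counter gives a way to detect that loss.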

Filter

Events that fail the optional filter predicate are neither buffered nor delivered:

const errors = createStreamDrain({
  filter: event => event.level === 'error' || event.status >= 500,
})
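
Since not every event necessarily carries a status field, a stricter predicate may be useful. The sketch below assumes a hypothetical `WideEvent` shape in which `status` can be missing or non-numeric; evlog's actual event type may differ. Unlike a bare `>=` comparison, it does not coerce string values such as '500'.

```typescript
// Hypothetical event shape for illustration; evlog's real type may differ.
type WideEvent = { level?: string; status?: unknown }

// True for error-level events or numeric HTTP-style statuses of 500 and above.
const isServerFailure = (event: WideEvent): boolean =>
  event.level === 'error' ||
  (typeof event.status === 'number' && event.status >= 500)

const samples: WideEvent[] = [
  { level: 'info', status: 503 }, // kept: numeric 5xx status
  { level: 'error' },             // kept: error level, no status
  { level: 'info' },              // rejected: no status at all
  { level: 'info', status: '500' }, // rejected: status is a string, not a number
]
const kept = samples.filter(isServerFailure)
```

A function like this could be passed as the filter option in place of the inline arrow function.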

Default singleton

When several pieces of code in the same process need to share a single stream — typically a framework integration that wires the drain on one side and the stream server on the other — use the singleton accessors:

import { getDefaultStream, setDefaultStream } from 'evlog/stream'

// Lazily creates a singleton on first call
const stream = getDefaultStream({ buffer: 500 })

// Reset (mostly useful in tests)
setDefaultStream(null)

The mini stream server uses this singleton internally, so anything draining into getDefaultStream() automatically reaches all SSE clients.
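
The lazy-singleton pattern behind these accessors can be modelled in a few lines. This is a sketch under assumptions, not evlog's source: `Stream`, `getDefault`, and `setDefault` are hypothetical stand-ins, and whether the real getDefaultStream() ignores options on later calls is an assumption worth checking against the library.

```typescript
// Hypothetical stand-in for a stream instance.
type Stream = { buffer: number }

let defaultStream: Stream | null = null

// Create the shared instance on first call; later calls return the same object.
// Assumption: options passed after creation are ignored.
function getDefault(options: { buffer?: number } = {}): Stream {
  if (defaultStream === null) {
    defaultStream = { buffer: options.buffer ?? 500 }
  }
  return defaultStream
}

// Replace or clear the shared instance (handy between tests).
function setDefault(stream: Stream | null): void {
  defaultStream = stream
}

const first = getDefault({ buffer: 200 })
const second = getDefault({ buffer: 999 }) // same instance; buffer stays 200
setDefault(null)
const third = getDefault() // a fresh instance with the default buffer
```

In a pattern like this, the first caller decides the configuration, so it is safest to create the singleton in one well-known place at startup.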

Going further

  • Network bridge — expose this stream over HTTP for browser tabs / CLIs / external devtools. See the Stream server.
  • Recipes — concrete consumer patterns (devtool, replay-then-live, aggregation). See Consumer recipes.