experimental

Redis

Redis Streams transport for @miiajs/messaging - consumer groups, ZSET-backed retry, auto-DLQ.

@miiajs/messaging-redis is the production transport for @miiajs/messaging. It implements the same MessageTransport interface as the in-memory default, so user code does not change - only the transport factory passed to MessagingModule.configure. Reach for it as soon as you need persistence, multi-replica deployments, or crash recovery.

Under the hood it uses Redis Streams with consumer groups, a ZSET-backed retry scheduler, and atomic Lua scripts for the nack, retry-drain, and DLQ transitions.

Installation

bun add @miiajs/messaging-redis ioredis

ioredis is a peer dependency - install it explicitly so your app pins the version. The transport is tested against ioredis@^5.

Setup

import { Module } from '@miiajs/core'
import { MessagingModule } from '@miiajs/messaging'
import { redisStreamsTransport } from '@miiajs/messaging-redis'

@Module({
  imports: [
    MessagingModule.configure({
      transport: redisStreamsTransport({
        url: 'redis://localhost:6379',
      }),
    }),
  ],
})
class AppModule {}

With ConfigService for env-driven URLs:

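import { ConfigService } from '@miiajs/config'
import { MessagingModule } from '@miiajs/messaging'
import { redisStreamsTransport } from '@miiajs/messaging-redis'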

MessagingModule.configure((resolve) => ({
  transport: redisStreamsTransport({
    url: resolve(ConfigService).getOrThrow('REDIS_URL'),
    retry: { maxAttempts: 10 },
  }),
}))

Everything else from the Overview page - @On, groups, retry/DLQ, MessageBus.publish - works identically.

redisStreamsTransport options

| Option | Type | Default | Description |
|---|---|---|---|
| url | string | - | Redis URL. Mutually exclusive with client. |
| client | Redis | - | Pre-built ioredis instance. The transport does not call connect() or quit() on a user-supplied client (see Pre-built ioredis client). |
| retry | Partial<RetryConfig> | DEFAULT_RETRY | Override individual retry fields (maxAttempts, backoffMs, backoffMultiplier, dlq). |
| retrySchedulerIntervalMs | number | 1000 | How often the retry ZSET is drained back into the main stream. |
| reclaimIntervalMs | number | 30000 | How often XAUTOCLAIM runs to recover pending entries from dead consumers. |
| minIdleMs | number | 60000 | Minimum idle time before a pending entry is eligible for XAUTOCLAIM. |
| blockMs | number | 5000 | XREADGROUP BLOCK timeout. Only affects idle Redis traffic and unsubscribe responsiveness; because blocking reads run on dedicated connections, it does not affect publish or read latency (see Connection model). |
| drainTimeoutMs | number | 5000 | Max time onDestroy() waits for in-flight handlers before closing the Redis client. See Graceful drain. |
| consumerName | string | ${hostname}:${pid}:${rand8} | Override for the auto-generated consumer name. Useful for diagnostics. |

Connection model

The transport keeps one publisher client for XADD, XACK, all Lua commands (retrySchedule / drainRetry / moveToDlq), XGROUP CREATE, and the XAUTOCLAIM housekeeping loop. Every subscribe() call additionally creates its own duplicated client via pubClient.duplicate({ lazyConnect: true }) and runs its blocking XREADGROUP loop on it.

┌─────────────────────────────────────────────────────┐
│ pubClient            (1 socket)                     │
│   ├─ XADD            (publish)                      │
│   ├─ XACK            (per-message ack)              │
│   ├─ Lua scripts     (retrySchedule, drainRetry,    │
│   │                   moveToDlq)                    │
│   ├─ XGROUP CREATE   (group setup)                  │
│   └─ XAUTOCLAIM      (idle reclaim)                 │
├─────────────────────────────────────────────────────┤
│ subClient #1         (1 socket per subscribe call)  │
│   └─ XREADGROUP BLOCK <ms>                          │
│ subClient #2                                        │
│   └─ XREADGROUP BLOCK <ms>                          │
│ ...                                                 │
└─────────────────────────────────────────────────────┘

Why split the connection: ioredis serializes commands on a single TCP socket, and XREADGROUP ... BLOCK <ms> holds the socket until data arrives or the timeout fires. Sharing one connection for publishing and blocking subscribers would queue every XADD behind the in-flight BLOCK, with publish latency growing linearly in subscriber count. Per-subscribe duplicates make publish latency constant (≈ network RTT), regardless of how many topics × groups you subscribe to. This is the same pattern BullMQ uses for its workers.

Connection count per replica: 1 (publisher) + the sum over handlers of 1 for each batch handler and concurrency for each sliding handler. Each @On is its own subscription, so handler count is the multiplier. Replicas multiply this total by the replica count, but not the number of groups: replicas of the same handler join the same auto-derived group, and Redis spreads that group's messages across them. redisIdempotencyStore configured with url opens its own client and adds one more connection. SubClients are owned by the transport regardless of ownsClient: the ownsClient flag only governs the user-supplied parent client, and duplicates are always created and closed by the transport.

Cost warning for managed Redis tiers. Combining a bus-default sliding mode with many handlers is multiplicative. Example: 10 handlers under MessagingModule.configure({ dispatch: { mode: 'sliding', concurrency: 4 } }) hold 41 connections per replica (1 publisher + 10 handlers × 4 lanes). Managed providers such as Upstash and Redis Cloud cap concurrent connections per plan, so a high connection count can force you onto a larger tier. Recommendation: leave the bus default at batch and opt into mode: 'sliding' only on individual handlers with variable runtimes.
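To sanity-check a deployment, the connection rules above can be encoded in a few lines. This is a minimal sketch, not an API of @miiajs/messaging-redis: `HandlerSpec`, `connectionsPerReplica`, `connectionsTotal`, and the `idempotencyOwnClient` flag are hypothetical names, and the function only encodes the stated rules (one publisher, one subClient per batch handler, `concurrency` subClients per sliding handler, one extra client for a URL-configured idempotency store).

```typescript
// Hypothetical helper encoding the connection rules above; not exported by the package.
type DispatchMode = 'batch' | 'sliding'

interface HandlerSpec {
  mode: DispatchMode
  concurrency: number // lane count in sliding mode; ignored in batch mode
}

function connectionsPerReplica(handlers: HandlerSpec[], idempotencyOwnClient = false): number {
  const publisher = 1
  // Each @On is its own subscription: batch holds 1 subClient, sliding holds one per lane.
  const subClients = handlers.reduce(
    (sum, h) => sum + (h.mode === 'sliding' ? Math.max(1, h.concurrency) : 1),
    0,
  )
  // redisIdempotencyStore({ url }) opens one more client of its own.
  return publisher + subClients + (idempotencyOwnClient ? 1 : 0)
}

function connectionsTotal(handlers: HandlerSpec[], replicas: number, idempotencyOwnClient = false): number {
  return replicas * connectionsPerReplica(handlers, idempotencyOwnClient)
}

// The cost-warning example: 10 handlers inheriting a sliding bus default with concurrency 4.
const slidingDefault = Array.from({ length: 10 }, (): HandlerSpec => ({ mode: 'sliding', concurrency: 4 }))
// The recommended setup: batch default everywhere.
const batchDefault = Array.from({ length: 10 }, (): HandlerSpec => ({ mode: 'batch', concurrency: 4 }))

console.log(connectionsPerReplica(slidingDefault)) // 41
console.log(connectionsPerReplica(batchDefault)) // 11
console.log(connectionsTotal(slidingDefault, 3)) // 123 across three replicas
```

Keeping the calculation as a pure function makes it easy to run against the handler list of a real service before choosing a managed Redis plan.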

Broadcast group lifecycle

Each handler with broadcast: true derives its consumer group as <base>__<hostname>_<pid>. On graceful shutdown (onDestroy), the transport runs XGROUP DESTROY on those groups so they don't leak. For ungraceful exits (process crash, OOM kill), the next process to subscribe with broadcast on the same hostname scans XINFO GROUPS and destroys any matching <base>__<thishost>_<otherpid> groups - cleanup happens automatically on first restart.

Limitations: cleanup only matches same-hostname orphans. If a broadcast handler ran on host A, host A went away, and the same code starts on host B, the host-A groups remain orphaned in Redis. Run a periodic cleanup script if your topology has dynamic hostnames.
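The naming and same-host matching rules can be sketched as two pure functions. Both are hypothetical helpers, not the transport's code; in particular, requiring the suffix after `<base>__<hostname>_` to be all digits is an extra guard assumed here, since the docs do not say how strict the real matcher is.

```typescript
// Hypothetical helpers mirroring the <base>__<hostname>_<pid> scheme; not the transport's code.
function broadcastGroupName(base: string, hostname: string, pid: number): string {
  return `${base}__${hostname}_${pid}`
}

// From the group names XINFO GROUPS reports for a stream, pick broadcast groups
// left behind by *other* processes on *this* host.
function findSameHostOrphans(groupNames: string[], base: string, hostname: string, pid: number): string[] {
  const prefix = `${base}__${hostname}_`
  const mine = broadcastGroupName(base, hostname, pid)
  return groupNames.filter(
    (name) =>
      name !== mine &&
      name.startsWith(prefix) &&
      /^\d+$/.test(name.slice(prefix.length)), // assumed guard: the remainder must be a bare pid
  )
}

const groupNames = [
  'audit',                  // ordinary group: never a broadcast candidate
  'cache-bust__web-1_4242', // this process
  'cache-bust__web-1_1717', // earlier process on this host that crashed
  'cache-bust__web-2_9000', // another host: left alone (see Limitations)
]

// Only 'cache-bust__web-1_1717' would be destroyed.
console.log(findSameHostOrphans(groupNames, 'cache-bust', 'web-1', 4242))
```

The host-B case from the limitation shows up directly: `cache-bust__web-2_9000` never matches a `web-1` prefix, so nothing on host B would ever clean it up.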

Dispatch modes

Redis Streams supports both dispatch modes - 'batch' (default) and 'sliding'. The choice trades latency under variable handler runtimes against the number of Redis connections held open.

Batch (default)

A single consumer loop reads up to concurrency messages per XREADGROUP COUNT=concurrency call, runs them through Promise.allSettled, then reads again. The slowest handler in a batch holds back the next read - this is the head-of-line behavior.

subscription (1 connection)
  └─ XREADGROUP COUNT=4 BLOCK 5000  ──► [m0, m1, m2, m3]  ──► allSettled
                                                              │
                                                              └──► next XREADGROUP

Best for high-throughput uniform workloads where per-message variance is small. One connection per subscription regardless of concurrency.

Sliding

Sliding mode runs concurrency parallel lanes, each on its own duplicated Redis connection running XREADGROUP COUNT 1 under a unique consumer name suffixed :laneN. Because all lanes join the same group, Redis distributes new entries across them, and each lane holds at most one in-flight message. A slow message on lane 0 does not block the other lanes.

subscription (4 connections, mode='sliding', concurrency=4)
  ├─ XREADGROUP COUNT=1 (consumer :lane0)  ──► m0  ──► next read
  ├─ XREADGROUP COUNT=1 (consumer :lane1)  ──► m1  ──► next read
  ├─ XREADGROUP COUNT=1 (consumer :lane2)  ──► m2  ──► next read
  └─ XREADGROUP COUNT=1 (consumer :lane3)  ──► m3  ──► next read

Best when handler runtimes vary and you cannot tolerate one slow message stalling the others. Mirrors the BullMQ worker pattern.
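A toy timing model makes the head-of-line difference concrete. It assumes every message is already in the stream at t=0 and treats Redis round-trips as free, so it illustrates ordering effects only; the function names are hypothetical and this is not how the transport schedules work internally.

```typescript
// Toy timing model (hypothetical): all messages are in the stream at t=0 and
// Redis round-trips cost nothing. Returns each message's completion time in ms.
function batchCompletionTimes(durations: number[], concurrency: number): number[] {
  const done: number[] = []
  let clock = 0
  for (let i = 0; i < durations.length; i += concurrency) {
    const batch = durations.slice(i, i + concurrency) // one XREADGROUP COUNT=concurrency
    for (const d of batch) done.push(clock + d)       // allSettled: each finishes on its own...
    clock += Math.max(...batch)                       // ...but the next read waits for the slowest
  }
  return done
}

function slidingCompletionTimes(durations: number[], lanes: number): number[] {
  const laneFreeAt = new Array<number>(lanes).fill(0)
  return durations.map((d) => {
    // Each lane reads COUNT 1; the next message goes to the lane that frees up first.
    const lane = laneFreeAt.indexOf(Math.min(...laneFreeAt))
    laneFreeAt[lane] += d
    return laneFreeAt[lane]
  })
}

// One slow message (1000 ms) followed by seven fast ones (10 ms), concurrency 4.
const durations = [1000, 10, 10, 10, 10, 10, 10, 10]
// Latest completion among the fast messages (index 0 is the slow one).
const lastFast = (times: number[]) => Math.max(...times.slice(1))

console.log(lastFast(batchCompletionTimes(durations, 4)))   // 1010: the second batch waited for the slow one
console.log(lastFast(slidingCompletionTimes(durations, 4))) // 30: the other lanes kept going
```

Total wall time is similar in both modes here; what changes is how long the fast messages queue behind the slow one.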

Picking a mode

| | Batch | Sliding |
|---|---|---|
| Connection budget per subscription | 1 | concurrency |
| Behavior under runtime skew | head-of-line blocking | independent lanes |
| Best fit | uniform workloads, high message volume | mixed workloads, latency-sensitive paths |

MessagingModule.configure({
  transport: redisStreamsTransport({ url: process.env.REDIS_URL! }),
  dispatch: { mode: 'sliding', concurrency: 4 }, // bus default
})

class OrderHandlers {
  @On('order.placed', { mode: 'sliding', concurrency: 8 }) // per-handler override
  async onOrder(order: Order) { ... }
}

Lane consumer names show up in XINFO CONSUMERS <stream> <group> as ${baseConsumer}:lane0, ${baseConsumer}:lane1, and so on, which makes lane-level diagnostics straightforward.
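A sketch of how these names fit together. The `${hostname}:${pid}:${rand8}` shape comes from the options table and the `:laneN` suffix from the text above; producing `rand8` as 8 hex characters via `crypto.randomBytes(4)` is an assumption, and both helper names are hypothetical.

```typescript
import { hostname } from 'node:os'
import { randomBytes } from 'node:crypto'

// Hypothetical: mirrors the documented `${hostname}:${pid}:${rand8}` default.
// Generating rand8 as 8 hex characters is an assumption.
function defaultConsumerName(): string {
  return `${hostname()}:${process.pid}:${randomBytes(4).toString('hex')}`
}

// Sliding mode: one consumer per lane, named <base>:lane0 ... <base>:lane<N-1>.
function laneConsumerNames(baseConsumer: string, concurrency: number): string[] {
  return Array.from({ length: concurrency }, (_, i) => `${baseConsumer}:lane${i}`)
}

console.log(laneConsumerNames(defaultConsumerName(), 3))
```

Because every lane shares the base name, filtering XINFO CONSUMERS output by that prefix groups all lanes of one subscription on one replica.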

Lane count equals concurrency in sliding mode - size it for the expected parallelism within a single replica, not for total cluster throughput. Add replicas to scale horizontally; raise sliding concurrency to soak up runtime variance per replica.

Consumer groups and deployment

Consumer groups are how Redis Streams maps logical roles to running replicas. The deployment story is straightforward:

  • One consumer group per logical role. email-workers, analytics, audit-log - pick names that describe the job, not the host.
  • Every replica of the same role joins the same group. Redis automatically load-balances messages across consumers within a group, so adding replicas scales throughput roughly linearly until Redis itself or a downstream dependency becomes the bottleneck.
  • Fan-out across groups is automatic. If two replicas of one app run @On('user.created', { group: 'email-workers' }) and two replicas of another app run @On('user.created', { group: 'analytics' }), both groups receive every message independently, and within each group only one of its replicas handles a given message.
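The delivery rules in the list above can be modelled with a toy in-memory stream. This is a hypothetical illustration, not Redis: real consumer groups hand each new entry to whichever consumer issues the next XREADGROUP, which this sketch simplifies to strict round-robin.

```typescript
// Toy model of consumer-group delivery (hypothetical, for intuition only):
// every group sees every entry; inside a group each entry goes to one consumer.
class ToyStream {
  private groups = new Map<string, { consumers: string[]; next: number }>()
  readonly deliveries: Array<{ group: string; consumer: string; id: number }> = []

  join(group: string, consumer: string): void {
    const g = this.groups.get(group) ?? { consumers: [], next: 0 }
    g.consumers.push(consumer)
    this.groups.set(group, g)
  }

  publish(id: number): void {
    for (const [group, g] of this.groups) {
      // Strict round-robin stands in for "whoever reads next" in real Redis.
      const consumer = g.consumers[g.next % g.consumers.length]
      g.next++
      this.deliveries.push({ group, consumer, id })
    }
  }
}

const stream = new ToyStream()
stream.join('email-workers', 'replica-a')
stream.join('email-workers', 'replica-b') // second replica: same group, shares the load
stream.join('analytics', 'analytics-1')   // different group: gets its own copy of everything
for (let id = 1; id <= 4; id++) stream.publish(id)

// email-workers: ids 1 and 3 on replica-a, 2 and 4 on replica-b; analytics-1 gets all four.
console.log(stream.deliveries)
```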

The most common misconfig is mixing up consumer names and group names. Group names are shared across replicas (that's what enables load balancing). Consumer names must be unique per replica - the transport defaults to hostname:pid:rand8 which is safe by construction; only override consumerName if you have a reason. See Groups and fan-out on the Overview page for the full mental model.

Retry mechanics

Understanding what happens in Redis when a handler nacks helps when debugging or operating the cluster.

main stream ──► handler ──► ack ───► (done)
                  │
                  └─nack──► retry ZSET ──(delay)──► main stream
                               │
                               └─maxAttempts──► <topic>.dlq

Step by step:

  1. XADD <topic> writes the envelope as a stream entry.
  2. A consumer reads via XREADGROUP GROUP <group> <consumer> COUNT <n> BLOCK <ms> STREAMS <topic> >.
  3. On ack: a single XACK <topic> <group> <id> removes the entry from the group's pending entries list (PEL).
  4. On nack: an atomic Lua script runs XACK and ZADD <topic>:retry <retryAt> <new envelope> in one server-side step. The new envelope has meta.attempt incremented; the ZSET score is the absolute epoch ms when the retry becomes due.
  5. Every retrySchedulerIntervalMs, a background scheduler runs another atomic Lua script: ZRANGEBYSCORE <topic>:retry -inf <now> finds due entries, then for each: XADD <topic> republishes it and ZREM removes it from the ZSET.
  6. After maxAttempts, instead of going back to the retry ZSET, a third atomic Lua script runs XACK on the original entry plus XADD <topic>.dlq with meta.lastError populated.

All three transitions use Lua scripts (registered via ioredis defineCommand and cached server-side via EVALSHA) because each one couples two Redis operations that must succeed or fail together. Without atomicity, a process crash between, say, XACK and ZADD would leave the message acknowledged but never re-scheduled - silently lost.
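The whole cycle can be modelled in memory to reason about attempt counts and timing. This is a sketch under stated assumptions, not the transport's implementation: arrays stand in for the stream, the `<topic>:retry` ZSET, and `<topic>.dlq`; each method stands for one atomic Lua transition; and the backoff formula `backoffMs * backoffMultiplier^(attempt - 1)`, the first attempt being 1, and `maxAttempts` counting every handler invocation are all assumptions, since the docs name the retry fields but not their exact arithmetic.

```typescript
// In-memory model of nack -> retry ZSET -> main stream -> DLQ (hypothetical).
// Assumed: attempt starts at 1, delay = backoffMs * backoffMultiplier^(attempt - 1),
// and maxAttempts counts every handler invocation, including the first.
interface Envelope {
  id: string
  payload: unknown
  meta: { attempt: number; lastError?: string }
}

interface RetryConfig {
  maxAttempts: number
  backoffMs: number
  backoffMultiplier: number
}

class RetryModel {
  stream: Envelope[] = []                                 // stands in for <topic>
  retryZset: Array<{ score: number; env: Envelope }> = [] // stands in for <topic>:retry
  dlq: Envelope[] = []                                    // stands in for <topic>.dlq
  private cfg: RetryConfig

  constructor(cfg: RetryConfig) {
    this.cfg = cfg
  }

  // Steps 4 and 6: each call is one all-or-nothing transition (a Lua script in Redis).
  nack(env: Envelope, error: string, now: number): void {
    if (env.meta.attempt >= this.cfg.maxAttempts) {
      // XACK original + XADD <topic>.dlq with meta.lastError
      this.dlq.push({ ...env, meta: { ...env.meta, lastError: error } })
      return
    }
    const delay = this.cfg.backoffMs * this.cfg.backoffMultiplier ** (env.meta.attempt - 1)
    // XACK original + ZADD <topic>:retry <now + delay> <envelope with attempt + 1>
    const next: Envelope = { ...env, meta: { ...env.meta, attempt: env.meta.attempt + 1 } }
    this.retryZset.push({ score: now + delay, env: next })
  }

  // Step 5: ZRANGEBYSCORE <topic>:retry -inf <now>, then XADD + ZREM per due entry.
  drainDue(now: number): number {
    const due = this.retryZset.filter((e) => e.score <= now).sort((a, b) => a.score - b.score)
    this.retryZset = this.retryZset.filter((e) => e.score > now)
    for (const e of due) this.stream.push(e.env)
    return due.length
  }
}

// A handler that always fails, with maxAttempts 3, backoffMs 100, multiplier 2.
const model = new RetryModel({ maxAttempts: 3, backoffMs: 100, backoffMultiplier: 2 })
model.nack({ id: 'm1', payload: null, meta: { attempt: 1 } }, 'boom', 0) // due at t=100
const earlyDrain = model.drainDue(50)          // 0: not due yet
model.drainDue(100)                            // back on the stream as attempt 2
model.nack(model.stream.shift()!, 'boom', 100) // due at t=100+200=300
model.drainDue(300)                            // back on the stream as attempt 3
model.nack(model.stream.shift()!, 'boom', 300) // attempt 3 = maxAttempts: goes to the DLQ
console.log(earlyDrain, model.dlq[0].meta)     // 0, then attempt 3 with lastError 'boom'
```

In the real transport each of those method bodies touches two Redis keys, which is exactly why the text above insists on Lua: the model's single synchronous method call is the in-process analogue of one server-side script.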

Idle reclaim

Consumer groups track in-flight messages in the pending entries list (PEL). If a consumer dies while processing - process crash, container OOM, network partition - its pending entries stay in the PEL forever unless something reclaims them.

Every reclaimIntervalMs (default 30s), the transport runs XAUTOCLAIM <topic> <group> <my-consumer> <minIdleMs> <start-id> on each active subscription. Any pending entry that has been idle for at least minIdleMs (default 60s) is transferred to the current consumer, which then processes it as if it were a fresh delivery. The retry counter and DLQ logic apply normally.

Worst-case recovery time after a consumer crash is minIdleMs + reclaimIntervalMs - about 90 seconds with the defaults. Shorten both for latency-sensitive workloads; lengthen them when reducing Redis traffic matters more than reaction speed.
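A hypothetical model of one reclaim pass and of the worst-case bound. `PendingEntry` and `autoclaim` are illustrative names; the idle check, the transfer of ownership, and the idle-time reset on claim follow XAUTOCLAIM's documented behavior, while everything else Redis-specific (cursors, COUNT, delivery counters) is left out.

```typescript
// Hypothetical model of one XAUTOCLAIM pass over a group's pending entries list.
interface PendingEntry {
  id: string
  consumer: string    // current owner
  deliveredAt: number // ms timestamp of the last delivery to that owner
}

function autoclaim(pel: PendingEntry[], claimer: string, minIdleMs: number, now: number): string[] {
  const claimed: string[] = []
  for (const entry of pel) {
    if (now - entry.deliveredAt >= minIdleMs) {
      entry.consumer = claimer // ownership moves to the live consumer
      entry.deliveredAt = now  // claiming resets the entry's idle time
      claimed.push(entry.id)
    }
  }
  return claimed
}

// Worst case: an entry becomes eligible just after a reclaim tick, so it waits
// minIdleMs to become eligible plus up to one full reclaimIntervalMs for the next tick.
function worstCaseRecoveryMs(minIdleMs = 60_000, reclaimIntervalMs = 30_000): number {
  return minIdleMs + reclaimIntervalMs
}

const pel: PendingEntry[] = [
  { id: '1-0', consumer: 'dead-host:11:aaaaaaaa', deliveredAt: 0 },
  { id: '2-0', consumer: 'dead-host:11:aaaaaaaa', deliveredAt: 50_000 },
]
console.log(autoclaim(pel, 'live-host:22:bbbbbbbb', 60_000, 70_000)) // only '1-0' (idle 70 s; '2-0' idle 20 s)
console.log(worstCaseRecoveryMs() / 1000) // 90
```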

Graceful drain

onDestroy() waits up to drainTimeoutMs for in-flight handlers to finish before closing the Redis client. Without drain, a SIGTERM mid-handler aborts the consumer loop and quits the client while a handler is still running - the message stays un-acked and reappears via XAUTOCLAIM after minIdleMs, producing a duplicate on the next consumer.

Drain order:

  1. Abort each subscription.
  2. Disconnect each subClient. This forces any in-flight XREADGROUP BLOCK to throw Connection is closed; the consumer loop sees the abort flag and exits cleanly.
  3. Await waitForDrain, so handlers finish their final XACK / Lua calls through the still-alive pubClient.
  4. Quit pubClient (only when the transport owns it).

Because subClients are forcibly disconnected on abort, shutdown is bounded by drainTimeoutMs regardless of blockMs. Size drainTimeoutMs to cover expected handler runtime; the previous "drainTimeoutMs >= blockMs + handler runtime" rule no longer applies.

DLQ writes during drain are safe - the moveToDlq Lua script is awaited synchronously inside the tracked processMessage promise, so the XADD to <topic>.dlq completes before the drain promise resolves.
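The bounded wait can be sketched with a set of in-flight promises raced against a timer. `InFlightTracker`, `track`, and this `waitForDrain` are hypothetical names for illustration, not the transport's internals. The sketch also shows why DLQ writes are safe: anything awaited inside a tracked promise, such as a moveToDlq call, has finished before that promise settles.

```typescript
// Hypothetical sketch: track in-flight handler promises, then wait for them
// with an upper bound (drainTimeoutMs). Not the transport's internal code.
class InFlightTracker {
  private inFlight = new Set<Promise<unknown>>()

  // Wrap each processMessage() call. Anything awaited inside it (XACK, moveToDlq)
  // finishes before the wrapped promise settles.
  track<T>(work: Promise<T>): Promise<T> {
    this.inFlight.add(work)
    const forget = () => {
      this.inFlight.delete(work)
    }
    work.then(forget, forget)
    return work
  }

  get size(): number {
    return this.inFlight.size
  }

  // Resolves true if every tracked handler settled in time, false if the timeout fired first.
  async waitForDrain(timeoutMs: number): Promise<boolean> {
    if (this.inFlight.size === 0) return true
    let timer: ReturnType<typeof setTimeout> | undefined
    const timedOut = new Promise<boolean>((resolve) => {
      timer = setTimeout(() => resolve(false), timeoutMs)
    })
    const drained = Promise.allSettled([...this.inFlight]).then(() => true)
    try {
      return await Promise.race([drained, timedOut])
    } finally {
      clearTimeout(timer)
    }
  }
}
```

In the drain order described above, this wait sits between disconnecting the subClients and quitting the pubClient; a `false` result means some handlers were still running when the client was closed, and their messages will come back through XAUTOCLAIM.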

redisIdempotencyStore

Production-ready IdempotencyStore backed by Redis. Pair with any MessageTransport (Redis Streams, NATS, in-memory, etc.) - the store is transport-agnostic.

import { Module } from '@miiajs/core'
import { MessagingModule } from '@miiajs/messaging'
import { redisIdempotencyStore, redisStreamsTransport } from '@miiajs/messaging-redis'

@Module({
  imports: [
    MessagingModule.configure({
      transport: redisStreamsTransport({ url: 'redis://localhost:6379' }),
      idempotency: redisIdempotencyStore({
        url: 'redis://localhost:6379',
        keyPrefix: 'orders-svc:idem:',
      }),
    }),
  ],
})
class AppModule {}

claim() is SET <prefix><id> 1 NX EX <ttl> - atomic, so concurrent claims for the same id produce exactly one true. release() is DEL <prefix><id>. Keys auto-expire after the TTL passed to @Idempotent.

| Option | Type | Default | Description |
|---|---|---|---|
| url | string | - | Redis URL. Mutually exclusive with client. |
| client | Redis | - | Pre-built ioredis instance. The store does not call connect() or quit() on a user-supplied client. |
| keyPrefix | string | 'miia:idem:' | Key prefix in Redis. Set a per-service prefix when multiple services share a Redis instance so claims do not collide across services. |

The Redis client used for idempotency can be the same ioredis instance you pass to redisStreamsTransport({ client }). Multiplexing one connection is fine - the Redis protocol is request/response and claim/release calls are short and non-blocking. The transport never blocks on this connection: blocking XREADGROUP runs on isolated per-subscribe duplicates, so sharing the parent client with the idempotency store (or any other subsystem) does not create publish-latency contention.
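For unit tests, or just to reason about claim races, the SET NX EX / DEL semantics described above can be mirrored in memory. `MemoryIdempotencyStore` and its injectable clock are hypothetical, not exports of @miiajs/messaging-redis, and this sketch does not claim to implement the package's IdempotencyStore interface, whose exact signature is not shown here. Within one JavaScript process the check-and-set in `claim()` is atomic because it runs synchronously, which is the in-memory analogue of SET NX being a single Redis command.

```typescript
// Hypothetical in-memory mirror of the store's Redis commands (not a package export).
class MemoryIdempotencyStore {
  private expiresAt = new Map<string, number>() // key -> expiry time in ms
  private keyPrefix: string
  private now: () => number

  constructor(keyPrefix = 'miia:idem:', now: () => number = Date.now) {
    this.keyPrefix = keyPrefix
    this.now = now
  }

  // SET <prefix><id> 1 NX EX <ttl>: succeeds only if no live key exists.
  claim(id: string, ttlSeconds: number): boolean {
    const key = this.keyPrefix + id
    const expiry = this.expiresAt.get(key)
    if (expiry !== undefined && expiry > this.now()) return false
    this.expiresAt.set(key, this.now() + ttlSeconds * 1000)
    return true
  }

  // DEL <prefix><id>: frees the id, for example after a failed handler.
  release(id: string): void {
    this.expiresAt.delete(this.keyPrefix + id)
  }
}

let t = 0
const store = new MemoryIdempotencyStore('orders-svc:idem:', () => t)
console.log(store.claim('evt-1', 60)) // true: first delivery
console.log(store.claim('evt-1', 60)) // false: duplicate while the key is live
t = 61_000
console.log(store.claim('evt-1', 60)) // true: key expired after 60 s
```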

Pre-built ioredis client

When you want to share a connection with other parts of your app - caching, rate limiting, session storage - pass a pre-built Redis instance instead of a URL:

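import { Redis } from 'ioredis'
import { MessagingModule } from '@miiajs/messaging'
import { redisStreamsTransport } from '@miiajs/messaging-redis'

// Connects immediately (ioredis defaults to lazyConnect: false); your code owns its lifecycle.
const client = new Redis(process.env.REDIS_URL!)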

MessagingModule.configure({
  transport: redisStreamsTransport({ client }),
})

The transport tracks an ownsClient flag internally:

  • url form: the transport creates the ioredis instance, calls connect() in onInit, and quit() in onDestroy. It owns the lifecycle of the parent client.
  • client form: the transport never calls connect() or quit() on your client. Your code owns the lifecycle. Initialise the connection before the app starts, and tear it down after app.destroy() returns.

In both cases, the transport calls client.duplicate({ lazyConnect: true }) for every subscribe() to spin up a dedicated blocking client (see Connection model). Those duplicates are owned by the transport regardless of ownsClient - they are created and closed by us, never by your code. The ownsClient flag only governs the parent.

This split exists so you can multiplex the parent connection across multiple subsystems (idempotency store, cache, rate limiter) without the transport reaching in and quitting it out from under you, while still getting per-subscribe blocking isolation for free.
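The ownership split comes down to two conditionals. In this hypothetical sketch, `ClientLike` is a minimal stand-in for the ioredis methods involved (connect, quit, disconnect, duplicate) and `TransportLifecycle` is an illustrative name, not the transport's class; closing duplicates with disconnect() follows the drain order described under Graceful drain.

```typescript
// Hypothetical sketch of the ownsClient rule. ClientLike is a minimal stand-in
// for the ioredis methods this logic touches; it is not ioredis' own type.
interface ClientLike {
  connect(): Promise<void>
  quit(): Promise<unknown>
  disconnect(): void
  duplicate(): ClientLike
}

class TransportLifecycle {
  private subClients: ClientLike[] = []
  private parent: ClientLike
  private ownsClient: boolean

  constructor(parent: ClientLike, ownsClient: boolean) {
    this.parent = parent
    this.ownsClient = ownsClient
  }

  async onInit(): Promise<void> {
    if (this.ownsClient) await this.parent.connect() // url form only
  }

  // Every subscribe() gets its own blocking connection, owned by the transport.
  async subscribe(): Promise<ClientLike> {
    const sub = this.parent.duplicate()
    await sub.connect()
    this.subClients.push(sub)
    return sub
  }

  async onDestroy(): Promise<void> {
    for (const sub of this.subClients) sub.disconnect() // always: duplicates belong to the transport
    this.subClients = []
    if (this.ownsClient) await this.parent.quit()       // parent only if the transport created it
  }
}
```

Constructing it with `ownsClient = false` reproduces the client form: only the duplicates are opened and closed, and the parent connection is never touched.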

Testing

Integration tests need a real Redis - ioredis-mock and similar libraries do not implement Streams faithfully enough for retry/DLQ semantics. The pattern used by the package's own test suite is to skip when REDIS_TEST_URL is unset:

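import { describe, it, expect } from 'bun:test'
import { Redis } from 'ioredis'
import { RedisStreamsTransport } from '@miiajs/messaging-redis'
import { randomUUID } from 'node:crypto'

const REDIS_URL = process.env.REDIS_TEST_URL
// Skip the whole suite when no test Redis is configured.
const d = REDIS_URL ? describe : describe.skip

d('my events integration', () => {
  it('delivers across consumers', async () => {
    // Unique topic per test so leftover streams from other tests cannot interfere.
    const topic = `test.events.${randomUUID()}`
    const transport = new RedisStreamsTransport({
      url: REDIS_URL!,
      retry: { backoffMs: 50 },
      blockMs: 200,
    })
    await transport.onInit()
    try {
      // ... use transport with `topic` ...
    } finally {
      // Always release connections, even when an assertion fails.
      await transport.onDestroy()
    }
  })
})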

Two operational tips:

  • Always use unique topic names per test (UUID suffix is enough). Cross-test pollution from a leftover stream is the #1 source of flaky retry/DLQ assertions.
  • Lower blockMs to 200ms in tests so unsubscribe() returns promptly instead of waiting for the default 5s XREADGROUP BLOCK to time out. onDestroy() does not depend on blockMs, because it force-disconnects the subClients (see Graceful drain).

Provisioning the Redis instance itself - local container, managed service, in-cluster pod - is up to you; the transport only needs a reachable URL exported as REDIS_TEST_URL before the suite runs.

Exports

import {
  RedisIdempotencyStore,
  redisIdempotencyStore,
  RedisStreamsTransport,
  redisStreamsTransport,
  type RedisIdempotencyStoreOptions,
  type RedisStreamsTransportOptions,
} from '@miiajs/messaging-redis'