Overview
@miiajs/messaging is a decorator-driven event bus for MiiaJS. Mark any method on any provider or controller with @On('topic'), inject MessageBus somewhere else, and call bus.publish(...) - handlers are wired automatically at startup. No central registry, no manual subscription list.
The package ships with an in-memory transport for development and tests. For production, swap in Redis Streams - same API, just one line in MessagingModule.configure. The discovery side is built on top of DiscoveryService and the onReady lifecycle hook, so the wiring story is the same as any other ambient-discovery package.
Installation
bun add @miiajs/messaging
npm install @miiajs/messaging
pnpm add @miiajs/messaging
yarn add @miiajs/messaging
No peer dependencies for the in-memory transport.
Setup
import { Module, Injectable, inject } from '@miiajs/core'
import { MessagingModule, MessageBus, On, inMemoryTransport } from '@miiajs/messaging'
@Injectable()
class EmailService {
  @On('user.created')
  async sendWelcome(user: { id: string; email: string }) {
    console.log(`welcome → ${user.email}`)
  }
}
@Injectable()
class SignupController {
  private bus = inject(MessageBus)
  async createUser(input: { email: string }) {
    const user = { id: crypto.randomUUID(), ...input }
    await this.bus.publish('user.created', user)
    return user
  }
}
@Module({
  imports: [MessagingModule.configure({ transport: inMemoryTransport() })],
  providers: [EmailService, SignupController],
})
class AppModule {}
@On works on providers and controllers - DiscoveryService finds both, so a controller method can react to an event the same way a service can. Handler signature is (payload, meta?) => void | Promise<void>; throwing nacks the message.
The @On decorator
@On(topic: string, options?: {
  group?: string
  concurrency?: number
  mode?: 'batch' | 'sliding'
  bus?: string
  broadcast?: boolean
})
| Option | Type | Description |
|---|---|---|
| group | string | Explicit broker group for a competing-consumers worker pool. Without it, an auto-derived per-handler group ${topic}__${Class}_${method} is used. See Subscriptions and fan-out. |
| concurrency | number | Per-handler subscription concurrency. In 'batch' mode this is the XREADGROUP COUNT prefetch; in 'sliding' mode it is the number of parallel lanes. Ignored by the in-memory transport. |
| mode | 'batch' \| 'sliding' | Per-handler override of the dispatch mode. Defaults to the bus default, which defaults to the transport default. See Dispatch modes. |
| bus | string | Target bus name for multi-bus setups. Omit to target the default bus. |
| broadcast | boolean | Cluster-wide fan-out (every replica gets a copy). Mutually exclusive with group. See Subscriptions and fan-out. |
Discovery happens in the MessageBus.onReady() hook - see Discovery Service for how that scan works under the hood. You don't need to know the internals to use @On, but it's the same pattern you'd write for your own custom decorators.
Subscriptions and fan-out
Each @On becomes its own broker subscription with an auto-derived consumer group ${topic}__${ClassName}_${methodName} (or ${appName}:${topic}__... when MessagingModule.configure({ appName }) is set). Three layered behaviors result:
Within one process: fan-out across handlers (automatic)
Two handlers on the same topic = two subscriptions = each gets its own copy:
@Injectable()
class EmailService {
  @On('user.created')
  async sendWelcome(user: User) { /* always runs */ }
}
@Injectable()
class AnalyticsService {
  @On('user.created')
  async track(user: User) { /* always runs - independent group */ }
}
Each handler has its own retry/ack lifecycle - one throwing does not affect the other. To deduplicate side effects on retry, pair with @Idempotent.
Across replicas: load balance per handler (automatic)
When N replicas of the same code run, they share the auto-derived group:
publish('user.created', payload)
  ↓
group "user.created__EmailService_sendWelcome" (5 consumers, one per replica)
  broker round-robin → ONE replica runs the email handler for each message
group "user.created__AnalyticsService_track" (5 consumers, one per replica)
  broker round-robin → ONE replica runs the analytics handler for each message
Add replicas → throughput scales linearly per handler. No coordination needed.
Cluster-wide fan-out (broadcast: true)
Sometimes every replica must run the handler (cache invalidation, websocket broadcast). Set broadcast: true:
@On('cache.invalidate', { broadcast: true })
async refreshCache(key: string) { /* runs on every replica */ }
The auto-derived group is suffixed with __<hostname>_<pid>, so each process gets a unique broker group and the broker delivers a copy to every replica's subscription.
Worker pool: explicit group (competing consumers)
For competing-consumers semantics across multiple handler classes, pass group: 'pool-name':
class FastWorker {
  @On('jobs', { group: 'job-pool' })
  async handle(job: unknown) { ... }
}
class SlowWorker {
  @On('jobs', { group: 'job-pool' })
  async handle(job: unknown) { ... }
}
All handlers in job-pool join the same broker group; the broker round-robins each message to exactly one of them. Requires a transport with supportsCompetingConsumers: true (Redis Streams, Kafka, RabbitMQ, NATS). The in-memory transport rejects explicit group at startup.
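The delivery rules in this section can be modelled without a real broker. The sketch below is a toy and not the package's implementation: ToyBroker, join, and publish are hypothetical names, and the round-robin cursor is an assumption about how a broker might pick a group member. It shows that every group receives its own copy of a message while members of one group take turns.

```typescript
// Toy model of broker consumer groups (hypothetical names, no real transport).
type Handler = (payload: unknown) => void

class ToyBroker {
  // topic -> group name -> members of that group
  private topics = new Map<string, Map<string, Handler[]>>()
  // "topic/group" -> index of the member that receives the next message
  private cursors = new Map<string, number>()

  join(topic: string, group: string, handler: Handler): void {
    const groups = this.topics.get(topic) ?? new Map<string, Handler[]>()
    const members = groups.get(group) ?? []
    members.push(handler)
    groups.set(group, members)
    this.topics.set(topic, groups)
  }

  publish(topic: string, payload: unknown): void {
    const groups = this.topics.get(topic)
    if (!groups) return
    for (const [group, members] of groups) {
      // Every group gets its own copy; inside a group exactly one member runs.
      const key = `${topic}/${group}`
      const next = this.cursors.get(key) ?? 0
      this.cursors.set(key, (next + 1) % members.length)
      members[next](payload)
    }
  }
}

const broker = new ToyBroker()
const seen: string[] = []
// Two handlers share the explicit group; a third sits in its own group.
broker.join('jobs', 'job-pool', () => seen.push('fast'))
broker.join('jobs', 'job-pool', () => seen.push('slow'))
broker.join('jobs', 'jobs__Audit_log', () => seen.push('audit'))
broker.publish('jobs', { id: 1 })
broker.publish('jobs', { id: 2 })
// seen is now ['fast', 'audit', 'slow', 'audit']
```

The pool members alternate, while the handler in its own group sees both messages.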
Multi-service deployments (appName)
When several services share a broker, pass MessagingModule.configure({ appName: 'my-service' }). Auto-derived groups become my-service:topic__Class_method, so two services with overlapping handler class names do not collide. Topics and explicit group values are NOT prefixed - they remain shared cross-service contracts.
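The naming rules from this section fit in one small function. This is a sketch only: sketchGroupName and its input shape are hypothetical, the package's own deriveGroupName may take different arguments, and applying the appName prefix to broadcast groups is an assumption.

```typescript
// Sketch of the group naming scheme described above (hypothetical helper).
interface GroupNameInput {
  topic: string
  className: string
  methodName: string
  appName?: string // MessagingModule.configure({ appName })
  group?: string // explicit @On group: used as-is, never prefixed
  broadcast?: boolean // adds a per-process suffix so every replica gets a copy
  hostname?: string // only read when broadcast is true
  pid?: number // only read when broadcast is true
}

function sketchGroupName(input: GroupNameInput): string {
  if (input.group !== undefined) {
    if (input.broadcast) throw new Error('group and broadcast are mutually exclusive')
    return input.group
  }
  const base = `${input.topic}__${input.className}_${input.methodName}`
  // Assumption: the appName prefix also applies to broadcast groups.
  const prefixed = input.appName ? `${input.appName}:${base}` : base
  if (!input.broadcast) return prefixed
  return `${prefixed}__${input.hostname ?? 'localhost'}_${input.pid ?? 0}`
}

const handler = { topic: 'user.created', className: 'EmailService', methodName: 'sendWelcome' }
const plain = sketchGroupName(handler)
const namespaced = sketchGroupName({ ...handler, appName: 'my-service' })
const perProcess = sketchGroupName({ ...handler, broadcast: true, hostname: 'web-1', pid: 42 })
const pooled = sketchGroupName({ ...handler, appName: 'my-service', group: 'job-pool' })
```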
Dispatch modes
Where group controls fan-out between replicas, dispatch mode controls how a single subscription pulls messages off the broker:
- 'batch' - the consumer reads up to concurrency messages, runs them through Promise.allSettled, then reads again. The slowest handler in a batch holds back the next read (head-of-line blocking). Best for high-throughput uniform workloads where per-message variance is small.
- 'sliding' - each in-flight message progresses independently; the subscription pulls the next message as soon as a slot frees, without waiting for the rest of the batch. Best when handler runtimes vary - one slow message no longer blocks the others.
| Transport | Supported modes | Default |
|---|---|---|
| InMemoryTransport | ['sliding'] | 'sliding' |
| RedisStreamsTransport | ['batch', 'sliding'] | 'batch' |
In-memory has no broker prefetch primitive and concurrency is meaningless within a single process, so emulating batch would just add an artificial barrier with no value. Redis Streams supports both - sliding spawns concurrency parallel XREADGROUP COUNT 1 lanes on dedicated connections (BullMQ-style); batch is the single-loop path with COUNT=concurrency.
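To make head-of-line blocking concrete, here is a simulated-time model of the two modes. It is a sketch under simplifying assumptions (no broker, no timers, reads cost nothing), and batchFinishTimes and slidingFinishTimes are hypothetical names, not package APIs. Each function returns the time at which every message finishes.

```typescript
// durations[i] is how long the handler for message i runs, in ms.

function batchFinishTimes(durations: number[], concurrency: number): number[] {
  if (concurrency < 1) throw new RangeError('concurrency must be >= 1')
  const finish: number[] = []
  let clock = 0
  for (let i = 0; i < durations.length; i += concurrency) {
    const batch = durations.slice(i, i + concurrency)
    for (const d of batch) finish.push(clock + d)
    // The next read waits for the slowest handler in this batch.
    clock += Math.max(...batch)
  }
  return finish
}

function slidingFinishTimes(durations: number[], concurrency: number): number[] {
  if (concurrency < 1) throw new RangeError('concurrency must be >= 1')
  // Each lane stores the time at which it becomes free again.
  const lanes = new Array<number>(Math.min(concurrency, durations.length)).fill(0)
  return durations.map((d) => {
    // The next message goes to whichever lane frees up first.
    const lane = lanes.indexOf(Math.min(...lanes))
    lanes[lane] += d
    return lanes[lane]
  })
}

// One slow message (50 ms) followed by three fast ones (5 ms), concurrency 2.
const batchTimes = batchFinishTimes([50, 5, 5, 5], 2) // [50, 5, 55, 55]
const slidingTimes = slidingFinishTimes([50, 5, 5, 5], 2) // [50, 5, 10, 15]
```

In the batch run the third and fourth messages wait for the slow one; in the sliding run they finish on the free lane while the slow message is still in flight.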
Resolution
Mode and concurrency resolve through three levels - more specific wins:
- Per-handler - @On('topic', { mode, concurrency }).
- Per-bus - MessagingModule.configure({ dispatch: { mode, concurrency } }).
- Transport default - whatever transport.defaultMode says, with concurrency falling back to 1.
MessagingModule.configure({
  transport: redisStreamsTransport({ url: process.env.REDIS_URL! }),
  dispatch: { mode: 'sliding', concurrency: 4 }, // default for handlers on this bus
})
@On('order.placed') // uses bus default: sliding/4
@On('audit.log', { mode: 'batch', concurrency: 32 }) // override: batch/32
Each @On is its own subscription with its own mode and concurrency. Validation runs per handler at startup and rejects a mode that is not listed in transport.supportedModes or a concurrency <= 0. Different handlers on the same topic can use different mode/concurrency values without conflict.
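The three-level resolution and the startup validation can be sketched as a pure function. The names below (resolveDispatch, DispatchOptions, TransportCaps) are hypothetical, and resolving mode and concurrency independently of each other is an assumption rather than documented behaviour.

```typescript
type DispatchMode = 'batch' | 'sliding'
interface DispatchOptions { mode?: DispatchMode; concurrency?: number }
interface TransportCaps { defaultMode: DispatchMode; supportedModes: DispatchMode[] }

function resolveDispatch(
  handler: DispatchOptions,
  bus: DispatchOptions,
  transport: TransportCaps,
): Required<DispatchOptions> {
  // More specific wins: handler, then bus, then transport default.
  const mode = handler.mode ?? bus.mode ?? transport.defaultMode
  const concurrency = handler.concurrency ?? bus.concurrency ?? 1
  if (!transport.supportedModes.includes(mode)) {
    throw new Error(`mode '${mode}' is not supported by this transport`)
  }
  if (concurrency <= 0) throw new Error(`concurrency must be > 0, got ${concurrency}`)
  return { mode, concurrency }
}

// Capabilities taken from the transport table above.
const redisLike: TransportCaps = { defaultMode: 'batch', supportedModes: ['batch', 'sliding'] }
const memoryLike: TransportCaps = { defaultMode: 'sliding', supportedModes: ['sliding'] }
const busDefaults: DispatchOptions = { mode: 'sliding', concurrency: 4 }

const fromBus = resolveDispatch({}, busDefaults, redisLike) // sliding / 4
const overridden = resolveDispatch({ mode: 'batch', concurrency: 32 }, busDefaults, redisLike) // batch / 32
const fromTransport = resolveDispatch({}, {}, redisLike) // batch / 1
```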
On Redis Streams, sliding mode opens dedicated connections (concurrency connections per handler). Batch mode opens one connection per handler regardless of concurrency. Pick sliding when handler runtimes vary; pick batch when uniform throughput matters more than per-message latency. See the Redis transport page for details.
Retry and DLQ
When a handler throws (or returns { status: 'nack' } from a low-level transport call), the transport retries with exponential backoff. After maxAttempts failures, the message is republished to <topic>.dlq with the last error captured in meta.lastError.
Retries are scoped to the single handler that nacked. Sibling handlers on the same topic (each with its own subscription) are unaffected - if EmailService.sendWelcome throws while AnalyticsService.track succeeds, only the email handler retries; analytics is not re-run.
The retry delay between attempt N and attempt N+1 is backoffMs * backoffMultiplier^(N - 1). With the defaults (backoffMs: 1000, backoffMultiplier: 2, maxAttempts: 5) the four retries wait 1s, 2s, 4s, and 8s; a fifth failure exhausts the message.
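The backoff formula and the exhaustion rule can be checked with a few lines of arithmetic. This is a sketch: retryDelayMs, afterFailure, and RetrySketch are hypothetical names (the package's own nextBackoffMs and dlqTopic helpers may have different signatures), and the default values are copied from the retry options table in this section. It assumes maxAttempts counts total deliveries, so five attempts are separated by four delays.

```typescript
interface RetrySketch {
  maxAttempts: number
  backoffMs: number
  backoffMultiplier: number
  dlq: boolean
}

const DEFAULTS: RetrySketch = { maxAttempts: 5, backoffMs: 1000, backoffMultiplier: 2, dlq: true }

// Delay before the retry that follows failed attempt `attempt` (1-based).
function retryDelayMs(attempt: number, cfg: RetrySketch = DEFAULTS): number {
  return cfg.backoffMs * cfg.backoffMultiplier ** (attempt - 1)
}

type NextStep =
  | { action: 'retry'; delayMs: number }
  | { action: 'dlq'; topic: string }
  | { action: 'drop' }

// What happens after attempt number `attempt` has failed.
function afterFailure(topic: string, attempt: number, cfg: RetrySketch = DEFAULTS): NextStep {
  if (attempt < cfg.maxAttempts) return { action: 'retry', delayMs: retryDelayMs(attempt, cfg) }
  return cfg.dlq ? { action: 'dlq', topic: `${topic}.dlq` } : { action: 'drop' }
}

const delays = [1, 2, 3, 4].map((attempt) => retryDelayMs(attempt))
// delays: [1000, 2000, 4000, 8000]
const exhausted = afterFailure('user.created', 5)
// exhausted: { action: 'dlq', topic: 'user.created.dlq' }
```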
Subscribing to a DLQ is no different from any other topic - there's no special API:
@Injectable()
class DlqAlerts {
  @On('user.created.dlq')
  async onPoisonPill(payload: unknown, meta: { lastError?: string; attempt: number }) {
    await alertOps(`user.created exhausted: ${meta.lastError}`)
  }
}
| Field | Type | Default | Description |
|---|---|---|---|
| maxAttempts | number | 5 | Total delivery attempts before a message is moved to the DLQ. |
| backoffMs | number | 1000 | Base delay (ms) for the first retry. |
| backoffMultiplier | number | 2 | Exponential multiplier applied per attempt. |
| dlq | boolean | true | When false, exhausted messages are dropped with an error log instead of routed to <topic>.dlq. |
The default is max-attempts + DLQ, not infinite retry, because infinite retry creates "poison pill" queues that silently block live traffic behind a single broken message - same reason Kafka, RabbitMQ, and NATS JetStream all standardise on bounded retry plus a dead-letter destination.
Distributed tracing
Every MessageEnvelope carries optional meta.traceparent and meta.tracestate fields that follow the W3C Trace Context spec. They are pure data - the bus does not auto-inject anything from your tracing library, but the fields are preserved across publish, retry, and DLQ so a span context populated at the producer survives the network hop to any consumer (Node.js, Go, Python - JSON envelopes work cross-language).
import { trace } from '@opentelemetry/api'
const span = trace.getActiveSpan()
const ctx = span?.spanContext()
await bus.publish('order.placed', order, {
  traceparent: ctx
    ? `00-${ctx.traceId}-${ctx.spanId}-${ctx.traceFlags.toString(16).padStart(2, '0')}`
    : undefined,
})
In a consumer, restore the span context with whatever tracing library you use:
@On('order.placed')
async onOrderPlaced(payload: Order, meta: MessageMeta) {
  if (meta.traceparent) {
    // restore span context via OpenTelemetry / Datadog / Sentry SDK
    // continue the distributed trace inside this handler
  }
  // business logic
}
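Before handing meta.traceparent to a tracing SDK, a consumer may want to validate it. The sketch below parses the version 00 format from the W3C Trace Context spec; parseTraceparent and TraceParent are hypothetical names, not part of @miiajs/messaging, and the sketch deliberately rejects any version other than 00.

```typescript
interface TraceParent {
  version: string
  traceId: string
  parentId: string
  sampled: boolean
}

// Version 00 layout: "00-<32 hex trace-id>-<16 hex parent-id>-<2 hex flags>".
const TRACEPARENT_V00 = /^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/

function parseTraceparent(value: string | undefined): TraceParent | undefined {
  if (!value) return undefined // covers undefined and the empty string
  const match = TRACEPARENT_V00.exec(value)
  if (!match) return undefined
  const [, traceId, parentId, flags] = match
  // All-zero ids are invalid per the spec.
  if (/^0+$/.test(traceId) || /^0+$/.test(parentId)) return undefined
  // Bit 0 of the flags byte is the "sampled" flag.
  return { version: '00', traceId, parentId, sampled: (parseInt(flags, 16) & 0x01) === 1 }
}

const parsed = parseTraceparent('00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01')
// parsed?.traceId === '4bf92f3577b34da6a3ce929d0e0e4736', parsed?.sampled === true
```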
Pass undefined to skip - do not pass an empty string. Empty strings serialize to a truthy-looking invalid value on the consumer side.
Graceful drain
When app.destroy() runs (SIGTERM, deploy, restart), each transport waits up to drainTimeoutMs (default 5000) for in-flight handlers to finish before tearing down state. Without drain, a handler interrupted mid-execution leaves the message un-acked - the broker reclaims it and a duplicate fires after restart.
inMemoryTransport({
  drainTimeoutMs: 5000, // default; set 0 to skip drain
})
The drain phase blocks new deliveries (including scheduled retries) and awaits all currently-running handlers. When the timeout is reached the transport logs a warning and continues with cleanup; pending handlers will keep running but their results are discarded.
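The wait-with-timeout part of a drain can be sketched with Promise.race. This is an illustration, not the transport's code: drain and its return values are hypothetical, and blocking new deliveries is left out.

```typescript
type DrainResult = 'drained' | 'timeout' | 'skipped'

async function drain(inFlight: Iterable<Promise<unknown>>, timeoutMs: number): Promise<DrainResult> {
  if (timeoutMs <= 0) return 'skipped' // mirrors "set 0 to skip drain"
  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<'timeout'>((resolve) => {
    timer = setTimeout(() => resolve('timeout'), timeoutMs)
  })
  // allSettled never rejects, so a failing handler still counts as finished.
  const settled = Promise.allSettled(inFlight).then(() => 'drained' as const)
  try {
    return await Promise.race([settled, timeout])
  } finally {
    clearTimeout(timer) // do not keep the process alive after an early drain
  }
}
```

A caller would log a warning on 'timeout' and continue with cleanup; handlers that are still running are not cancelled by this sketch.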
On Kubernetes, the drain must complete within terminationGracePeriodSeconds (30s by default). If your handlers do longer work, raise drainTimeoutMs accordingly. For the Redis transport see the drain note on the Redis page.
Idempotency
@miiajs/messaging delivers messages at least once, so duplicates are inevitable - consumer crash before XACK, network blip, broker reclaim. Most handlers should be idempotent by design (UPDATE WHERE id = ?, cache invalidations, state syncs). For handlers with non-idempotent side effects - charging cards, sending emails, calling rate-limited external APIs - use @Idempotent to skip duplicate executions:
import { Injectable } from '@miiajs/core'
import { On, Idempotent } from '@miiajs/messaging'
@Injectable()
class PaymentService {
  @On('order.placed')
  @Idempotent({ ttl: 24 * 60 * 60 * 1000 }) // dedupe within 24h
  async chargeCard(order: Order) {
    await this.payments.charge(order.cardId, order.total)
  }
}
Default key is ${envelope.id}:${ClassName}.${methodName} - per-handler scope, so two @Idempotent handlers on the same topic do not conflict. Override key to widen scope or to dedupe on a business identifier from the payload:
@On('payment.received')
@Idempotent({ ttl: 24 * 60 * 60 * 1000, key: (p: Payment) => `payment:${p.transactionId}` })
async onPayment(payment: Payment) { ... }
Configure the store in MessagingModule.configure:
import { MessagingModule, inMemoryTransport, memoryIdempotencyStore } from '@miiajs/messaging'
MessagingModule.configure({
  transport: inMemoryTransport(),
  idempotency: memoryIdempotencyStore(), // dev / single-process
})
For production with multiple replicas, use redisIdempotencyStore - claims must be visible across processes, which an in-memory Map cannot provide.
| Decorator option | Type | Default | Description |
|---|---|---|---|
| ttl | number | required | Claim lifetime in milliseconds. After expiry, the same key can be claimed and processed again. |
| key | (payload, meta) => string | ${envelope.id}:${ClassName}.${methodName} | Custom claim key. Useful when the upstream may republish the same business event with different envelope IDs, or when handlers should share a claim. |
@Idempotent is not "exactly-once". If the consumer crashes after claim() but before the message is acked back to the broker, the claim stays in the store while the broker still considers the message un-acked. After redelivery, the next consumer sees a stale claim and skips the handler, effectively losing the message. For business-critical workflows pair @Idempotent with a transactional outbox or use idempotent-by-design handlers (UPDATE WHERE id = ?).
If any handler is annotated with @Idempotent but no idempotency store is configured, MessageBus throws at startup so the misconfiguration cannot reach production silently.
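The claim-then-run pattern behind @Idempotent can be sketched with a Map and an injectable clock. ClaimStore and runOnce are hypothetical names and do not reflect the package's IdempotencyStore interface; the example only shows how a TTL-bound claim skips duplicates and lets the key be processed again after expiry.

```typescript
// Hypothetical single-process claim store with TTL-based expiry.
class ClaimStore {
  private expiresAt = new Map<string, number>()

  // Returns true when the caller won the claim, false when a live claim exists.
  claim(key: string, ttlMs: number, now: number = Date.now()): boolean {
    const expiry = this.expiresAt.get(key)
    if (expiry !== undefined && expiry > now) return false
    this.expiresAt.set(key, now + ttlMs)
    return true
  }
}

function runOnce(store: ClaimStore, key: string, ttlMs: number, handler: () => void, now?: number): boolean {
  if (!store.claim(key, ttlMs, now)) return false // duplicate: skip the handler
  handler()
  return true
}

const store = new ClaimStore()
let charges = 0
const key = 'msg-1:PaymentService.chargeCard' // default key shape described above
runOnce(store, key, 1000, () => charges++, 0) // first delivery: runs
runOnce(store, key, 1000, () => charges++, 500) // redelivery inside the TTL: skipped
runOnce(store, key, 1000, () => charges++, 1000) // claim expired at t=1000: runs again
// charges === 2
```

Note that the claim is taken before the handler runs, which is exactly why a crash between the claim and the ack can leave a stale claim behind.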
Multiple buses
For apps that need more than one transport - Kafka for legacy reads + Redis Streams for new writes, an internal in-memory bus + an external Redis bus, separate buses per tenant - call MessagingModule.configure(opts, name) once per bus. The name namespaces the DI tokens, so each bus gets its own transport, idempotency store, and MessageBus instance.
import { Module } from '@miiajs/core'
import { MessagingModule, getMessageBusToken, inMemoryTransport } from '@miiajs/messaging'
import { redisStreamsTransport, redisIdempotencyStore } from '@miiajs/messaging-redis'
// kafkaTransport is assumed to come from a separate Kafka transport package (import not shown)
@Module({
  imports: [
    // Default bus - used when @On has no `bus` option
    MessagingModule.configure({
      transport: redisStreamsTransport({ url: process.env.REDIS_URL! }),
      idempotency: redisIdempotencyStore({ keyPrefix: 'primary:idem:' }),
    }),
    // Named bus for legacy Kafka reads
    MessagingModule.configure(
      { transport: kafkaTransport({ /* ... */ }) },
      'kafka',
    ),
    // Internal in-memory bus for domain events between services in the monolith
    MessagingModule.configure({ transport: inMemoryTransport() }, 'internal'),
  ],
})
class AppModule {}
Inject the bus you need:
@Injectable()
class OrderService {
  private primary = inject(MessageBus) // default
  private kafka = inject<MessageBus>(getMessageBusToken('kafka') as string) // named
  private internal = inject<MessageBus>(getMessageBusToken('internal') as string)
}
Target a specific bus from @On via the bus option:
@Injectable()
class Handlers {
  @On('order.placed') // default bus
  async onOrder(o: Order) { ... }
  @On('legacy.user.created', { bus: 'kafka' }) // kafka bus
  async onLegacyUser(u: User) { ... }
  @On('domain.cart.updated', { bus: 'internal' })
  async onCart(c: Cart) { ... }
}
@On('topic', { bus: 'unknown' }) throws at startup if no MessagingModule.configure(opts, 'unknown') was registered. The same applies to @On('topic') (no bus) when only named buses are configured: a default bus must exist for handlers without bus. Failing fast prevents typos like bus: 'kafa' from silently leaving handlers dead.
When several buses share one Redis instance, give each bus its own topic names (e.g. 'primary.user.created' vs 'kafka.user.created'). Otherwise the buses' DLQ topics (<topic>.dlq) collide and DLQ listeners on one bus pick up dead messages from the other. An easier alternative is to use separate Redis instances per bus.
Pass the same IdempotencyStore instance to multiple configure() calls if you want claims to be shared across buses (rare but legitimate). The store's onDestroy() will be called once per bus during shutdown - the built-in stores tolerate this, but custom implementations should make onDestroy() idempotent.
MessagingModule.configure
MessagingModule.configure() returns a ConfiguredModule with the transport and MessageBus registered as singletons. Two forms - same shape as JwtModule.configure and DrizzleModule.configure.
Static options:
MessagingModule.configure({
  transport: inMemoryTransport(),
})
Factory with DI access (use this when the transport depends on ConfigService or another provider):
import { ConfigService } from '@miiajs/config'
import { redisStreamsTransport } from '@miiajs/messaging-redis'
MessagingModule.configure((resolve) => ({
  transport: redisStreamsTransport({
    url: resolve(ConfigService).getOrThrow('REDIS_URL'),
    retry: { maxAttempts: 10 },
  }),
}))
| Field | Type | Description |
|---|---|---|
| transport | MessageTransport | Required. The transport instance the bus publishes through and subscribes against. |
| idempotency | IdempotencyStore | Optional. Required when any handler uses @Idempotent - the bus throws at startup otherwise. See Idempotency. |
| dispatch | { mode?: DispatchMode; concurrency?: number } | Optional. Bus-wide dispatch defaults applied to every subscription on this bus. Overridden per-handler via @On({ mode, concurrency }). See Dispatch modes. |
| appName | string | Optional namespace prefix for auto-derived consumer groups. Use to avoid collisions when multiple services share a broker. Does NOT affect topic names or explicit @On({ group }) values. See Multi-service deployments. |
MessagingModule.configure() accepts an optional second argument name?: string for multi-bus setups. Without name, it registers the default bus (the MESSAGE_TRANSPORT, IDEMPOTENCY_STORE, and MessageBus class tokens). With name, it registers under namespaced tokens accessible via getMessageBusToken(name), getMessageTransportToken(name), and getIdempotencyStoreToken(name).
InMemoryTransport options
inMemoryTransport({
  retry: { backoffMs: 100, maxAttempts: 3 },
  cloneOnPublish: false,
})
| Option | Type | Default | Description |
|---|---|---|---|
| retry | Partial<RetryConfig> | DEFAULT_RETRY | Override individual retry fields. |
| cloneOnPublish | boolean | false | When true, payload is structuredCloned before each handler sees it - prevents accidental cross-handler mutation at the cost of one clone per delivery. |
| drainTimeoutMs | number | 5000 | Max time onDestroy() waits for in-flight handlers before forcing cleanup. Set 0 to skip drain. See Graceful drain. |
The in-memory transport is not persistent. Messages live in memory only; a process crash mid-retry loses any in-flight or scheduled retries. It's ideal for tests, local dev, and single-process apps where event loss on crash is acceptable. For production reach for the Redis transport.
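The cross-handler mutation that cloneOnPublish guards against is easy to reproduce in isolation. The sketch below is not the transport's code; deliver is a hypothetical stand-in that hands one payload to several handlers, with and without structuredClone.

```typescript
type Handler<T> = (payload: T) => void

// Hypothetical delivery loop: optionally clone the payload per handler.
function deliver<T>(payload: T, handlers: Handler<T>[], cloneOnPublish: boolean): void {
  for (const handler of handlers) {
    handler(cloneOnPublish ? structuredClone(payload) : payload)
  }
}

interface User { id: string; tags: string[] }

const observed: number[] = []
const mutate: Handler<User> = (user) => { user.tags.push('welcomed') }
const observe: Handler<User> = (user) => { observed.push(user.tags.length) }

deliver({ id: '1', tags: [] }, [mutate, observe], false) // observe sees the mutated array
deliver({ id: '1', tags: [] }, [mutate, observe], true) // observe gets a fresh copy
// observed: [1, 0]
```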
Testing
Use TestApp to register the messaging module and resolve MessageBus directly:
import { TestApp } from '@miiajs/core/testing'
import { MessageBus, MessagingModule, inMemoryTransport } from '@miiajs/messaging'
const app = await TestApp.create(AppModule)
  .provide({
    token: MessagingModule,
    factory: () => MessagingModule.configure({ transport: inMemoryTransport({ retry: { backoffMs: 5 } }) }),
  })
  .compile()
const bus = app.resolve(MessageBus)
await bus.publish('user.created', { id: '1' })
// In-memory delivery is asynchronous - handlers run via queueMicrotask.
// Wait a tick before asserting on the result.
await new Promise((r) => setTimeout(r, 10))
expect(handler.received).toEqual([{ id: '1' }])
Lower backoffMs to single-digit milliseconds in tests so retry paths complete inside the test timeout.
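A fixed sleep can be flaky when retries are involved. One alternative is to poll for the expected state; waitFor below is a hypothetical test helper, not something the package or TestApp is known to ship.

```typescript
// Polls `predicate` until it returns true or the timeout elapses.
async function waitFor(predicate: () => boolean, timeoutMs = 1000, intervalMs = 5): Promise<void> {
  const deadline = Date.now() + timeoutMs
  while (!predicate()) {
    if (Date.now() > deadline) throw new Error(`condition not met within ${timeoutMs}ms`)
    await new Promise((resolve) => setTimeout(resolve, intervalMs))
  }
}
```

In a test this would replace the fixed delay, for example by waiting until the recording handler has received one payload before asserting on it.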
Exports
import {
MessageBus,
MessagingModule,
Idempotent,
IDEMPOTENT,
IDEMPOTENCY_STORE,
InMemoryTransport,
inMemoryTransport,
MemoryIdempotencyStore,
memoryIdempotencyStore,
On,
ON,
MESSAGE_TRANSPORT,
DEFAULT_RETRY,
dlqTopic,
getMessageBusToken,
getMessageTransportToken,
getIdempotencyStoreToken,
nextBackoffMs,
deriveGroupName,
type DeriveGroupNameInput,
type DispatchDefaults,
type DispatchMode,
type MessageEnvelope,
type MessageMeta,
type MessageTransport,
type MessagingModuleOptions,
type HandlerResult,
type IdempotencyStore,
type IdempotentMeta,
type InMemoryTransportOptions,
type MemoryIdempotencyStoreOptions,
type OnMeta,
type RetryConfig,
type SubscribeOptions,
type Subscription,
} from '@miiajs/messaging'