Signal Bus¶
The signal bus is the core of Cadence. It coordinates signal routing, middleware, persistence, and handler execution.
Creating a Bus¶
import { createSignalBus, type DefineSignals } from "@peleke.s/cadence";
type MySignals = DefineSignals<{
"file.changed": { path: string };
"task.created": { title: string; assignee: string };
}>;
const bus = createSignalBus<MySignals>();
All options are optional — defaults provide in-memory transport, no persistence, and sequential execution:
import {
createSignalBus,
createMemoryTransport,
createNoopStore,
createSequentialExecutor,
} from "@peleke.s/cadence";
// These are the defaults (shown explicitly)
const bus = createSignalBus<MySignals>({
transport: createMemoryTransport(),
store: createNoopStore(),
executor: createSequentialExecutor(),
onError: (signal, handlerName, error) => {
console.error(`Handler ${handlerName} failed on ${signal.type}:`, error);
},
});
Subscribing to Signals¶
Type-Specific Handlers¶
Subscribe to a specific signal type. The handler receives only signals matching that type, with full type narrowing:
bus.on("file.changed", async (signal) => {
// signal.payload is { path: string } — fully typed
console.log(signal.payload.path);
});
The return value is an unsubscribe function:
const unsub = bus.on("task.created", handler);
// Later:
unsub();
Any Handlers¶
Subscribe to all signals regardless of type:
bus.onAny(async (signal) => {
console.log(`[${signal.type}] ${JSON.stringify(signal.payload)}`);
});
Emitting Signals¶
await bus.emit({
type: "task.created",
ts: Date.now(),
id: crypto.randomUUID(),
payload: { title: "Review PR", assignee: "alice" },
});
The emit flow:
- Signal saved to store (for durability)
- Signal dispatched through transport
- Middleware chain runs (if any)
- Type-specific handlers execute
- Any-handlers execute
- Signal marked as acknowledged in store
Middleware¶
Middleware runs before handlers. It receives the signal and a next() function:
// Logging middleware
bus.use(async (signal, next) => {
console.log(`→ ${signal.type}`);
await next();
console.log(`← ${signal.type}`);
});
// Filtering middleware (skip signals by returning without calling next)
bus.use(async (signal, next) => {
if (signal.type === "file.changed" && signal.payload.path.endsWith(".tmp")) {
return; // Drop temp file signals
}
await next();
});
// Enrichment middleware
bus.use(async (signal, next) => {
signal.source = signal.source ?? "default";
await next();
});
Middleware runs in registration order. The last middleware's next() invokes the handlers.
Signal Replay¶
If you provide a store that persists signals, you can replay unacknowledged signals on restart:
const replayed = await bus.replay();
console.log(`Replayed ${replayed} signals`);
This fetches all unacked signals from the store, re-emits them through the transport, and marks them as acknowledged.
Statistics¶
Monitor bus activity:
const stats = bus.stats();
// {
// emitted: 42,
// handled: 38,
// errors: 2,
// handlers: 3,
// anyHandlers: 1,
// middleware: 2,
// }
Error Handling¶
Handler errors are caught by the bus and passed to the onError callback. They do not crash the bus or prevent other handlers from running:
const bus = createSignalBus<MySignals>({
onError: (signal, handlerName, error) => {
// handlerName is "type:file.changed" or "any:0"
console.error(`Error in ${handlerName}:`, error);
},
});
Cleanup¶
Remove all handlers and middleware:
bus.clear();
See Also¶
- Core Concepts — architecture overview
- Pluggable Layers — custom transport, store, executor
- Types Reference — full interface definitions