Message Bus (CQRS)¶
Introduction¶
CQRS (Command Query Responsibility Segregation) separates read and write operations into distinct models:
- Commands change state and optionally return a result.
- Queries read state without side effects.
- Events (notifications) broadcast that something happened — zero or more handlers react.
The Message Bus decouples the sender of a request from the handler that processes it. Instead of calling a handler directly, you pass a request object to the message bus, which looks up the correct handler and dispatches it through a pipeline of cross-cutting behaviors.
graph LR
Caller -->|invoke| ISender
ISender -->|dispatch| Pipeline[Pipeline Behaviors]
Pipeline --> Handler[Request Handler]
Handler -->|response| ISender
ISender -->|result| Caller
Caller2[Caller] -->|publish| IPublisher
IPublisher -->|fan-out| H1[Event Handler A]
IPublisher -->|fan-out| H2[Event Handler B]
Tip
ISender, IPublisher, and IMessageBus all resolve to the same message bus instance.
Inject only the interface you need — see Interfaces below.
waku's CQRS implementation is inspired by MediatR (.NET) and integrates with the module system, dependency injection, and extension lifecycle.
Setup¶
Import MessagingModule as a dynamic module in your root module:
MessagingConfig¶
| Option | Type | Default | Description |
|---|---|---|---|
message_bus_implementation_type |
type[IMessageBus] |
MessageBus |
Concrete message bus class for request/event dispatching |
event_publisher |
type[EventPublisher] |
SequentialEventPublisher |
Strategy for dispatching events to handlers |
pipeline_behaviors |
Sequence[type[IPipelineBehavior]] |
() |
Global pipeline behaviors applied to every request |
Passing None (or no argument) to MessagingModule.register() uses the defaults:
MessagingModule is registered as a global module — its providers (message bus, event publisher,
registry) are available to every module in the application without explicit imports.
Interfaces¶
waku provides three message bus interfaces at different levels of access. Inject only the interface you need to enforce the principle of least privilege:
| Interface | Methods | Use when |
|---|---|---|
IMessageBus |
invoke() + publish() |
The component both sends requests and publishes events |
ISender |
invoke() |
The component only dispatches commands/queries |
IPublisher |
publish() |
The component only broadcasts events |
IMessageBus extends both ISender and IPublisher:
All three interfaces are automatically registered in the DI container by MessagingModule.
dishka resolves ISender and IPublisher to the same MessageBus instance as IMessageBus.
Complete Example¶
An order placement flow with a command handler and two event handlers:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 | |
Fluent chaining
MessagingExtension().bind_request(...) and .bind_event(...) return Self, so you can
chain multiple bindings in a single expression.
Exceptions¶
| Exception | Raised when |
|---|---|
RequestHandlerNotFound |
bus.invoke() is called for a request type with no registered handler |
RequestHandlerAlreadyRegistered |
A second handler is bound to a request type that already has one |
EventHandlerAlreadyRegistered |
The same handler class is bound to the same event type twice |
PipelineBehaviorAlreadyRegistered |
The same behavior class is bound to the same request type twice |
Next steps¶
| Topic | Description |
|---|---|
| Requests | Commands, queries, and request handlers |
| Events | Event definitions, handlers, and publishers |
| Pipeline Behaviors | Cross-cutting middleware for request handling |
Further reading¶
- Event Sourcing — event-sourced aggregates, deciders, and projections
- Extension System — lifecycle hooks for application and module lifecycle
- Validation — startup validation and custom rules
- Testing — test utilities and provider overrides