Testing

waku provides testing utilities for event-sourced systems at two levels:

  • Unit testing: DeciderSpec and AggregateSpec offer fluent Given/When/Then APIs for testing business logic without infrastructure.
  • Integration testing: InMemoryEventStore, wait_for_projection(), and create_test_app() support end-to-end flows without a database.

DeciderSpec DSL

DeciderSpec provides a fluent Given/When/Then API for testing functional IDecider implementations.

The basic chain is:

DeciderSpec.for_(decider).given([events]).when(command).then([expected_events])
from app.decider import (
    BankAccountDecider,
    DepositMoney,
    OpenAccount,
)
from app.events import AccountOpened, MoneyDeposited
from waku.eventsourcing.testing import DeciderSpec


def test_open_account() -> None:
    decider = BankAccountDecider()

    (
        DeciderSpec
        .for_(decider)
        .given([])
        .when(OpenAccount(account_id='acc-1', owner='dex'))
        .then([AccountOpened(account_id='acc-1', owner='dex')])
    )


def test_deposit_updates_balance() -> None:
    decider = BankAccountDecider()

    (
        DeciderSpec
        .for_(decider)
        .given([AccountOpened(account_id='acc-1', owner='dex')])
        .when(DepositMoney(account_id='acc-1', amount=500))
        .then([MoneyDeposited(account_id='acc-1', amount=500)])
    )


def test_deposit_negative_raises() -> None:
    decider = BankAccountDecider()

    (
        DeciderSpec
        .for_(decider)
        .given([AccountOpened(account_id='acc-1', owner='dex')])
        .when(DepositMoney(account_id='acc-1', amount=-10))
        .then_raises(ValueError, match='Deposit amount must be positive')
    )


def test_state_after_events() -> None:
    decider = BankAccountDecider()

    (
        DeciderSpec
        .for_(decider)
        .given([
            AccountOpened(account_id='acc-1', owner='dex'),
            MoneyDeposited(account_id='acc-1', amount=500),
        ])
        .when(DepositMoney(account_id='acc-1', amount=300))
        .then_state(lambda s: s.balance == 800)
    )
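
The tests above import BankAccountDecider from app.decider without showing it. As a rough sketch of the pattern under test, the Given/When/Then flow can be read as a fold over events. This sketch assumes a decider exposes initial_state(), decide(command, state), and evolve(state, event); those method names, SketchDecider, and run_spec are illustrative assumptions, not waku's actual IDecider or DeciderSpec code.

```python
from dataclasses import dataclass, replace
from functools import reduce


@dataclass(frozen=True)
class AccountOpened:
    account_id: str
    owner: str


@dataclass(frozen=True)
class MoneyDeposited:
    account_id: str
    amount: int


@dataclass(frozen=True)
class DepositMoney:
    account_id: str
    amount: int


@dataclass(frozen=True)
class AccountState:
    is_open: bool = False
    balance: int = 0


class SketchDecider:
    """Hypothetical stand-in for BankAccountDecider (assumed method names)."""

    def initial_state(self) -> AccountState:
        return AccountState()

    def decide(self, command: DepositMoney, state: AccountState) -> list:
        # Pure decision: validate the command and return new events.
        if command.amount <= 0:
            raise ValueError('Deposit amount must be positive')
        return [MoneyDeposited(command.account_id, command.amount)]

    def evolve(self, state: AccountState, event: object) -> AccountState:
        # Pure state transition: apply one event to the current state.
        if isinstance(event, AccountOpened):
            return replace(state, is_open=True)
        if isinstance(event, MoneyDeposited):
            return replace(state, balance=state.balance + event.amount)
        return state


def run_spec(decider, given, command):
    """Given: fold prior events. When: decide. Then: return events and new state."""
    state = reduce(decider.evolve, given, decider.initial_state())
    produced = decider.decide(command, state)
    return produced, reduce(decider.evolve, produced, state)


events, state = run_spec(
    SketchDecider(),
    [AccountOpened('acc-1', 'dex'), MoneyDeposited('acc-1', 500)],
    DepositMoney('acc-1', 300),
)
```

Because decide and evolve are pure functions in this sketch, the whole scenario runs without an event store or any other infrastructure, which is what makes this style of unit test cheap.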

DeciderSpec Methods

These methods set up the test scenario. given() is optional; omit it to test from the initial state.

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| for_ | decider: IDecider[S, C, E] | DeciderSpec[S, C, E] | Class method. Create a spec for the given decider |
| given | events: Sequence[E] | DeciderSpec[S, C, E] | Apply prior events to build up state before the command |
| when | command: C | _DeciderWhenResult[S, C, E] | Execute a command against the built-up state |
| then_state | predicate: Callable[[S], bool] | None | Assert state built from given() events alone (no command) |

Assertions After .when(command)

The object returned by .when(command) exposes the following:

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| then | expected_events: Sequence[E] | None | Assert the command produced exactly these events |
| then_no_events | (none) | None | Assert the command produced zero events |
| then_raises | exception_type: type[Exception], match: str \| None = None | None | Assert the command raises this exception. match is a regex passed to pytest.raises |
| then_state | predicate: Callable[[S], Any] | None | Assert the state after applying produced events matches the predicate |
| resulting_state | (none) | S | Property. Returns the state after deciding and evolving; use for custom assertions |
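
Since match is handed to pytest.raises, it is treated as a regular expression and searched for anywhere in the exception message (re.search semantics). A plain substring therefore matches, but regex metacharacters in the expected text need escaping. The helper message_matches below is a hypothetical name used only to illustrate that rule without running pytest:

```python
import re


def message_matches(pattern: str, exc: Exception) -> bool:
    """Mimic the check pytest.raises(match=...) performs: re.search on str(exc)."""
    return re.search(pattern, str(exc)) is not None


err = ValueError('Insufficient funds: need $9999')

# A plain substring is enough, because the pattern is searched, not full-matched.
substring_ok = message_matches('Insufficient funds', err)

# '$' is a regex anchor (end of string), so this pattern cannot match.
unescaped = message_matches('$9999', err)

# re.escape turns the text into a literal pattern.
escaped = message_matches(re.escape('$9999'), err)
```

When the expected message contains characters such as $, (, or ., wrapping it in re.escape() keeps the assertion from passing or failing for the wrong reason.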

Tip

then_state appears on both DeciderSpec and the result of .when(). On DeciderSpec it checks state from events alone (no command). After .when() it checks state after the command's produced events are applied.

AggregateSpec DSL

AggregateSpec provides the same Given/When/Then API for OOP EventSourcedAggregate classes. Since aggregate commands are methods rather than data objects, actions are expressed as lambdas:

AggregateSpec.for_(MyAggregate).given([events]).when(lambda agg: agg.do_something()).then([expected_events])
from app.aggregate import BankAccount
from app.events import AccountOpened, MoneyDeposited
from waku.eventsourcing.testing import AggregateSpec


def test_deposit_produces_event() -> None:
    (
        AggregateSpec
        .for_(BankAccount)
        .given([AccountOpened(account_id='acc-1', owner='dex')])
        .when(lambda acc: acc.deposit('acc-1', 500))
        .then([MoneyDeposited(account_id='acc-1', amount=500)])
    )


def test_withdraw_insufficient_funds_raises() -> None:
    (
        AggregateSpec
        .for_(BankAccount)
        .given([AccountOpened(account_id='acc-1', owner='dex')])
        .when(lambda acc: acc.withdraw('acc-1', 9999))
        .then_raises(ValueError, match='Insufficient funds')
    )


def test_balance_after_deposits() -> None:
    (
        AggregateSpec
        .for_(BankAccount)
        .given([
            AccountOpened(account_id='acc-1', owner='dex'),
            MoneyDeposited(account_id='acc-1', amount=500),
            MoneyDeposited(account_id='acc-1', amount=300),
        ])
        .then_state(lambda acc: acc.balance == 800)
    )


def test_no_op_produces_no_events() -> None:
    (
        AggregateSpec
        .for_(BankAccount)
        .given([AccountOpened(account_id='acc-1', owner='dex')])
        .when(lambda acc: acc.noop())
        .then_no_events()
    )
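
Passing the action as a lambda defers the method call, so the spec can hydrate a fresh aggregate from the given events first and only then invoke the action on it. The sketch below illustrates that ordering with a tiny hypothetical aggregate; Counter and run_action are illustrative names and are not part of waku.

```python
from typing import Callable


class Counter:
    """Tiny hypothetical aggregate: state is a count, events are plain strings."""

    def __init__(self) -> None:
        self.count = 0
        self._pending: list[str] = []

    def load_from_history(self, events: list[str]) -> None:
        # Replay prior events into state without recording them as new.
        for event in events:
            self._apply(event)

    def increment(self) -> None:
        # Command method: apply the event and record it as newly produced.
        self._apply('incremented')
        self._pending.append('incremented')

    def collect_events(self) -> list[str]:
        pending, self._pending = self._pending, []
        return pending

    def _apply(self, event: str) -> None:
        if event == 'incremented':
            self.count += 1


def run_action(aggregate_type, given: list[str], action: Callable) -> tuple:
    """Hydrate a fresh aggregate first, then invoke the deferred action on it."""
    aggregate = aggregate_type()
    aggregate.load_from_history(given)
    action(aggregate)  # the lambda runs only now, against the hydrated instance
    return aggregate.collect_events(), aggregate


produced, counter = run_action(Counter, ['incremented'], lambda c: c.increment())
```

Only the event raised by the action is reported as produced; the replayed history contributes to state but not to the collected events.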

AggregateSpec Methods

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| for_ | aggregate_type: type[A] | AggregateSpec[A] | Class method. Create a spec for the given aggregate type |
| given | events: Sequence[INotification] | AggregateSpec[A] | Replay prior events via load_from_history() |
| when | action: Callable[[A], None] | _AggregateWhenResult[A] | Execute an action (lambda) against the hydrated aggregate |
| then_state | predicate: Callable[[A], Any] | None | Assert state built from given() events alone (no action) |

Assertions After .when(action)

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| then | expected_events: Sequence[INotification] | None | Assert the action produced exactly these events |
| then_no_events | (none) | None | Assert the action produced zero events |
| then_raises | exception_type: type[Exception], match: str \| None = None | None | Assert the action raises this exception |
| then_state | predicate: Callable[[A], Any] | None | Assert aggregate state after the action and produced events |

Manual Aggregate Testing

You can also test aggregates directly without AggregateSpec: create the aggregate, optionally call load_from_history() to set up prior state, invoke a command method, then assert on collect_events() and the resulting state.

from app.aggregate import BankAccount
from app.events import AccountOpened, MoneyDeposited


def test_aggregate_opens_account() -> None:
    account = BankAccount()
    account.open('acc-1', 'dex')

    events = account.collect_events()
    assert len(events) == 1
    assert isinstance(events[0], AccountOpened)
    assert events[0].owner == 'dex'


def test_aggregate_deposits_money() -> None:
    account = BankAccount()
    account.load_from_history(
        [AccountOpened(account_id='acc-1', owner='dex')],
        version=0,
    )

    account.deposit('acc-1', 500)

    events = account.collect_events()
    assert len(events) == 1
    assert isinstance(events[0], MoneyDeposited)
    assert account.balance == 500
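
The BankAccount aggregate used above lives in app.aggregate and is not shown here. As a minimal sketch of the mechanics those tests rely on, the example below assumes a base class that buffers newly raised events and replays history through an apply hook. SketchAggregate, _raise_event, and _apply are hypothetical names, and the choice to clear the buffer in collect_events() is an assumption; waku's EventSourcedAggregate may behave differently.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AccountOpened:
    account_id: str
    owner: str


@dataclass(frozen=True)
class MoneyDeposited:
    account_id: str
    amount: int


class SketchAggregate:
    """Hypothetical base class; not waku's EventSourcedAggregate."""

    def __init__(self) -> None:
        self.version = -1
        self._pending: list[object] = []

    def load_from_history(self, events, version: int) -> None:
        # Replayed events rebuild state but are not recorded as new.
        for event in events:
            self._apply(event)
        self.version = version

    def collect_events(self) -> list[object]:
        # Hand over the pending events and clear the buffer (assumed behavior).
        pending, self._pending = self._pending, []
        return pending

    def _raise_event(self, event: object) -> None:
        self._apply(event)
        self._pending.append(event)

    def _apply(self, event: object) -> None:
        raise NotImplementedError


class BankAccount(SketchAggregate):
    def __init__(self) -> None:
        super().__init__()
        self.balance = 0

    def deposit(self, account_id: str, amount: int) -> None:
        if amount <= 0:
            raise ValueError('Deposit amount must be positive')
        self._raise_event(MoneyDeposited(account_id, amount))

    def _apply(self, event: object) -> None:
        if isinstance(event, MoneyDeposited):
            self.balance += event.amount


account = BankAccount()
account.load_from_history([AccountOpened('acc-1', 'dex')], version=0)
account.deposit('acc-1', 500)
new_events = account.collect_events()
```

Separating replayed history from pending events is what lets a manual test assert on exactly the events a single command produced.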

Tip

AggregateSpec is the recommended approach — it mirrors DeciderSpec and keeps tests concise. Use manual testing only when you need fine-grained control over the aggregate lifecycle.

Integration Testing

For integration tests, use InMemoryEventStore — no database needed. Combine it with waku.testing.create_test_app() to create minimal test applications.

from waku.cqrs import IMediator
from waku.testing import create_test_app

from app.commands import OpenAccountCommand
from app.modules import AppModule


async def test_full_flow() -> None:
    async with create_test_app(base=AppModule) as app:
        async with app.container() as container:
            mediator = await container.get(IMediator)
            result = await mediator.send(OpenAccountCommand(account_id='acc-1', owner='dex'))
            assert result.account_id == 'acc-1'

Waiting for Projections

When integration tests involve catch-up projections running in background tasks, use wait_for_projection() to block until a projection has processed all events. This avoids flaky timing-dependent assertions.

from waku.eventsourcing.projection.in_memory import InMemoryCheckpointStore
from waku.eventsourcing.projection.registry import CatchUpProjectionRegistry
from waku.eventsourcing.store.in_memory import InMemoryEventStore
from waku.eventsourcing.testing import wait_for_all_projections, wait_for_projection


async def test_wait_for_single_projection(
    event_store: InMemoryEventStore,
    checkpoint_store: InMemoryCheckpointStore,
) -> None:
    # ... append events via command handler ...

    await wait_for_projection(
        checkpoint_store=checkpoint_store,
        event_reader=event_store,
        projection_name='account_summary',
        deadline=5.0,
    )

    # Projection is caught up — assert read model state


async def test_wait_for_all(
    event_store: InMemoryEventStore,
    checkpoint_store: InMemoryCheckpointStore,
    projection_registry: CatchUpProjectionRegistry,
) -> None:
    # ... append events via command handler ...

    await wait_for_all_projections(
        checkpoint_store=checkpoint_store,
        event_reader=event_store,
        projection_registry=projection_registry,
        deadline=10.0,
    )

    # All projections caught up — assert read model state

wait_for_projection() polls the checkpoint store until the projection's checkpoint reaches the event store's global head position. If the projection does not catch up within the deadline, a TimeoutError is raised.
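
The polling behavior described above can be sketched in a few lines of asyncio. This is an illustration of the technique only, not waku's implementation: FakeCheckpoints and wait_until_caught_up are hypothetical names, and the head position is passed in directly rather than read from an event store.

```python
import asyncio
import time


class FakeCheckpoints:
    """Hypothetical in-memory checkpoint holder used only for this sketch."""

    def __init__(self) -> None:
        self.positions: dict[str, int] = {}


async def wait_until_caught_up(
    checkpoints: FakeCheckpoints,
    head_position: int,
    projection_name: str,
    deadline: float = 5.0,
    poll_interval: float = 0.1,
) -> None:
    """Poll until the projection's checkpoint reaches head_position."""
    give_up_at = time.monotonic() + deadline
    while checkpoints.positions.get(projection_name, -1) < head_position:
        if time.monotonic() >= give_up_at:
            raise TimeoutError(f'{projection_name} did not catch up in {deadline}s')
        await asyncio.sleep(poll_interval)


async def demo() -> tuple[int, bool]:
    checkpoints = FakeCheckpoints()

    async def background_projection() -> None:
        # Simulate a projection advancing its checkpoint one event at a time.
        for position in range(3):
            await asyncio.sleep(0.01)
            checkpoints.positions['account_summary'] = position

    task = asyncio.create_task(background_projection())
    await wait_until_caught_up(
        checkpoints, 2, 'account_summary', deadline=1.0, poll_interval=0.005
    )
    await task

    # A head position the projection never reaches triggers the timeout path.
    timed_out = False
    try:
        await wait_until_caught_up(
            checkpoints, 99, 'account_summary', deadline=0.05, poll_interval=0.01
        )
    except TimeoutError:
        timed_out = True
    return checkpoints.positions['account_summary'], timed_out


final_position, timed_out = asyncio.run(demo())
```

Waiting on the checkpoint rather than sleeping for a fixed time means the test proceeds as soon as the projection is caught up and fails loudly if it never is.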

wait_for_all_projections() does the same for every binding in a CatchUpProjectionRegistry (default deadline=10.0).

wait_for_projection() parameters:

| Parameter | Default | Description |
| --- | --- | --- |
| checkpoint_store | (required) | ICheckpointStore to read checkpoints from |
| event_reader | (required) | IEventReader to determine the global head position |
| projection_name | (required) | Name of the projection to wait for |
| deadline | 5.0 | Maximum seconds to wait |
| poll_interval | 0.1 | Seconds between polls |

Further reading

  • Testing — core waku testing utilities and provider overrides