Transactions

TransactionalBehavior is a pipeline behavior that wraps message handling in a unit-of-work commit/rollback cycle. It commits on success and rolls back on any failure — including failures during the commit itself.


IUnitOfWork Protocol

IUnitOfWork is a two-method protocol that lives at the top level (waku.uow), not inside messaging — it's a general infrastructure concern usable by any layer.

from waku.uow import IUnitOfWork

Its definition, shown here for reference:

from typing import Protocol


class IUnitOfWork(Protocol):
    async def commit(self) -> None: ...
    async def rollback(self) -> None: ...

The protocol is defined by waku — you only need to provide an implementation. Any class that satisfies it can serve as the unit of work for TransactionalBehavior.
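Because IUnitOfWork is a protocol, an implementation does not have to inherit from it; matching method signatures are enough. The sketch below illustrates this with a hypothetical in-memory RecordingUnitOfWork, which may be useful as a test double. To stay runnable without waku installed, it declares a local stand-in for the protocol. The stand-in is marked runtime_checkable only so the isinstance check works; whether waku's own protocol is runtime-checkable is an assumption this example does not rely on.

```python
import asyncio
from typing import Protocol, runtime_checkable


@runtime_checkable
class IUnitOfWork(Protocol):
    """Local stand-in mirroring the two-method protocol (not imported from waku)."""

    async def commit(self) -> None: ...
    async def rollback(self) -> None: ...


class RecordingUnitOfWork:
    """Hypothetical in-memory unit of work; note it does not subclass IUnitOfWork."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")


async def demo() -> list[str]:
    recorder = RecordingUnitOfWork()
    uow: IUnitOfWork = recorder  # accepted structurally by type checkers
    await uow.commit()
    await uow.rollback()
    return recorder.calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In a real application you would import IUnitOfWork from waku.uow instead of redeclaring it.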


SQLAlchemy Adapter

SqlAlchemyUnitOfWork wraps an AsyncSession and delegates commit() / rollback() to it:

from waku.messaging.sqla.uow import SqlAlchemyUnitOfWork

Register it in your infrastructure module, mapping the implementation to IUnitOfWork:

from waku import module
from waku.di import scoped
from waku.uow import IUnitOfWork
from waku.messaging.sqla.uow import SqlAlchemyUnitOfWork


@module(
    providers=[
        scoped(IUnitOfWork, SqlAlchemyUnitOfWork),
    ],
)
class InfraModule: ...

SqlAlchemyUnitOfWork receives the AsyncSession via dependency injection, so make sure you have a session provider registered in one of your modules.
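The delegation described in this section amounts to very little code. The following is a minimal sketch of such an adapter, not waku's actual source: DelegatingUnitOfWork, SessionLike, and StubSession are hypothetical names, and the stub stands in for an AsyncSession so the example runs without SQLAlchemy or a database.

```python
import asyncio
from typing import Protocol


class SessionLike(Protocol):
    """The two async methods of AsyncSession that the adapter relies on."""

    async def commit(self) -> None: ...
    async def rollback(self) -> None: ...


class DelegatingUnitOfWork:
    """Hypothetical adapter: forwards commit/rollback to the wrapped session."""

    def __init__(self, session: SessionLike) -> None:
        self._session = session

    async def commit(self) -> None:
        await self._session.commit()

    async def rollback(self) -> None:
        await self._session.rollback()


class StubSession:
    """Stand-in for an AsyncSession that only records which calls were made."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")


async def demo() -> list[str]:
    session = StubSession()
    uow = DelegatingUnitOfWork(session)
    await uow.commit()
    await uow.rollback()
    return session.calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Keeping the adapter this thin means the session's lifetime and transaction settings stay under the control of whatever provider creates the session.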


TransactionalBehavior

TransactionalBehavior follows a strict commit/rollback sequence:

  1. Call call_next() (the handler, plus any remaining behaviors).
  2. On success: uow.commit().
  3. On handler exception: uow.rollback(), re-raise.
  4. On commit exception: uow.rollback(), re-raise.
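The four steps above can be sketched as a plain async function. This is an illustration of the sequence under stated assumptions, not the library's implementation: run_in_unit_of_work and FakeUow are hypothetical names, and catching Exception (rather than BaseException) is a choice made for this sketch.

```python
import asyncio
from typing import Awaitable, Callable, Protocol, TypeVar

T = TypeVar("T")


class UnitOfWork(Protocol):
    async def commit(self) -> None: ...
    async def rollback(self) -> None: ...


async def run_in_unit_of_work(
    uow: UnitOfWork,
    call_next: Callable[[], Awaitable[T]],
) -> T:
    try:
        result = await call_next()  # step 1: handler and remaining behaviors
        await uow.commit()          # step 2: commit on success
    except Exception:
        # steps 3 and 4: a handler failure or a commit failure both roll back
        await uow.rollback()
        raise
    return result


class FakeUow:
    """Hypothetical test double that records calls and can fail on commit."""

    def __init__(self, fail_on_commit: bool = False) -> None:
        self.fail_on_commit = fail_on_commit
        self.calls: list[str] = []

    async def commit(self) -> None:
        self.calls.append("commit")
        if self.fail_on_commit:
            raise RuntimeError("commit failed")

    async def rollback(self) -> None:
        self.calls.append("rollback")


async def ok_handler() -> int:
    return 42


async def failing_handler() -> int:
    raise ValueError("handler failed")
```

With this sketch, a successful handler records only a commit, a failing handler records only a rollback, and a failing commit records a commit followed by a rollback before the error propagates.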

Register it as a global pipeline behavior:

from waku.messaging import MessagingConfig, MessagingModule
from waku.messaging.behaviors.transactional import TransactionalBehavior

MessagingModule.register(
    MessagingConfig(
        pipeline_behaviors=[TransactionalBehavior],
    ),
)

Warning

When registered globally, TransactionalBehavior applies to every message flowing through the bus — including read-only queries. This means every query opens and commits a transaction, even when there are no writes.

Tip

Use per-request behaviors if you only want transactions on write commands:

from waku import module
from waku.messaging import MessagingExtension
from waku.messaging.behaviors.transactional import TransactionalBehavior

# CreateOrderCommand and CreateOrderCommandHandler are defined in your application code.


@module(
    extensions=[
        MessagingExtension().bind_request(
            CreateOrderCommand,
            CreateOrderCommandHandler,
            behaviors=[TransactionalBehavior],
        ),
    ],
)
class OrderModule: ...

Wiring Example

A complete setup with the unit of work and TransactionalBehavior. The AsyncSession provider that SqlAlchemyUnitOfWork depends on is not shown and must be registered separately:

from waku import module
from waku.di import scoped
from waku.uow import IUnitOfWork
from waku.messaging import MessagingConfig, MessagingModule
from waku.messaging.sqla.uow import SqlAlchemyUnitOfWork
from waku.messaging.behaviors.transactional import TransactionalBehavior


@module(
    providers=[
        scoped(IUnitOfWork, SqlAlchemyUnitOfWork),
    ],
)
class InfraModule: ...


@module(
    imports=[
        InfraModule,
        MessagingModule.register(
            MessagingConfig(
                pipeline_behaviors=[TransactionalBehavior],
            ),
        ),
    ],
)
class AppModule: ...

Custom UoW

Implement IUnitOfWork for any backend — the protocol requires only commit and rollback:

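from motor.motor_asyncio import AsyncIOMotorClientSession
from waku.uow import IUnitOfWork


class MongoUnitOfWork(IUnitOfWork):
    """Unit of work backed by a Motor client session."""

    def __init__(self, session: AsyncIOMotorClientSession) -> None:
        # Assumes a transaction has already been started on this session.
        self._session = session

    async def commit(self) -> None:
        await self._session.commit_transaction()

    async def rollback(self) -> None:
        await self._session.abort_transaction()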

Register it the same way as the SQLAlchemy adapter:

scoped(IUnitOfWork, MongoUnitOfWork)

Further reading