DDD, CQRS and Distributed Systems in Python
This article is based on a sample application called Satoshi Spark 🚀 🔥
This is the GitHub repository for the full project: Satoshi Spark Source Code
Table of Contents
- Introduction
- Domain-Driven Design (DDD)
- Command Query Responsibility Segregation (CQRS)
- Distributed Systems
Introduction
Satoshi Spark is designed to facilitate crowdfunding campaigns via the Bitcoin Lightning Network.
This little project is a playground for exploring Domain-Driven Design (DDD), Command Query Responsibility Segregation (CQRS), and distributed systems concepts.
Domain-Driven Design (DDD)
The project follows DDD principles to ensure a clear and maintainable codebase.
Satoshi Spark is organized into several bounded contexts, each representing a distinct part of the domain:
- Common: Contains the abstract base definitions for aggregates, ports, and adapters.
- Auth: Handles authentication and user management.
- Accounting: Manages account balances and transactions.
- Crowdfunding: Manages crowdfunding campaigns and donations.
- Bitcoin: Manages Bitcoin Lightning Network-related operations, such as invoices.
- Dashboard: Provides a read-only context where users can view their activities.
Each bounded context encapsulates its own domain logic and interacts with other contexts only through messages.
Aggregates: the building blocks of our domain
In Domain-Driven Design (DDD), aggregates define consistency boundaries in the write domain, ensuring that all changes within them maintain business rules and integrity. Each aggregate consists of closely related entities and has a root entity that controls modifications.
Repository operations only retrieve and persist aggregate roots.
from abc import ABC

class Aggregate(ABC):
    def __init__(self, entity_id: str) -> None:
        self.__entity_id = entity_id

    @property
    def entity_id(self) -> str:
        return self.__entity_id

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Aggregate):
            return False
        return self.entity_id == other.entity_id
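To make the identity semantics concrete, here is a tiny illustration (the Wallet class is hypothetical, not part of Satoshi Spark): two instances with the same entity_id are the same aggregate, no matter what their other attributes say.

# Hypothetical aggregate, for illustration only
class Wallet(Aggregate):
    def __init__(self, entity_id: str, balance: int) -> None:
        super().__init__(entity_id)
        self.balance = balance

# Same identity means same aggregate, even with different state
assert Wallet("wallet-1", balance=100) == Wallet("wallet-1", balance=500)
assert Wallet("wallet-1", balance=100) != Wallet("wallet-2", balance=100)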
Abstract Repositories: avoid the ORM jail
Repositories are used to abstract the persistence layer from the domain. Here (in DDD-land), we don’t want to couple the domain with a specific database or ORM implementation.
Example: Coupling our domain with some specific ORM or DB implementation
import orm_library

class CampaignAggregate(orm_library.ORMClass):
    entity_id = orm_library.ORMColumn(type=orm_library.types.String)
    goal = orm_library.ORMColumn(type=orm_library.types.Integer)
    ...
Can you see the ‘problem’ with this code?
What if we want to change the ORM library? How can we write unit tests without being at the mercy of the ORM library?
The solution? Reverse the dependencies.
The domain will define repository interfaces (ports), and later we can define specific implementations (adapters) for the ORM library or database we choose.
Example: Abstract Repository
from abc import ABC, abstractmethod

# See how our new aggregate is not coupled with any specific ORM or DB implementation
class Campaign(Aggregate):
    def __init__(self, entity_id: str, goal: int, ...) -> None:
        super().__init__(entity_id)
        self.goal = goal
        ...

# Generic abstract repository class (no support for DB transactions yet, see next topic for it)
class Repository[T: Aggregate](ABC):
    @abstractmethod
    async def find_by_id(self, entity_id: str) -> T | None:
        pass

    @abstractmethod
    async def add(self, entity: T) -> None:
        pass

    @abstractmethod
    async def update(self, entity: T) -> None:
        pass

class CampaignRepository(Repository[Campaign], ABC):
    pass
I suspect you can already guess the overall idea of how to implement a specific repository for a Postgres database, right? 😉
If not, don’t worry! There are plenty of examples in the codebase.
You can also submit a PR to add more examples for different libraries or engines!
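If you want a head start, here is a minimal sketch of a Postgres adapter for the interface above, assuming asyncpg and a hypothetical campaigns table (the project’s real adapters differ, and also integrate with the Unit of Work described next):

import asyncpg

# Minimal sketch of a Postgres adapter; table and columns are assumptions
class PostgresCampaignRepository(CampaignRepository):
    def __init__(self, conn: asyncpg.Connection) -> None:
        self.__conn = conn

    async def find_by_id(self, entity_id: str) -> Campaign | None:
        row = await self.__conn.fetchrow(
            "SELECT entity_id, goal FROM campaigns WHERE entity_id = $1", entity_id
        )
        if row is None:
            return None
        return Campaign(entity_id=row["entity_id"], goal=row["goal"])

    async def add(self, entity: Campaign) -> None:
        await self.__conn.execute(
            "INSERT INTO campaigns (entity_id, goal) VALUES ($1, $2)",
            entity.entity_id,
            entity.goal,
        )

    async def update(self, entity: Campaign) -> None:
        await self.__conn.execute(
            "UPDATE campaigns SET goal = $2 WHERE entity_id = $1",
            entity.entity_id,
            entity.goal,
        )

Notice how the domain never sees asyncpg: it only knows the CampaignRepository port, so swapping the database means swapping this adapter.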
Unit of Work: how to handle database transactions in the domain model
If you’ve been paying close attention, you’ll notice that we haven’t talked about transactions at all.
Creating database transactions is trivial, but how can we ensure the domain does not become polluted with database-specific code?
The answer is… Units of Work!
The Unit of Work pattern is used to manage transactions and ensure data consistency. It tracks changes to aggregates and commits them as a single transaction. This pattern represents an abstraction over any database transaction (SQL or not), decoupling the domain from particular database implementations.
Example: UOW in action
async def register_campaign_donation(
    event: TransferSucceededEvent,
) -> None:
    # Entering this context manager creates a uow and starts a transaction
    async with uow_factory() as uow:
        campaign_id: str = event.metadata["campaign_id"]
        campaign: Campaign | None = await campaign_repository(uow).find_by_id(
            campaign_id
        )
        assert campaign is not None
        # Notice how we don't have to explicitly call any persistence method on the aggregate,
        # as the UOW will take care of that for us
        campaign.donate(
            Donation(
                idempotency_key=event.idempotency_key,
                amount=event.amount,
                account_id=event.from_account_id,
            )
        )
    # Leaving the context manager will:
    # * persist the aggregate changes
    # * store the messages in the transactional outbox
    # * commit the transaction
Example: Abstract Unit of Work
from abc import ABC, abstractmethod
from collections.abc import Callable

# Abstract unit of work (UOW)
class UnitOfWork(ABC):
    def __init__(self) -> None:
        self._messages: list[Message] = []  # The domain emits events to the UOW
        self._tracked_objects: list[tuple[object, Callable]] = []  # The UOW keeps track of aggregates to persist their changes

    def emit(self, message: Message) -> None:
        self._messages.append(message)

    # Repositories register aggregates here, together with a persistence callback
    def track_object(self, obj: object, persistence_callback: Callable) -> None:
        self._tracked_objects.append((obj, persistence_callback))

    # Before commit, the UOW calls the persistence callback of each tracked object
    # and stores the messages in a transactional outbox (more on this later)
    async def commit(self) -> None:
        for obj, persistence_callback in self._tracked_objects:
            await persistence_callback()
        await outbox(self).store(self._messages)
        await self._commit()

    async def rollback(self) -> None:
        self._messages.clear()
        self._tracked_objects.clear()
        await self._rollback()

    @abstractmethod
    async def _commit(self) -> None:
        pass

    @abstractmethod
    async def _rollback(self) -> None:
        pass
Example: Abstract Repository implementation
# Generic abstract repository class
class Repository[T: Aggregate](ABC):
    # We initialize the repository with a UOW
    def __init__(self, uow: UnitOfWork) -> None:
        self._uow = uow

    def __track_object(self, obj: T) -> None:
        self._uow.track_object(obj, lambda: self._update(obj))

    # Every time we add or find an object, we track it so it is persisted on-commit
    async def add(self, entity: T) -> None:
        await self._add(entity)
        self.__track_object(entity)

    async def find_by_id(self, entity_id: str) -> T | None:
        obj = await self._find_by_id(entity_id)
        if obj is not None:
            self.__track_object(obj)
        return obj

    @abstractmethod
    async def _find_by_id(self, entity_id: str) -> T | None:
        pass

    @abstractmethod
    async def _add(self, entity: T) -> None:
        pass

    @abstractmethod
    async def _update(self, entity: T) -> None:
        pass
Example: Postgres Unit Of Work implementation
import contextlib
from collections.abc import AsyncGenerator

import asyncpg
from asyncpg.transaction import Transaction

# Postgres specific implementation
class PostgresUnitOfWork(UnitOfWork):
    def __init__(self, conn: asyncpg.Connection, transaction: Transaction) -> None:
        super().__init__()
        self.__conn = conn
        self.__transaction = transaction

    @property
    def conn(self) -> asyncpg.Connection:
        return self.__conn

    async def _commit(self) -> None:
        await self.__transaction.commit()

    async def _rollback(self) -> None:
        await self.__transaction.rollback()

# Postgres UOW context manager factory
@contextlib.asynccontextmanager
async def make_postgres_unit_of_work() -> AsyncGenerator[PostgresUnitOfWork, None]:
    async with postgres_pool.get_pool().acquire() as conn:
        transaction = conn.transaction(isolation="repeatable_read")
        await transaction.start()
        uow = PostgresUnitOfWork(conn, transaction)
        try:
            yield uow
            await uow.commit()
        except BaseException:
            await uow.rollback()
            raise
I hope this covers the ‘see next topic’ comment from the earlier repository example 😅.
Command Query Responsibility Segregation (CQRS)
CQRS is used to separate the read and write operations of the application. Commands are used to change the state of the system, while queries are used to retrieve data.
Write model
Commands are written as imperative verbs (do something…), while events are written as past-tense verbs (something happened…).
Example: Command and Event
# Crowdfunding context example command:
@dataclass(frozen=True)
class CreateCampaign(Command):
    entity_id: str
    account_id: str
    title: str
    description: str
    goal: int

# Accounting context example event:
@dataclass(frozen=True)
class TransferSucceededEvent(Event):
    idempotency_key: str
    from_account_id: str
    to_account_id: str
    amount: int
    metadata: dict
The write model consists of the aggregates, repositories, units of work, and messages used to change the state of the system. Here, consistency and integrity are the main concerns. Performance, although always relevant, is not the primary focus.
Example: Write model command handler
async def handle_register(
    command: RegisterAccount,
) -> None:
    async with make_unit_of_work() as uow:
        account = Account(
            account_id=command.account_id,
            username=command.username.lower(),
            password=command.hashed_password,
        )
        await account_repository(uow).add(account)
        uow.emit(SignupEvent(account_id=account.account_id))
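Commands and events reach handlers like this one through an in-process message bus (the event_bus.handle call in the outbox processor below is the same mechanism). The project’s actual bus is not reproduced here; a minimal sketch of the idea, with hypothetical names, could look like this:

from collections import defaultdict
from collections.abc import Awaitable, Callable

# Hypothetical minimal in-process message bus
class MessageBus:
    def __init__(self) -> None:
        self.__handlers: dict[type, list[Callable[..., Awaitable[None]]]] = defaultdict(list)

    def subscribe(self, message_type: type, handler: Callable[..., Awaitable[None]]) -> None:
        self.__handlers[message_type].append(handler)

    async def handle(self, message: Message) -> None:
        # Fan the message out to every handler registered for its type
        for handler in self.__handlers[type(message)]:
            await handler(message)

event_bus = MessageBus()
event_bus.subscribe(RegisterAccount, handle_register)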
Read model
The read model consists of the queries and views used to retrieve data from the system. These operations do not affect the state of the system and should be optimized for speed.
This model can implement caches, projections, and other tricks to improve performance. It might not always map one-to-one onto the write model, but that should not matter.
Example: Read model query handler
@dataclass(frozen=True)
class DashboardView:
    account_id: str
    balance: int
    campaigns: list[CampaignView]
    ...

async def view_dashboard_query(account_id: str) -> DashboardView:
    return await dashboard_view_factory().create_dashboard_view(account_id=account_id)

class DashboardViewFactory(ABC):
    @abstractmethod
    async def create_dashboard_view(self, account_id: str) -> DashboardView:
        pass

class PostgresDashboardViewFactory(DashboardViewFactory):
    async def create_dashboard_view(self, account_id: str) -> DashboardView:
        async with postgres_pool.get_pool().acquire() as conn:
            row = await conn.fetchrow("SQL Query...")
            assert row
            return DashboardView(**row)
Distributed Systems
Even though this application is a monolith, the contexts never communicate with each other directly. Instead, they communicate through messages.
This comes with some considerations:
- How can we ensure that messages are delivered reliably? 💀
- How can we ensure that messages are not lost? 💀
- How can we handle inter-context database transactions? 💀
If only someone could come up with a solution for these problems… 🤔
Transactional Outboxes: reliably sending messages
Transactional Outboxes ensure that messages are reliably sent even if the system crashes. They store messages in a database table and process them asynchronously.
Consider this scenario:
- We start a database transaction
- We do changes to the domain write model
- We commit the database transaction
- We emit messages to a messaging engine (think Kafka or similar)
What happens if our application crashes after step 3? The transaction is saved, but the messages are lost. Important business processes might not be triggered, and the system might end up in an inconsistent state.
Now consider the reverse ordering:
- We start a database transaction
- We do changes to the domain write model
- We emit messages to the messaging engine (again, Kafka or similar)
- We commit the database transaction
What happens if our application crashes after step 3? The messages are sent, but the transaction is never committed. This is also very bad: imagine that we send a ‘payment processed’ event, but the DB never saved the payment.
A solution for this is to store the messages in a database table, in the same transaction as the domain changes. Then, after commit, a processor periodically polls this table and sends the messages to the messaging engine. After sending the messages, they can be deleted.
- We start a database transaction (i.e. postgres)
- We do changes to the domain write model
- We store the events in a postgres database table
- We commit the database transaction
- After the transaction, a processor periodically polls the table and sends the messages to the messaging engine
- After dispatch, we delete the messages
Now, if our application crashes after step 3, there should be no issues! Either the transaction commits and the messages are sent, or the transaction rolls back and the messages are never sent.
What happens if the processor crashes after step 5, between sending the messages and deleting them? As you can imagine, the messages will be sent twice. This is called ‘at least once’ delivery and is what most messaging engines implement. This is why it is important to make sure that the message handlers are idempotent.
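To make that last point concrete, here is one common way to make a handler idempotent, sketched with hypothetical names (processed_messages_repository is not from the project): record each processed idempotency key inside the same transaction as the domain change, and skip messages already seen.

async def register_campaign_donation_idempotent(event: TransferSucceededEvent) -> None:
    async with uow_factory() as uow:
        # Hypothetical helper: has this idempotency key been processed already?
        if await processed_messages_repository(uow).exists(event.idempotency_key):
            return  # Duplicate delivery, safe to ignore
        await processed_messages_repository(uow).add(event.idempotency_key)
        ...  # Apply the domain change as usual, in the same transaction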
Example: Transactional Outbox and Outbox processor
# This class stores the messages in the database when the uow commits
class TransactionalOutbox(ABC):
    def __init__(self, uow: UnitOfWork) -> None:
        self.__uow = uow

    @abstractmethod
    async def store(self, messages: list[Message]) -> None:
        pass

# This class is supposed to run separately, in a periodic task or even a different process.
# It iterates over the stored messages, dispatches them, and then deletes them
class TransactionalOutboxProcessor(ABC):
    async def process_messages(self) -> None:
        messages = await self._fetch_messages()
        await self._dispatch_messages(messages)
        await self._destroy_messages(messages)

    @abstractmethod
    async def _fetch_messages(self) -> list[Message]:
        pass

    @abstractmethod
    async def _dispatch_messages(self, messages: list[Message]) -> None:
        pass

    @abstractmethod
    async def _destroy_messages(self, messages: list[Message]) -> None:
        pass
Example: Postgres implementation of a transactional outbox
class PostgresTransactionalOutbox(TransactionalOutbox):
    def __init__(self, uow: PostgresUnitOfWork) -> None:
        super().__init__(uow)
        self.uow = uow

    async def store(self, messages: list[Message]) -> None:
        if not messages:
            return
        await self.uow.conn.executemany("sql query...")

class PostgresTransactionalOutboxProcessor(TransactionalOutboxProcessor):
    async def _fetch_messages(self) -> list[Message]:
        async with postgres_pool.get_pool().acquire() as conn:
            rows = await conn.fetch("sql query...")
            return [self.__row_to_message(row) for row in rows]

    async def _dispatch_messages(self, messages: list[Message]) -> None:
        results = await asyncio.gather(
            *[event_bus.handle(message) for message in messages],
            return_exceptions=True,
        )
        exceptions = [result for result in results if isinstance(result, Exception)]
        if exceptions:
            for exc in exceptions:
                logger.error(f"Error processing message: {exc}")
            # Raising here skips _destroy_messages, so the batch stays in the
            # outbox and is retried on the next poll (at-least-once delivery)
            raise ExceptionGroup("Failed to dispatch outbox messages", exceptions)

    async def _destroy_messages(self, messages: list[Message]) -> None:
        async with postgres_pool.get_pool().acquire() as conn:
            await conn.execute("sql query...")

    def __row_to_message(self, row: dict) -> Message:
        ...
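As noted in the comments, the processor runs outside the request path. A minimal sketch of the periodic polling task (the entry point name and the one-second interval are assumptions, not the project’s actual values):

import asyncio

# Hypothetical entry point for the outbox polling task
async def run_outbox_processor() -> None:
    processor = PostgresTransactionalOutboxProcessor()
    while True:
        try:
            await processor.process_messages()
        except Exception:
            logger.exception("Outbox batch failed, will retry on next poll")
        await asyncio.sleep(1)  # Poll interval: tune as needed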
Choreography Based Sagas: distributed transactions across contexts
Here, we’ll see how to handle distributed transactions across contexts using a choreography-based saga.
Basically, instead of having one ACID transaction, we have multiple smaller transactions that are coordinated by messages.
We’ll use this example: a user requests a withdrawal to a Bitcoin wallet.
Example: Bitcoin withdraw saga, happy path
- First transaction, bitcoin context (saga starting point):
  - User requests a withdrawal
  - Bitcoin context creates an invoice with a pending status
  - Bitcoin context emits a WithdrawRequestedEvent (The transactional outbox ensures this event is sent)
- Second transaction, accounting context:
  - The system verifies the user’s balance
  - The system accepts the withdrawal
  - The system updates the account’s balance and stores the financial operation
  - The system emits a WithdrawalAcceptedEvent (The transactional outbox ensures this event is sent)
- Third transaction, bitcoin context:
  - The bitcoin processor pays the invoice
  - The bitcoin context updates the invoice status to paid
Example: Bitcoin withdraw saga, insufficient balance
- First transaction, bitcoin context (saga starting point):
  - User requests a withdrawal for an amount above their balance (this can happen if the read model the user sees is outdated)
  - Bitcoin context creates an invoice with a pending status
  - Bitcoin context emits a WithdrawRequestedEvent (The transactional outbox ensures this event is sent)
- Second transaction, accounting context:
  - The system verifies the user’s balance
  - The system rejects the withdrawal
  - The system emits a WithdrawalRejectedEvent (The transactional outbox ensures this event is sent)
- Third transaction, bitcoin context:
  - The bitcoin context rejects or deletes the invoice
Idempotency is crucial in all steps, as the system might crash at any point and messages must be safe to process multiple times.
If the system crashes at any step, the saga is simply retried until it completes successfully.
You can see how, even though there is no single big ACID transaction, the system still ends up in a consistent state.
This concept is called eventual consistency.
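To tie the saga back to the earlier building blocks, here is a sketch of what the second (accounting) transaction’s event handler might look like. The event fields, account_repository, and Account.withdraw are assumptions about the project’s shape, not its actual code:

# Sketch of the accounting-context step of the withdrawal saga
async def handle_withdraw_requested(event: WithdrawRequestedEvent) -> None:
    async with uow_factory() as uow:
        account = await account_repository(uow).find_by_id(event.account_id)
        assert account is not None
        if account.balance < event.amount:
            # Compensating path: the bitcoin context will reject the invoice
            uow.emit(
                WithdrawalRejectedEvent(
                    idempotency_key=event.idempotency_key,
                    account_id=event.account_id,
                    amount=event.amount,
                )
            )
            return
        account.withdraw(event.amount)  # Update balance, store the operation
        uow.emit(
            WithdrawalAcceptedEvent(
                idempotency_key=event.idempotency_key,
                account_id=event.account_id,
                amount=event.amount,
            )
        )

Whichever branch runs, the emitted event goes through the transactional outbox in the same transaction, so the next saga step is guaranteed to be triggered eventually.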
From monolith to microservices
The nice thing about this architecture is that it is very easy to split into microservices, as the bounded contexts are independent.
We can:
- Create one service, repository, CI/CD flow, etc. per bounded context.
- Move from a single database per project to a database (or schema) per service pattern.
- Replace the in-process message bus with a real message broker (like Kafka or RabbitMQ).
- Deploy the applications separately.
But of course, there are many more considerations when moving to a microservice architecture.
I’ll leave this migration as an exercise to the reader 😳.