Domain-Centric Pitfalls — Consistency-Cluster Boundaries, Double-Entry Ledger, and the Notification Pipeline

The previous post covered the concepts of DDD and how they compare to .NET MVC. This post documents the pitfalls I encountered in the actual project: how to draw consistency-cluster boundaries, how to design a double-entry ledger, and how the notification pipeline enables cross-module communication.

The Cost of Drawing Consistency-Cluster Boundaries Wrong

The rules for an Aggregate (what this post calls a consistency cluster) sound straightforward: a group of objects that must always be accessed together, with a root entity as the single entry point. Actually drawing the boundary, though, is easy to get wrong.

Post and Its Comments

Post and Comment were the first consistency-cluster design I tackled:

class Post:
    id: UUID
    author_id: UUID
    author_type: AuthorType
    body: str
    comments: list[Comment]
    hashtags: list[Hashtag]

    def add_comment(self, author_id: UUID, author_type: AuthorType, body: str) -> Comment:
        comment = Comment(
            id=uuid4(), post_id=self.id,
            author_id=author_id, author_type=author_type, body=body,
        )
        self.comments.append(comment)
        return comment

    def remove_comment(self, comment_id: UUID, author_id: UUID, author_type: AuthorType):
        comment = self._find_comment(comment_id)
        comment.verify_authorship(author_id, author_type)
        self.comments.remove(comment)

Key points:

  • Outside code cannot operate on Comment directly — it must go through Post
  • Adding a comment: post.add_comment(), not comment_repo.create()
  • Removing a comment: post.remove_comment(), where Comment itself checks authorship, but the removal action is controlled by Post
  • The entire cluster is loaded and saved together

Comment has its own verify_authorship() method to answer “are you the author of this comment?” But only Post can perform the “remove comment” action; Comment cannot remove itself from the list.
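To run the cluster in isolation, here is a sketch of Post and Comment with the missing pieces filled in. The AuthorType members, the PermissionError raised by verify_authorship(), and the body of _find_comment() are assumptions (hashtags are omitted); only the add/remove flow comes from the post.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum
from uuid import UUID, uuid4


class AuthorType(Enum):
    # Hypothetical author kinds; the real enum is not shown in the post
    USER = "user"
    BRAND = "brand"


@dataclass
class Comment:
    id: UUID
    post_id: UUID
    author_id: UUID
    author_type: AuthorType
    body: str

    def verify_authorship(self, author_id: UUID, author_type: AuthorType) -> None:
        # The comment knows who wrote it, so the authorship check lives here
        if (self.author_id, self.author_type) != (author_id, author_type):
            raise PermissionError("Only the author can remove this comment")


@dataclass
class Post:
    id: UUID
    author_id: UUID
    author_type: AuthorType
    body: str
    comments: list[Comment] = field(default_factory=list)

    def add_comment(self, author_id: UUID, author_type: AuthorType, body: str) -> Comment:
        comment = Comment(
            id=uuid4(), post_id=self.id,
            author_id=author_id, author_type=author_type, body=body,
        )
        self.comments.append(comment)
        return comment

    def remove_comment(self, comment_id: UUID, author_id: UUID, author_type: AuthorType) -> None:
        # The root controls removal; the comment only verifies authorship
        comment = self._find_comment(comment_id)
        comment.verify_authorship(author_id, author_type)
        self.comments.remove(comment)

    def _find_comment(self, comment_id: UUID) -> Comment:
        # Hypothetical helper: linear search inside the already-loaded cluster
        for comment in self.comments:
            if comment.id == comment_id:
                return comment
        raise LookupError(f"Comment {comment_id} not found on post {self.id}")


post = Post(id=uuid4(), author_id=uuid4(), author_type=AuthorType.BRAND, body="Launch day")
alice = uuid4()
comment = post.add_comment(alice, AuthorType.USER, "Congrats!")
try:
    post.remove_comment(comment.id, uuid4(), AuthorType.USER)  # someone else tries
except PermissionError as exc:
    print(exc)  # Only the author can remove this comment
post.remove_comment(comment.id, alice, AuthorType.USER)
print(len(post.comments))  # 0
```

Outside code never touches post.comments directly; every change goes through the root's methods.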

Exclusive Ownership of a Wallet

Wallet has an invariant I did not think through at first: a wallet must belong to either a user or a brand — not both, and not neither.

from __future__ import annotations

from dataclasses import dataclass
from uuid import UUID, uuid4


@dataclass  # __post_init__ only runs on dataclasses
class Wallet:
    id: UUID
    user_id: UUID | None
    brand_id: UUID | None

    def __post_init__(self):
        # Exactly one owner: reject both-set and neither-set
        has_user = self.user_id is not None
        has_brand = self.brand_id is not None
        if has_user == has_brand:
            raise ValueError("Wallet must belong to exactly one of user or brand")

    @classmethod
    def for_user(cls, user_id: UUID) -> Wallet:
        return cls(id=uuid4(), user_id=user_id, brand_id=None)

    @classmethod
    def for_brand(cls, brand_id: UUID) -> Wallet:
        return cls(id=uuid4(), user_id=None, brand_id=brand_id)

Using factory methods (for_user(), for_brand()) instead of exposing the constructor directly means the caller does not need to know the rule “user_id and brand_id are mutually exclusive” — the API itself makes it impossible to create an invalid wallet.

The Brand State Machine

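Brand has three states: PENDING, ACTIVE, and ARCHIVED. The normal path is PENDING → ACTIVE → ARCHIVED, and reactivate() sends an archived brand back to PENDING. All transition rules live inside the entity: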

@dataclass
class Brand:
    # In a dataclass, fields without defaults must precede fields with defaults
    shopify: ShopifyCredentials
    status: BrandStatus = BrandStatus.PENDING

    def activate(self):
        if self.status == BrandStatus.ARCHIVED:
            raise ValidationError("Cannot activate archived brand")
        if not self.shopify.is_installed:
            raise ValidationError("Shopify must be installed before activation")
        self.status = BrandStatus.ACTIVE

    def archive(self):
        if self.status == BrandStatus.ARCHIVED:
            raise ValidationError("Brand is already archived")
        self.status = BrandStatus.ARCHIVED

    def reactivate(self):
        if self.status != BrandStatus.ARCHIVED:
            raise ValidationError("Only archived brands can be reactivated")
        self.status = BrandStatus.PENDING

At first I put the pre-checks for activate() inside the Service. Later I realized three different places were calling activate, and one of them forgot to check whether Shopify was installed. After moving the check into the entity, every call site is covered no matter what.

Another detail: ShopifyCredentials is a frozen value object:

@dataclass(frozen=True)
class ShopifyCredentials:
    shop_domain: str | None = None
    access_token: str | None = None
    oauth_nonce: str | None = None

    @property
    def is_installed(self) -> bool:
        return bool(self.access_token)

It holds both OAuth-setup-phase fields (oauth_nonce) and post-installation fields (access_token), using the is_installed property to determine the state. Being frozen ensures nobody can directly modify access_token and bypass the installation flow.
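Because the value object is frozen, the only way to "change" a credential is to build a new one. A minimal sketch, assuming a hypothetical with_access_token() helper that the installation flow would call; dataclasses.replace() and FrozenInstanceError are standard library.

```python
from __future__ import annotations

from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class ShopifyCredentials:
    shop_domain: str | None = None
    access_token: str | None = None
    oauth_nonce: str | None = None

    @property
    def is_installed(self) -> bool:
        return bool(self.access_token)

    def with_access_token(self, token: str) -> ShopifyCredentials:
        # Hypothetical helper: "changing" a frozen value means returning a new one
        return replace(self, access_token=token)


pending = ShopifyCredentials(shop_domain="example.myshopify.com", oauth_nonce="nonce-123")
try:
    pending.access_token = "sneaky"  # direct mutation is rejected
except FrozenInstanceError:
    print("blocked")

installed = pending.with_access_token("token-abc")
print(pending.is_installed, installed.is_installed)  # False True
```

The original object is untouched, so any code still holding the pre-installation credentials cannot be surprised by a token appearing underneath it.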

Double-Entry Ledger

The project has a wallet and commission system that requires bookkeeping. At first I considered adding a balance field directly on the wallet and doing += amount for each transaction. But this has several problems:

  1. Concurrency: two transactions doing a read-modify-write += amount at the same time can overwrite each other, so one update is silently lost
  2. No audit trail — how was the balance derived? There is no way to trace it back
  3. Refunds require manual back-calculation, which is error-prone

So we used double-entry bookkeeping: each transaction has a debit side and a credit side. The balance is not stored directly on the wallet — it is computed from the transaction records.

Transaction Entity

@dataclass
class Transaction:
    id: UUID
    debit_wallet_id: UUID
    credit_wallet_id: UUID
    money: Money
    status: TransactionStatus = TransactionStatus.PENDING
    source: SourceReference | None = None

    def __post_init__(self):
        if self.debit_wallet_id == self.credit_wallet_id:
            raise ValueError("Cannot transfer to same wallet")

    def void(self):
        if self.status != TransactionStatus.PENDING:
            raise ValueError("Only pending transactions can be voided")
        self.status = TransactionStatus.VOIDED

Key points:

  • debit_wallet_id ≠ credit_wallet_id: you cannot transfer to yourself
  • Money is a value object containing amount and currency
  • After creation, the only operation on a transaction is void(), and only PENDING transactions can be voided
  • SourceReference tracks where this transaction came from (e.g. a Shopify order)
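The Transaction excerpt depends on types the post does not show. To run it in isolation, here is a sketch that fills them in: TransactionStatus carries only the two states the post mentions, SourceReference uses the source_type/source_id fields that appear in the commission section, and Money is copied from the Money section. The shapes of TransactionStatus and SourceReference are assumptions, not the project's code.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum
from uuid import UUID, uuid4


class TransactionStatus(Enum):
    # Only the states named in the post; the real enum may have more
    PENDING = "pending"
    VOIDED = "voided"


@dataclass(frozen=True)
class Money:
    amount_cents: int
    currency: str = "USD"

    def __post_init__(self):
        if self.amount_cents < 0:
            raise ValueError("Amount cannot be negative")


@dataclass(frozen=True)
class SourceReference:
    # Field names follow SourceReference(source_type=..., source_id=...) in the post
    source_type: str
    source_id: str


@dataclass
class Transaction:
    id: UUID
    debit_wallet_id: UUID
    credit_wallet_id: UUID
    money: Money
    status: TransactionStatus = TransactionStatus.PENDING
    source: SourceReference | None = None

    def __post_init__(self):
        if self.debit_wallet_id == self.credit_wallet_id:
            raise ValueError("Cannot transfer to same wallet")

    def void(self) -> None:
        if self.status != TransactionStatus.PENDING:
            raise ValueError("Only pending transactions can be voided")
        self.status = TransactionStatus.VOIDED


tx = Transaction(
    id=uuid4(),
    debit_wallet_id=uuid4(),   # e.g. the brand wallet
    credit_wallet_id=uuid4(),  # e.g. the user wallet
    money=Money(amount_cents=1500),
    source=SourceReference(source_type="shopify_order", source_id="1001"),
)
tx.void()  # PENDING -> VOIDED; a second void() raises ValueError
print(tx.status)  # TransactionStatus.VOIDED
```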

Money Value Object

@dataclass(frozen=True)
class Money:
    amount_cents: int
    currency: str = "USD"

    def __post_init__(self):
        if self.amount_cents < 0:
            raise ValueError("Amount cannot be negative")

Using cents instead of dollars avoids floating-point precision issues. The amount cannot be negative, and being frozen ensures immutability.
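A quick illustration of the floating-point problem that integer cents avoid. to_cents() is a hypothetical helper that assumes prices arrive as decimal strings and rounds half up; Decimal keeps binary floats out of the conversion entirely.

```python
from decimal import ROUND_HALF_UP, Decimal


def to_cents(price: str) -> int:
    # Hypothetical helper: parse the decimal string exactly, then round to whole cents
    return int((Decimal(price) * 100).quantize(Decimal("1"), rounding=ROUND_HALF_UP))


float_total = 0.1 + 0.2                            # binary floats cannot hold 0.1 or 0.2 exactly
cents_total = to_cents("0.10") + to_cents("0.20")  # integers add exactly

print(float_total, float_total == 0.3)               # 0.30000000000000004 False
print(cents_total, cents_total == to_cents("0.30"))  # 30 True
```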

Balance Is Computed, Not Stored

This is the core idea of double-entry bookkeeping. The Wallet itself has no balance field, and the WalletRepository interface only looks up and creates wallets:

class WalletRepository(Protocol):
    def get_by_user(self, user_id: UUID) -> Wallet | None: ...
    def get_by_brand(self, brand_id: UUID) -> Wallet | None: ...
    def create_for_user(self, user_id: UUID) -> Wallet: ...
    def create_for_brand(self, brand_id: UUID) -> Wallet: ...

The balance is computed at the repository’s infrastructure layer using SQL:

balance = SUM(amount_cents where the wallet is credit_wallet_id) - SUM(amount_cents where the wallet is debit_wallet_id)

Only non-VOIDED transactions are counted. This means:

  • The balance at any point in time can be recomputed from the transaction records
  • Voiding a transaction does not require manually adjusting the balance, because voided transactions are automatically excluded
  • The “balance does not match transaction records” problem cannot exist
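The post does not show the actual query. A minimal sketch of the formula, using Python's built-in sqlite3 as a stand-in for PostgreSQL; the table layout, the status strings, and the balance() helper are assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE transactions (
        id INTEGER PRIMARY KEY,
        debit_wallet_id TEXT NOT NULL,
        credit_wallet_id TEXT NOT NULL,
        amount_cents INTEGER NOT NULL,
        status TEXT NOT NULL DEFAULT 'PENDING'
    )
    """
)

# Credits into the wallet minus debits out of it, ignoring voided rows.
# COALESCE turns "no matching rows" (SUM is NULL) into a zero balance.
BALANCE_SQL = """
    SELECT
        COALESCE(SUM(CASE WHEN credit_wallet_id = :w THEN amount_cents ELSE 0 END), 0)
      - COALESCE(SUM(CASE WHEN debit_wallet_id  = :w THEN amount_cents ELSE 0 END), 0)
    FROM transactions
    WHERE status != 'VOIDED'
      AND (credit_wallet_id = :w OR debit_wallet_id = :w)
"""


def balance(wallet_id: str) -> int:
    (value,) = conn.execute(BALANCE_SQL, {"w": wallet_id}).fetchone()
    return value


conn.executemany(
    "INSERT INTO transactions (debit_wallet_id, credit_wallet_id, amount_cents, status) "
    "VALUES (?, ?, ?, ?)",
    [
        ("brand", "user", 1500, "PENDING"),
        ("brand", "platform", 50, "PENDING"),
        ("user", "brand", 300, "VOIDED"),  # voided: excluded from every balance
    ],
)
print(balance("user"), balance("platform"), balance("brand"))  # 1500 50 -1550
```

The brand wallet comes out negative only because this toy ledger never funds it. Voiding the third row required no balance adjustment anywhere.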

Idempotency

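Shopify orders may be processed multiple times due to webhook retries. The Transaction repository uses PostgreSQL’s INSERT ... ON CONFLICT DO NOTHING together with a partial unique index (PostgreSQL accepts a WHERE clause on a unique index, not on a UNIQUE table constraint):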

UNIQUE (debit_wallet_id, credit_wallet_id, source_type, source_id)
  WHERE source_type IS NOT NULL AND source_id IS NOT NULL

No matter how many times the same Shopify order is delivered, it produces at most one transaction per debit/credit wallet pair. If the record already exists, the existing one is returned instead of raising an error.

The database layer also has corresponding check constraints:

  • CHECK (amount_cents >= 0)
  • CHECK (debit_wallet_id != credit_wallet_id)

Both the domain layer and the database layer perform validation. The domain-level checks catch issues at the application-logic level; the database constraints are the last line of defense.
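A runnable sketch of the idempotent insert, again with sqlite3 standing in for PostgreSQL (SQLite 3.24+ supports both partial unique indexes and ON CONFLICT DO NOTHING). The table and index names and the record() helper are hypothetical. Because DO NOTHING silently skips a duplicate and hands nothing back, returning the existing transaction takes a follow-up lookup by the source key.

```python
from __future__ import annotations

import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE transactions (
        id TEXT PRIMARY KEY,
        debit_wallet_id TEXT NOT NULL,
        credit_wallet_id TEXT NOT NULL,
        amount_cents INTEGER NOT NULL CHECK (amount_cents >= 0),
        source_type TEXT,
        source_id TEXT,
        CHECK (debit_wallet_id != credit_wallet_id)
    );
    -- Partial unique index: rows without a source are never deduplicated
    CREATE UNIQUE INDEX uq_transactions_source
        ON transactions (debit_wallet_id, credit_wallet_id, source_type, source_id)
        WHERE source_type IS NOT NULL AND source_id IS NOT NULL;
    """
)


def record(debit: str, credit: str, amount_cents: int, source_type: str, source_id: str) -> tuple[str, bool]:
    """Insert at most once per (debit, credit, source); return (transaction id, created?)."""
    before = conn.total_changes
    conn.execute(
        """
        INSERT INTO transactions
            (id, debit_wallet_id, credit_wallet_id, amount_cents, source_type, source_id)
        VALUES (?, ?, ?, ?, ?, ?)
        ON CONFLICT DO NOTHING
        """,
        (str(uuid.uuid4()), debit, credit, amount_cents, source_type, source_id),
    )
    created = conn.total_changes > before
    # On a duplicate nothing was inserted, so fetch the existing row by its source key
    (tx_id,) = conn.execute(
        """
        SELECT id FROM transactions
        WHERE debit_wallet_id = ? AND credit_wallet_id = ?
          AND source_type = ? AND source_id = ?
        """,
        (debit, credit, source_type, source_id),
    ).fetchone()
    return tx_id, created


first_id, first_created = record("brand-w", "user-w", 1500, "shopify_order", "1001")
retry_id, retry_created = record("brand-w", "user-w", 1500, "shopify_order", "1001")  # webhook retry
print(first_created, retry_created, first_id == retry_id)  # True False True
```

ON CONFLICT only covers uniqueness, so a self-transfer or a negative amount still fails its CHECK constraint with sqlite3.IntegrityError.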

Commission Calculation: The Actual Bookkeeping Flow

AttributionService is responsible for calculating commission from Shopify orders and recording it in the ledger:

  1. Fetch the order from the Shopify API
  2. Calculate the user commission (e.g. 15%)
  3. Record a brand_wallet → user_wallet transaction
  4. Calculate the platform fee (e.g. 0.5%)
  5. Record a brand_wallet → platform_wallet transaction

Each transaction carries SourceReference(source_type="shopify_order", source_id=order_id), so duplicate processing does not result in duplicate entries.
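A sketch of steps 2 to 5, assuming rates are stored as integer basis points and that fractional cents are rounded down; the post does not say which base amount or rounding rule the project uses. PlannedTransaction, share_of(), and plan_commission() are hypothetical names.

```python
from __future__ import annotations

from dataclasses import dataclass

USER_COMMISSION_BPS = 1_500  # 15% in basis points (hypothetical constant)
PLATFORM_FEE_BPS = 50        # 0.5% in basis points (hypothetical constant)


@dataclass(frozen=True)
class SourceReference:
    source_type: str
    source_id: str


@dataclass(frozen=True)
class PlannedTransaction:
    # Hypothetical shape: what the service would hand to the Transaction repository
    debit_wallet_id: str
    credit_wallet_id: str
    amount_cents: int
    source: SourceReference


def share_of(base_cents: int, rate_bps: int) -> int:
    # Integer math only; floor division drops any fractional cent (assumed rounding rule)
    return base_cents * rate_bps // 10_000


def plan_commission(
    order_id: str,
    base_cents: int,
    brand_wallet: str,
    user_wallet: str,
    platform_wallet: str,
) -> list[PlannedTransaction]:
    source = SourceReference(source_type="shopify_order", source_id=order_id)
    return [
        PlannedTransaction(brand_wallet, user_wallet, share_of(base_cents, USER_COMMISSION_BPS), source),
        PlannedTransaction(brand_wallet, platform_wallet, share_of(base_cents, PLATFORM_FEE_BPS), source),
    ]


for planned in plan_commission("1001", 19_999, "brand-w", "user-w", "platform-w"):
    print(planned.credit_wallet_id, planned.amount_cents)
# user-w 2999
# platform-w 99
```

Both entries share one SourceReference, so it is the differing credit_wallet_id that keeps them distinct under the unique key.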

The cursor mechanism uses the timestamp of the last transaction minus 7 days as the starting point, ensuring late-arriving orders are not missed while idempotency prevents duplicates.
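The cursor rule is small enough to show directly. A minimal sketch, assuming timezone-aware timestamps and a hypothetical initial_start fallback for the very first run, which the post does not describe.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone

LOOKBACK = timedelta(days=7)


def sync_start(last_transaction_at: datetime | None, initial_start: datetime) -> datetime:
    """Timestamp from which to fetch Shopify orders on the next run."""
    if last_transaction_at is None:
        # Nothing recorded yet; initial_start is a hypothetical caller-supplied fallback
        return initial_start
    # Deliberately re-read the last 7 days: re-fetched orders are absorbed by the
    # idempotent insert, while late-arriving ones are still picked up
    return last_transaction_at - LOOKBACK


last = datetime(2024, 3, 10, 12, 0, tzinfo=timezone.utc)
print(sync_start(last, last))  # 2024-03-03 12:00:00+00:00
```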

The Notification Pipeline: How Modules Communicate

Bounded Contexts in DDD must not import each other directly. But modules inevitably need to notify one another — for example, after a brand is created, the ledger module needs to automatically create a wallet. How?

Design

The foundation layer provides an event infrastructure:

Base Entity — every domain entity has the ability to record events:

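from dataclasses import dataclass, field


@dataclass
class Entity:
    # init=False keeps _events out of subclass constructors; each instance gets its own list
    _events: list = field(default_factory=list, init=False, repr=False, compare=False)

    def _record(self, event: object):
        self._events.append(event)

    def collect_events(self) -> list:
        events = list(self._events)
        self._events.clear()
        return events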

collect_events() is clear-on-read — calling it once clears the list, ensuring events are not processed twice.

EventPublisher Protocol — defines the interface for publishing events:

class EventPublisher(Protocol):
    def publish(self, event: object) -> None: ...

EventCollector — a request-scoped event collector that gathers all events within a single request and dispatches them together after the request completes.

EventBus — a multicast dispatcher that routes events to the corresponding handlers:

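from collections import defaultdict
from typing import Callable

class EventBus:
    def __init__(self):
        # event type -> handlers; defaultdict creates each list on first subscribe
        self._handlers: dict[type, list[Callable]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Callable):
        self._handlers[event_type].append(handler)

    def dispatch(self, events: list):
        for event in events:
            for handler in self._handlers.get(type(event), []):
                handler(event)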

Concrete Event Types

Each module defines its own events:

@dataclass(frozen=True)
class BrandCreated:
    brand_id: UUID

@dataclass(frozen=True)
class TransactionRecorded:
    debit_wallet_id: UUID
    credit_wallet_id: UUID
    transaction_id: UUID

@dataclass(frozen=True)
class UserCreated:
    user_id: UUID

Events are frozen value objects — immutable, carrying only IDs, not entire entities. This forces the receiver to query the latest state itself, preventing stale snapshots.

Publishing Flow

In the Application Service, EventPublisher is an optional dependency:

class BrandService:
    def __init__(self, repo: BrandRepository, collector: EventPublisher | None = None):
        self._repo = repo
        self._collector = collector

    def create(self, ...) -> Brand:
        brand = Brand(...)
        self._repo.create(brand)
        if self._collector is not None:
            self._collector.publish(BrandCreated(brand_id=brand.id))
        return brand

Why optional? Because tests do not always need the event system — injecting None lets you test business logic in isolation.
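A sketch of what the optional dependency buys in tests. InMemoryBrandRepository and RecordingPublisher are hypothetical test doubles, Brand is trimmed to an id, and create() takes no arguments because the post elides them. The sketch checks `is not None` so a publisher is never skipped merely because it evaluates as falsy.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Protocol
from uuid import UUID, uuid4


@dataclass(frozen=True)
class BrandCreated:
    brand_id: UUID


@dataclass
class Brand:
    id: UUID  # trimmed to the one field this example needs


class EventPublisher(Protocol):
    def publish(self, event: object) -> None: ...


class InMemoryBrandRepository:
    """Hypothetical test double for BrandRepository."""

    def __init__(self) -> None:
        self.saved: dict[UUID, Brand] = {}

    def create(self, brand: Brand) -> None:
        self.saved[brand.id] = brand


class RecordingPublisher:
    """Hypothetical fake publisher that only remembers what it was given."""

    def __init__(self) -> None:
        self.events: list[object] = []

    def publish(self, event: object) -> None:
        self.events.append(event)


class BrandService:
    def __init__(self, repo: InMemoryBrandRepository, collector: EventPublisher | None = None):
        self._repo = repo
        self._collector = collector

    def create(self) -> Brand:
        brand = Brand(id=uuid4())
        self._repo.create(brand)
        if self._collector is not None:
            self._collector.publish(BrandCreated(brand_id=brand.id))
        return brand


# Business-logic test: no event system at all
repo = InMemoryBrandRepository()
brand = BrandService(repo).create()
print(brand.id in repo.saved)  # True

# Event test: inject the fake and inspect what was published
publisher = RecordingPublisher()
published_brand = BrandService(InMemoryBrandRepository(), collector=publisher).create()
print(publisher.events == [BrandCreated(brand_id=published_brand.id)])  # True
```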

The presentation layer (providers.py) is responsible for injecting the EventCollector into the Service:

def get_brand_service(scope: RequestScope = Depends(...)) -> BrandService:
    return BrandService(repo=..., collector=scope.collector)

RequestScope creates a new EventCollector at the start of each HTTP request and dispatches all collected events after the request completes.
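The post does not show how RequestScope is built. Here is a framework-free sketch of the same lifecycle, assuming a context manager can stand in for the per-request dependency; request_scope(), drain(), and the choice not to dispatch when the request raises are assumptions rather than the project's behavior.

```python
from __future__ import annotations

from collections import defaultdict
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Callable, Iterator


class EventBus:
    def __init__(self) -> None:
        self._handlers: dict[type, list[Callable[[object], None]]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Callable[[object], None]) -> None:
        self._handlers[event_type].append(handler)

    def dispatch(self, events: list[object]) -> None:
        for event in events:
            for handler in self._handlers.get(type(event), []):
                handler(event)


class EventCollector:
    """Request-scoped buffer; its publish() matches the EventPublisher protocol."""

    def __init__(self) -> None:
        self._pending: list[object] = []

    def publish(self, event: object) -> None:
        # Buffer only: nothing reaches subscribers until the request finishes
        self._pending.append(event)

    def drain(self) -> list[object]:
        # Clear-on-read, like Entity.collect_events()
        events, self._pending = self._pending, []
        return events


@dataclass
class RequestScope:
    collector: EventCollector


@contextmanager
def request_scope(bus: EventBus) -> Iterator[RequestScope]:
    scope = RequestScope(collector=EventCollector())  # a fresh collector per request
    yield scope
    # Reached only when the request body finished without raising
    bus.dispatch(scope.collector.drain())


@dataclass(frozen=True)
class BrandCreated:
    brand_id: str


# Usage: one simulated request
bus = EventBus()
delivered: list[object] = []
bus.subscribe(BrandCreated, delivered.append)
with request_scope(bus) as scope:
    scope.collector.publish(BrandCreated(brand_id="brand-1"))
    # delivered is still empty at this point
print(delivered)  # [BrandCreated(brand_id='brand-1')]
```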

Why Not Wire Directly

If BrandService were to import WalletService directly to create a wallet:

class BrandService:
    def create(self, ...):
        brand = Brand(...)
        self._repo.create(brand)
        self._wallet_service.create_for_brand(brand.id)  # direct coupling

Then the Brands module would depend on the Ledger module. If you later want to split into microservices, or if the Ledger module changes its interface, Brands has to change too.

With events:

  • The Brands module only cares about publishing the BrandCreated event
  • The Ledger module subscribes to BrandCreated and decides for itself whether to create a wallet
  • The two modules have zero knowledge of each other
  • They communicate only through the event data class, which lives in the foundation layer
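On the receiving side, a sketch of how the Ledger module might subscribe to BrandCreated. get_by_brand() and create_for_brand() mirror the WalletRepository protocol shown earlier; the in-memory repository, the trimmed Wallet, the handler factory, and the get-then-create guard against repeated delivery are illustrative assumptions.

```python
from __future__ import annotations

from collections import defaultdict
from dataclasses import dataclass
from typing import Callable
from uuid import UUID, uuid4


@dataclass(frozen=True)
class BrandCreated:  # shared event type; Brands publishes it, Ledger consumes it
    brand_id: UUID


class EventBus:
    def __init__(self) -> None:
        self._handlers: dict[type, list[Callable]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Callable) -> None:
        self._handlers[event_type].append(handler)

    def dispatch(self, events: list) -> None:
        for event in events:
            for handler in self._handlers.get(type(event), []):
                handler(event)


# ---- Ledger module: knows BrandCreated, imports nothing from Brands ----
@dataclass
class Wallet:
    id: UUID
    brand_id: UUID | None = None  # trimmed; the real Wallet also has user_id


class InMemoryWalletRepository:
    """Hypothetical stand-in implementing two WalletRepository methods."""

    def __init__(self) -> None:
        self._by_brand: dict[UUID, Wallet] = {}

    def get_by_brand(self, brand_id: UUID) -> Wallet | None:
        return self._by_brand.get(brand_id)

    def create_for_brand(self, brand_id: UUID) -> Wallet:
        wallet = Wallet(id=uuid4(), brand_id=brand_id)
        self._by_brand[brand_id] = wallet
        return wallet


def make_brand_wallet_handler(wallets: InMemoryWalletRepository) -> Callable[[BrandCreated], None]:
    def on_brand_created(event: BrandCreated) -> None:
        # The event carries only the ID; the ledger decides what to do with it.
        # Checking first means a repeated delivery does not create a second wallet.
        if wallets.get_by_brand(event.brand_id) is None:
            wallets.create_for_brand(event.brand_id)
    return on_brand_created


bus = EventBus()
wallets = InMemoryWalletRepository()
bus.subscribe(BrandCreated, make_brand_wallet_handler(wallets))

brand_id = uuid4()
bus.dispatch([BrandCreated(brand_id=brand_id)])
bus.dispatch([BrandCreated(brand_id=brand_id)])  # redelivery: no second wallet
print(wallets.get_by_brand(brand_id) is not None)  # True
```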

Summary

| Problem | Pitfall | Solution |
| --- | --- | --- |
| Consistency-cluster boundary | Logic scattered across Services, entity not protecting itself | Entity carries behavior; outside code operates only through the root |
| State machine | Multiple call sites for activate, some forgetting pre-condition checks | Transition logic lives in entity methods |
| Balance calculation | Storing a balance field directly causes concurrency and audit problems | Double-entry: balance is computed from transaction records |
| Duplicate transactions | Webhook retries produce duplicate commissions | Unique constraint + INSERT ON CONFLICT |
| Module coupling | Module A directly imports module B | Domain events: notify only through events |

The next post will document GCP runtime contracts, auth-bypass design, and CI/CD auto-deploy practices.
