DDD 踩坑 — Aggregate Boundary、Double-Entry Ledger 和 Event System

上一篇 講了 DDD 的概念跟 .NET MVC 的對比。這篇記錄實際在專案裡踩到的坑: aggregate boundary 怎麼畫、double-entry ledger 怎麼設計、event system 怎麼讓模組之間溝通

Aggregate Boundary 畫錯的代價

Aggregate 的規則聽起來很簡單 — 一組一定要一起存取的東西, 有一個 root entity 當入口。但實際畫 boundary 的時候, 很容易畫錯

Post 和 Comment

Post (貼文) 跟 Comment (留言) 是我第一個碰到的 aggregate 設計:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
class Post:
    id: UUID
    author_id: UUID
    author_type: AuthorType
    body: str
    comments: list[Comment]
    hashtags: list[Hashtag]

    def add_comment(self, author_id: UUID, author_type: AuthorType, body: str) -> Comment:
        comment = Comment(
            id=uuid4(), post_id=self.id,
            author_id=author_id, author_type=author_type, body=body,
        )
        self.comments.append(comment)
        return comment

    def remove_comment(self, comment_id: UUID, author_id: UUID, author_type: AuthorType):
        comment = self._find_comment(comment_id)
        comment.verify_authorship(author_id, author_type)
        self.comments.remove(comment)

重點:

  • 外面不能直接操作 Comment, 一定要透過 Post
  • 新增留言: post.add_comment(), 不是 comment_repo.create()
  • 刪除留言: post.remove_comment(), 由 Comment 自己檢查 authorship, 但移除的動作由 Post 控制
  • 存取時整個 aggregate 一起 load / save

Comment 自己有一個 verify_authorship() 方法, 用來確認「你是不是這則留言的作者」。但「移除留言」這個動作只有 Post 能做 — Comment 不能自己把自己從 list 裡移掉

Wallet 的獨佔擁有權

Wallet (錢包) 有一個一開始沒想清楚的 invariant: 一個 wallet 一定屬於一個 user 或一個 brand, 不能兩者都有, 也不能都沒有

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
class Wallet:
    id: UUID
    user_id: UUID | None
    brand_id: UUID | None

    def __post_init__(self):
        has_user = self.user_id is not None
        has_brand = self.brand_id is not None
        if has_user == has_brand:
            raise ValueError("Wallet must belong to exactly one of user or brand")

    @classmethod
    def for_user(cls, user_id: UUID) -> Wallet:
        return cls(id=uuid4(), user_id=user_id, brand_id=None)

    @classmethod
    def for_brand(cls, brand_id: UUID) -> Wallet:
        return cls(id=uuid4(), user_id=None, brand_id=brand_id)

用 factory method (for_user(), for_brand()) 來建立, 不直接開放 constructor。這樣呼叫端不需要知道 “user_id 跟 brand_id 只能有一個” 這條規則, 從 API 層面就不可能建出不合法的 wallet

Brand 的狀態機

Brand 有三個狀態: PENDING → ACTIVE → ARCHIVED, 轉換規則全在 entity 裡:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
class Brand:
    status: BrandStatus = BrandStatus.PENDING
    shopify: ShopifyCredentials

    def activate(self):
        if self.status == BrandStatus.ARCHIVED:
            raise ValidationError("Cannot activate archived brand")
        if not self.shopify.is_installed:
            raise ValidationError("Shopify must be installed before activation")
        self.status = BrandStatus.ACTIVE

    def archive(self):
        if self.status == BrandStatus.ARCHIVED:
            raise ValidationError("Brand is already archived")
        self.status = BrandStatus.ARCHIVED

    def reactivate(self):
        if self.status != BrandStatus.ARCHIVED:
            raise ValidationError("Only archived brands can be reactivated")
        self.status = BrandStatus.PENDING

一開始我把 activate() 的前置檢查放在 Service 裡。後來發現有三個不同的地方都會呼叫 activate, 其中一個忘了檢查 Shopify 是否已安裝。搬到 entity 之後, 不管從哪裡呼叫都會檢查

另一個細節: ShopifyCredentials 是一個 frozen value object:

1
2
3
4
5
6
7
8
9
@dataclass(frozen=True)
class ShopifyCredentials:
    shop_domain: str | None = None
    access_token: str | None = None
    oauth_nonce: str | None = None

    @property
    def is_installed(self) -> bool:
        return bool(self.access_token)

它同時包含 OAuth 設定階段的欄位 (oauth_nonce) 和安裝完成後的欄位 (access_token), 用 is_installed property 來判斷狀態。frozen 確保不會有人直接改 access_token 繞過安裝流程

Double-Entry Ledger

專案有錢包和佣金系統, 需要記帳。一開始考慮過直接在 wallet 上加一個 balance 欄位, 每次交易就 += amount。但這有幾個問題:

  1. 並發問題 — 兩筆交易同時 += amount 可能覆蓋
  2. 沒有 audit trail — 餘額是怎麼來的? 無從追溯
  3. 退款時要手動算回去, 容易算錯

所以用了 double-entry (複式記帳): 每筆交易都有 debit (借方) 和 credit (貸方), 餘額不直接存在 wallet 上, 而是從交易紀錄算出來

Transaction Entity

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
class Transaction:
    id: UUID
    debit_wallet_id: UUID
    credit_wallet_id: UUID
    money: Money
    status: TransactionStatus = TransactionStatus.PENDING
    source: SourceReference | None = None

    def __post_init__(self):
        if self.debit_wallet_id == self.credit_wallet_id:
            raise ValueError("Cannot transfer to same wallet")

    def void(self):
        if self.status != TransactionStatus.PENDING:
            raise ValueError("Only pending transactions can be voided")
        self.status = TransactionStatus.VOIDED

重點:

  • debit_wallet_idcredit_wallet_id: 不能自己轉給自己
  • Money 是 value object, 包含金額和幣別
  • 交易建立後只能做一件事: void(), 而且只有 PENDING 狀態才能 void
  • SourceReference 用來追蹤這筆交易是從哪裡來的 (例如 Shopify 訂單)

Money Value Object

1
2
3
4
5
6
7
8
@dataclass(frozen=True)
class Money:
    amount_cents: int
    currency: str = "USD"

    def __post_init__(self):
        if self.amount_cents < 0:
            raise ValueError("Amount cannot be negative")

用 cents (分) 而不是 dollars, 避免浮點數精度問題。金額不能為負, frozen 確保不可變

餘額是算出來的, 不是存的

這是 double-entry 最核心的概念。Wallet 自己沒有 balance 欄位:

1
2
3
4
5
class WalletRepository(Protocol):
    def get_by_user(self, user_id: UUID) -> Wallet | None: ...
    def get_by_brand(self, brand_id: UUID) -> Wallet | None: ...
    def create_for_user(self, user_id: UUID) -> Wallet: ...
    def create_for_brand(self, brand_id: UUID) -> Wallet: ...

餘額在 repository 的 infrastructure 層用 SQL 算:

1
balance = sum(credits where wallet is credit) - sum(debits where wallet is debit)

只計算非 VOIDED 的交易。這樣:

  • 任何時間點的餘額都可以從交易紀錄重算出來
  • void 一筆交易不需要手動調整餘額, 因為 voided 的交易自動被排除
  • 不存在 “餘額跟交易紀錄對不上” 的問題

冪等性

Shopify 訂單可能因為 webhook retry 被處理多次。Transaction repository 用 PostgreSQL 的 INSERT ... ON CONFLICT DO NOTHING 搭配 unique constraint:

1
2
UNIQUE (debit_wallet_id, credit_wallet_id, source_type, source_id)
  WHERE source_type IS NOT NULL AND source_id IS NOT NULL

同一筆 Shopify 訂單不管被送來幾次, 只會產生一筆交易。如果已經存在, 就回傳現有的那筆, 不是報錯

DB 層也有對應的 check constraint:

  • CHECK (amount_cents >= 0)
  • CHECK (debit_wallet_id != credit_wallet_id)

Domain layer 跟 DB layer 都做了驗證。domain 層的檢查是為了在程式邏輯層面就擋住, DB 層的 constraint 是最後防線

Attribution: 實際的佣金記帳流程

AttributionService 負責從 Shopify 訂單計算佣金, 記錄到 ledger:

  1. 從 Shopify API 拿到訂單
  2. 算出使用者佣金 (例如 15%)
  3. 記一筆 brand_wallet → user_wallet 的交易
  4. 算出平台費 (例如 0.5%)
  5. 記一筆 brand_wallet → platform_wallet 的交易

每筆交易都帶 SourceReference(source_type="shopify_order", source_id=order_id), 所以重複處理不會重複記帳

cursor 機制用最後一筆交易的時間戳減 7 天當起點, 確保不會漏掉延遲到達的訂單, 同時靠冪等性避免重複

Event System: 模組之間怎麼溝通

DDD 的 Bounded Context 之間不能直接 import。但模組之間一定會有需要通知的時候 — 例如 brand 建立後, ledger 要自動建一個 wallet。怎麼做?

設計

Foundation 提供了一個 event 基礎設施:

Base Entity — 所有 domain entity 都有 event 記錄能力:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
class Entity:
    _events: list

    def _record(self, event: object):
        self._events.append(event)

    def collect_events(self) -> list:
        events = list(self._events)
        self._events.clear()
        return events

collect_events() 是 clear-on-read — 呼叫一次就清空, 確保 event 不會被重複處理

EventPublisher Protocol — 定義發布 event 的 interface:

1
2
class EventPublisher(Protocol):
    def publish(self, event: object) -> None: ...

EventCollector — request scope 的 event 收集器, 在一個 request 裡面收集所有 event, 等 request 結束再一起 dispatch

EventBus — 多播, 把 event 分發給對應的 handler:

1
2
3
4
5
6
7
8
class EventBus:
    def subscribe(self, event_type: type, handler: Callable):
        self._handlers[event_type].append(handler)

    def dispatch(self, events: list):
        for event in events:
            for handler in self._handlers.get(type(event), []):
                handler(event)

實際的 Domain Events

每個 module 定義自己的 event:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
@dataclass(frozen=True)
class BrandCreated:
    brand_id: UUID

@dataclass(frozen=True)
class TransactionRecorded:
    debit_wallet_id: UUID
    credit_wallet_id: UUID
    transaction_id: UUID

@dataclass(frozen=True)
class UserCreated:
    user_id: UUID

Event 都是 frozen value object — 不可變, 只攜帶 ID, 不攜帶整個 entity。這樣接收方必須自己去查最新狀態, 不會拿到過時的 snapshot

發布流程

Application Service 裡, EventPublisher 是 optional dependency:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
class BrandService:
    def __init__(self, repo: BrandRepository, collector: EventPublisher | None = None):
        self._repo = repo
        self._collector = collector

    def create(self, ...) -> Brand:
        brand = Brand(...)
        self._repo.create(brand)
        if self._collector:
            self._collector.publish(BrandCreated(brand_id=brand.id))
        return brand

為什麼是 optional? 因為測試時不一定需要 event system, 注入 None 就可以只測 business logic

Presentation layer (providers.py) 負責把 EventCollector 注入到 Service:

1
2
def get_brand_service(scope: RequestScope = Depends(...)) -> BrandService:
    return BrandService(repo=..., collector=scope.collector)

RequestScope 在每個 HTTP request 開始時建立一個 EventCollector, request 結束後 dispatch 所有收集到的 event

為什麼不直接 import

如果 BrandService 直接 import WalletService 來建 wallet:

1
2
3
4
5
class BrandService:
    def create(self, ...):
        brand = Brand(...)
        self._repo.create(brand)
        self._wallet_service.create_for_brand(brand.id)  # 直接耦合

這樣 Brands module 就依賴 Ledger module。之後如果要拆微服務, 或是 Ledger module 改了 interface, Brands 就要跟著改

用 event 的話:

  • Brands module 只管發 BrandCreated event
  • Ledger module 訂閱 BrandCreated, 自己決定要不要建 wallet
  • 兩個 module 完全不知道彼此的存在
  • 中間只透過 event 的 data class 溝通, 而 event 是 foundation 層的東西

整理

問題 踩坑 解法
Aggregate boundary 邏輯散落在 Service, entity 沒保護好自己 entity 帶行為, 外部只能透過 root 操作
狀態機 多處呼叫 activate, 有的忘記檢查前置條件 轉換邏輯寫在 entity method 裡
餘額計算 直接存 balance 欄位, 並發和 audit 問題 double-entry, 餘額從交易紀錄算出
重複交易 Webhook retry 產生重複佣金 unique constraint + INSERT ON CONFLICT
模組耦合 module A 直接 import module B domain event, 只透過 event 通知

下一篇記錄 GCP runtime contract、auth bypass 設計和 CI/CD auto-deploy 的做法

References

0%