上一篇 講了 DDD 的概念跟 .NET MVC 的對比。這篇記錄實際在專案裡踩到的坑: aggregate boundary 怎麼畫、double-entry ledger 怎麼設計、event system 怎麼讓模組之間溝通
Aggregate Boundary 畫錯的代價
Aggregate 的規則聽起來很簡單 — 一組一定要一起存取的東西, 有一個 root entity 當入口。但實際畫 boundary 的時候, 很容易畫錯
Post (貼文) 跟 Comment (留言) 是我第一個碰到的 aggregate 設計:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Post :
id : UUID
author_id : UUID
author_type : AuthorType
body : str
comments : list [ Comment ]
hashtags : list [ Hashtag ]
def add_comment ( self , author_id : UUID , author_type : AuthorType , body : str ) -> Comment :
comment = Comment (
id = uuid4 (), post_id = self . id ,
author_id = author_id , author_type = author_type , body = body ,
)
self . comments . append ( comment )
return comment
def remove_comment ( self , comment_id : UUID , author_id : UUID , author_type : AuthorType ):
comment = self . _find_comment ( comment_id )
comment . verify_authorship ( author_id , author_type )
self . comments . remove ( comment )
重點:
外面不能直接操作 Comment, 一定要透過 Post
新增留言: post.add_comment(), 不是 comment_repo.create()
刪除留言: post.remove_comment(), 由 Comment 自己檢查 authorship, 但移除的動作由 Post 控制
存取時整個 aggregate 一起 load / save
Comment 自己有一個 verify_authorship() 方法, 用來確認「你是不是這則留言的作者」。但「移除留言」這個動作只有 Post 能做 — Comment 不能自己把自己從 list 裡移掉
Wallet 的獨佔擁有權
Wallet (錢包) 有一個一開始沒想清楚的 invariant: 一個 wallet 一定屬於一個 user 或一個 brand, 不能兩者都有, 也不能都沒有
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Wallet :
id : UUID
user_id : UUID | None
brand_id : UUID | None
def __post_init__ ( self ):
has_user = self . user_id is not None
has_brand = self . brand_id is not None
if has_user == has_brand :
raise ValueError ( "Wallet must belong to exactly one of user or brand" )
@classmethod
def for_user ( cls , user_id : UUID ) -> Wallet :
return cls ( id = uuid4 (), user_id = user_id , brand_id = None )
@classmethod
def for_brand ( cls , brand_id : UUID ) -> Wallet :
return cls ( id = uuid4 (), user_id = None , brand_id = brand_id )
用 factory method (for_user(), for_brand()) 來建立, 不直接開放 constructor。這樣呼叫端不需要知道 “user_id 跟 brand_id 只能有一個” 這條規則, 從 API 層面就不可能建出不合法的 wallet
Brand 的狀態機
Brand 有三個狀態: PENDING → ACTIVE → ARCHIVED, 轉換規則全在 entity 裡:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Brand :
status : BrandStatus = BrandStatus . PENDING
shopify : ShopifyCredentials
def activate ( self ):
if self . status == BrandStatus . ARCHIVED :
raise ValidationError ( "Cannot activate archived brand" )
if not self . shopify . is_installed :
raise ValidationError ( "Shopify must be installed before activation" )
self . status = BrandStatus . ACTIVE
def archive ( self ):
if self . status == BrandStatus . ARCHIVED :
raise ValidationError ( "Brand is already archived" )
self . status = BrandStatus . ARCHIVED
def reactivate ( self ):
if self . status != BrandStatus . ARCHIVED :
raise ValidationError ( "Only archived brands can be reactivated" )
self . status = BrandStatus . PENDING
一開始我把 activate() 的前置檢查放在 Service 裡。後來發現有三個不同的地方都會呼叫 activate, 其中一個忘了檢查 Shopify 是否已安裝。搬到 entity 之後, 不管從哪裡呼叫都會檢查
另一個細節: ShopifyCredentials 是一個 frozen value object:
1
2
3
4
5
6
7
8
9
@dataclass ( frozen = True )
class ShopifyCredentials :
shop_domain : str | None = None
access_token : str | None = None
oauth_nonce : str | None = None
@property
def is_installed ( self ) -> bool :
return bool ( self . access_token )
它同時包含 OAuth 設定階段的欄位 (oauth_nonce) 和安裝完成後的欄位 (access_token), 用 is_installed property 來判斷狀態。frozen 確保不會有人直接改 access_token 繞過安裝流程
Double-Entry Ledger
專案有錢包和佣金系統, 需要記帳。一開始考慮過直接在 wallet 上加一個 balance 欄位, 每次交易就 += amount。但這有幾個問題:
並發問題 — 兩筆交易同時 += amount 可能覆蓋
沒有 audit trail — 餘額是怎麼來的? 無從追溯
退款時要手動算回去, 容易算錯
所以用了 double-entry (複式記帳): 每筆交易都有 debit (借方) 和 credit (貸方), 餘額不直接存在 wallet 上, 而是從交易紀錄算出來
Transaction Entity
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Transaction :
id : UUID
debit_wallet_id : UUID
credit_wallet_id : UUID
money : Money
status : TransactionStatus = TransactionStatus . PENDING
source : SourceReference | None = None
def __post_init__ ( self ):
if self . debit_wallet_id == self . credit_wallet_id :
raise ValueError ( "Cannot transfer to same wallet" )
def void ( self ):
if self . status != TransactionStatus . PENDING :
raise ValueError ( "Only pending transactions can be voided" )
self . status = TransactionStatus . VOIDED
重點:
debit_wallet_id ≠ credit_wallet_id: 不能自己轉給自己
Money 是 value object, 包含金額和幣別
交易建立後只能做一件事: void(), 而且只有 PENDING 狀態才能 void
SourceReference 用來追蹤這筆交易是從哪裡來的 (例如 Shopify 訂單)
Money Value Object
1
2
3
4
5
6
7
8
@dataclass ( frozen = True )
class Money :
amount_cents : int
currency : str = "USD"
def __post_init__ ( self ):
if self . amount_cents < 0 :
raise ValueError ( "Amount cannot be negative" )
用 cents (分) 而不是 dollars, 避免浮點數精度問題。金額不能為負, frozen 確保不可變
餘額是算出來的, 不是存的
這是 double-entry 最核心的概念。Wallet 自己沒有 balance 欄位:
1
2
3
4
5
class WalletRepository ( Protocol ):
def get_by_user ( self , user_id : UUID ) -> Wallet | None : ...
def get_by_brand ( self , brand_id : UUID ) -> Wallet | None : ...
def create_for_user ( self , user_id : UUID ) -> Wallet : ...
def create_for_brand ( self , brand_id : UUID ) -> Wallet : ...
餘額在 repository 的 infrastructure 層用 SQL 算:
1
balance = sum(credits where wallet is credit) - sum(debits where wallet is debit)
只計算非 VOIDED 的交易。這樣:
任何時間點的餘額都可以從交易紀錄重算出來
void 一筆交易不需要手動調整餘額, 因為 voided 的交易自動被排除
不存在 “餘額跟交易紀錄對不上” 的問題
冪等性
Shopify 訂單可能因為 webhook retry 被處理多次。Transaction repository 用 PostgreSQL 的 INSERT ... ON CONFLICT DO NOTHING 搭配 unique constraint:
1
2
UNIQUE (debit_wallet_id, credit_wallet_id, source_type, source_id)
WHERE source_type IS NOT NULL AND source_id IS NOT NULL
同一筆 Shopify 訂單不管被送來幾次, 只會產生一筆交易。如果已經存在, 就回傳現有的那筆, 不是報錯
DB 層也有對應的 check constraint:
CHECK (amount_cents >= 0)
CHECK (debit_wallet_id != credit_wallet_id)
Domain layer 跟 DB layer 都做了驗證。domain 層的檢查是為了在程式邏輯層面就擋住, DB 層的 constraint 是最後防線
Attribution: 實際的佣金記帳流程
AttributionService 負責從 Shopify 訂單計算佣金, 記錄到 ledger:
從 Shopify API 拿到訂單
算出使用者佣金 (例如 15%)
記一筆 brand_wallet → user_wallet 的交易
算出平台費 (例如 0.5%)
記一筆 brand_wallet → platform_wallet 的交易
每筆交易都帶 SourceReference(source_type="shopify_order", source_id=order_id), 所以重複處理不會重複記帳
cursor 機制用最後一筆交易的時間戳減 7 天當起點, 確保不會漏掉延遲到達的訂單, 同時靠冪等性避免重複
Event System: 模組之間怎麼溝通
DDD 的 Bounded Context 之間不能直接 import。但模組之間一定會有需要通知的時候 — 例如 brand 建立後, ledger 要自動建一個 wallet。怎麼做?
設計
Foundation 提供了一個 event 基礎設施:
Base Entity — 所有 domain entity 都有 event 記錄能力:
1
2
3
4
5
6
7
8
9
10
class Entity :
_events : list
def _record ( self , event : object ):
self . _events . append ( event )
def collect_events ( self ) -> list :
events = list ( self . _events )
self . _events . clear ()
return events
collect_events() 是 clear-on-read — 呼叫一次就清空, 確保 event 不會被重複處理
EventPublisher Protocol — 定義發布 event 的 interface:
1
2
class EventPublisher ( Protocol ):
def publish ( self , event : object ) -> None : ...
EventCollector — request scope 的 event 收集器, 在一個 request 裡面收集所有 event, 等 request 結束再一起 dispatch
EventBus — 多播, 把 event 分發給對應的 handler:
1
2
3
4
5
6
7
8
class EventBus :
def subscribe ( self , event_type : type , handler : Callable ):
self . _handlers [ event_type ] . append ( handler )
def dispatch ( self , events : list ):
for event in events :
for handler in self . _handlers . get ( type ( event ), []):
handler ( event )
實際的 Domain Events
每個 module 定義自己的 event:
1
2
3
4
5
6
7
8
9
10
11
12
13
@dataclass ( frozen = True )
class BrandCreated :
brand_id : UUID
@dataclass ( frozen = True )
class TransactionRecorded :
debit_wallet_id : UUID
credit_wallet_id : UUID
transaction_id : UUID
@dataclass ( frozen = True )
class UserCreated :
user_id : UUID
Event 都是 frozen value object — 不可變, 只攜帶 ID, 不攜帶整個 entity。這樣接收方必須自己去查最新狀態, 不會拿到過時的 snapshot
發布流程
Application Service 裡, EventPublisher 是 optional dependency:
1
2
3
4
5
6
7
8
9
10
11
class BrandService :
def __init__ ( self , repo : BrandRepository , collector : EventPublisher | None = None ):
self . _repo = repo
self . _collector = collector
def create ( self , ... ) -> Brand :
brand = Brand ( ... )
self . _repo . create ( brand )
if self . _collector :
self . _collector . publish ( BrandCreated ( brand_id = brand . id ))
return brand
為什麼是 optional? 因為測試時不一定需要 event system, 注入 None 就可以只測 business logic
Presentation layer (providers.py) 負責把 EventCollector 注入到 Service:
1
2
def get_brand_service ( scope : RequestScope = Depends ( ... )) -> BrandService :
return BrandService ( repo =... , collector = scope . collector )
RequestScope 在每個 HTTP request 開始時建立一個 EventCollector, request 結束後 dispatch 所有收集到的 event
為什麼不直接 import
如果 BrandService 直接 import WalletService 來建 wallet:
1
2
3
4
5
class BrandService :
def create ( self , ... ):
brand = Brand ( ... )
self . _repo . create ( brand )
self . _wallet_service . create_for_brand ( brand . id ) # 直接耦合
這樣 Brands module 就依賴 Ledger module。之後如果要拆微服務, 或是 Ledger module 改了 interface, Brands 就要跟著改
用 event 的話:
Brands module 只管發 BrandCreated event
Ledger module 訂閱 BrandCreated, 自己決定要不要建 wallet
兩個 module 完全不知道彼此的存在
中間只透過 event 的 data class 溝通, 而 event 是 foundation 層的東西
整理
問題
踩坑
解法
Aggregate boundary
邏輯散落在 Service, entity 沒保護好自己
entity 帶行為, 外部只能透過 root 操作
狀態機
多處呼叫 activate, 有的忘記檢查前置條件
轉換邏輯寫在 entity method 裡
餘額計算
直接存 balance 欄位, 並發和 audit 問題
double-entry, 餘額從交易紀錄算出
重複交易
Webhook retry 產生重複佣金
unique constraint + INSERT ON CONFLICT
模組耦合
module A 直接 import module B
domain event, 只透過 event 通知
下一篇記錄 GCP runtime contract、auth bypass 設計和 CI/CD auto-deploy 的做法
References