
FastAPI backend implementation

The core of the backend lives in examples/durable-mini/backend/app/engine.py. It condenses Restate's idea of context actions into two methods, run and sleep, and uses the business_orders table to show how a workflow drives business state.

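API layer

POST /api/orders creates an invocation. It reads the Idempotency-Key header and falls back to the idempotency_key field in the request body. If the key already exists, it returns the existing invocation instead of creating a new one.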

```python
from fastapi import Header

@app.post("/api/orders", response_model=InvocationOut)
def create_order(payload: OrderRequest, idempotency_key: str | None = Header(default=None)):
    # Prefer the Idempotency-Key header; fall back to the key in the JSON body
    key = idempotency_key or payload.idempotency_key
    return create_or_get_invocation(key, payload.model_dump())
```
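The create-or-get semantics can be illustrated with a minimal in-memory sketch. The Invocation dataclass, the two dictionaries standing in for the invocation table and its unique index, and the uuid-based IDs are all hypothetical. The real create_or_get_invocation persists to the database. The sketch also assumes that a request with no key at all always creates a fresh invocation.

```python
import uuid
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Invocation:
    # Hypothetical record; the project stores invocations in a database table
    id: str
    idempotency_key: Optional[str]
    payload: dict[str, Any]
    status: str = "QUEUED"


# Hypothetical stand-ins for the invocation table and a unique index on the key
_invocations: dict[str, Invocation] = {}
_by_key: dict[str, str] = {}


def create_or_get_invocation(key: Optional[str], payload: dict[str, Any]) -> Invocation:
    # Key already seen: return the existing invocation and ignore the new payload
    if key is not None and key in _by_key:
        return _invocations[_by_key[key]]
    inv = Invocation(id=str(uuid.uuid4()), idempotency_key=key, payload=payload)
    _invocations[inv.id] = inv
    if key is not None:
        _by_key[key] = inv.id
    return inv


first = create_or_get_invocation("order-123", {"order_id": "A1"})
retry = create_or_get_invocation("order-123", {"order_id": "A1"})
print(first.id == retry.id)  # True
```

The check-then-insert above is only safe for a single thread. In a database-backed version, a unique constraint on the key column is what protects concurrent retries that carry the same key.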

Worker loop

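A background worker periodically scans for runnable invocations:

```python
import asyncio

async def worker_loop() -> None:  # enclosing coroutine; the name is illustrative
    while True:
        # Try every invocation that is currently runnable, then wait one second
        for invocation_id in list_runnable_invocation_ids():
            run_invocation_once(invocation_id)
        await asyncio.sleep(1)
```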

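The teaching version does no sophisticated scheduling. Whenever an invocation is in QUEUED, RETRYABLE, or WAITING_TIMER state, the worker tries to run it. If the timer has not fired yet, ctx.sleep simply suspends the invocation again.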
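The selection rule and the polling loop can be sketched against an in-memory status table. The invocations dictionary and the stub run_invocation_once are hypothetical. The loop is bounded by a max_passes parameter, which is also hypothetical, so that the example terminates instead of running forever.

```python
import asyncio

# Statuses the teaching worker treats as runnable
RUNNABLE_STATUSES = {"QUEUED", "RETRYABLE", "WAITING_TIMER"}

# Hypothetical in-memory invocation table: id -> status
invocations = {
    "inv-1": "QUEUED",
    "inv-2": "COMPLETED",
    "inv-3": "WAITING_TIMER",
    "inv-4": "FAILED",
    "inv-5": "RETRYABLE",
}

attempted: list[str] = []


def list_runnable_invocation_ids() -> list[str]:
    # No priorities or fairness: every runnable invocation is tried on each pass
    return sorted(i for i, s in invocations.items() if s in RUNNABLE_STATUSES)


def run_invocation_once(invocation_id: str) -> None:
    # Stub for replaying the handler; a timer that is not yet due would
    # suspend again and leave the status at WAITING_TIMER
    attempted.append(invocation_id)


async def worker_loop(max_passes: int, poll_interval: float = 0.01) -> None:
    # Bounded variant of `while True` so the example terminates
    for _ in range(max_passes):
        for invocation_id in list_runnable_invocation_ids():
            run_invocation_once(invocation_id)
        await asyncio.sleep(poll_interval)


asyncio.run(worker_loop(max_passes=2))
print(attempted)  # ['inv-1', 'inv-3', 'inv-5', 'inv-1', 'inv-3', 'inv-5']
```

run_invocation_once is synchronous, so it blocks the event loop while a handler runs. That is acceptable for a teaching worker. A production worker would more likely offload it, for example with asyncio.to_thread.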

ctx.run

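```python
from typing import Any, Callable

JsonDict = dict[str, Any]  # assumed definition of the project's alias

def run(self, name: str, fn: Callable[[], JsonDict]) -> JsonDict:
    # Steps are identified by their position in the handler's execution order
    step_index = self._next_step_index()
    existing = self._find_step(step_index)
    if existing and existing.status == "COMPLETED":
        # Replay: return the journaled result without calling fn again
        self.event(f"replay step {step_index}: {name}")
        return existing.result

    # First execution: run the step and journal its result
    result = fn()
    self._append_step(step_index, name, "COMPLETED", result)
    return result
```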

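This is the core of replay: a step that already completed is skipped, and its journaled result is returned in its place.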
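To watch replay happen outside the project, you can use a self-contained in-memory version of run. MiniContext, Step, and the list-based journal are hypothetical stand-ins for the engine's context and its journal table. The journal outlives each attempt, while the step cursor restarts from zero.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional

JsonDict = dict[str, Any]


@dataclass
class Step:
    index: int
    name: str
    status: str
    result: JsonDict


class MiniContext:
    """Hypothetical in-memory stand-in for the engine's durable context."""

    def __init__(self, journal: list[Step]) -> None:
        self.journal = journal  # persists across attempts, like the journal table
        self._cursor = 0        # restarts at 0 on every attempt
        self.events: list[str] = []

    def _next_step_index(self) -> int:
        index = self._cursor
        self._cursor += 1
        return index

    def _find_step(self, index: int) -> Optional[Step]:
        return self.journal[index] if index < len(self.journal) else None

    def run(self, name: str, fn: Callable[[], JsonDict]) -> JsonDict:
        step_index = self._next_step_index()
        existing = self._find_step(step_index)
        if existing and existing.status == "COMPLETED":
            # Replay: skip fn entirely and return the journaled result
            self.events.append(f"replay step {step_index}: {name}")
            return existing.result
        result = fn()
        self.journal.append(Step(step_index, name, "COMPLETED", result))
        return result


calls = 0


def charge() -> JsonDict:
    global calls
    calls += 1
    return {"payment_id": f"pay-{calls}"}


journal: list[Step] = []
first = MiniContext(journal).run("charge-payment", charge)  # executes charge()
replay_ctx = MiniContext(journal)                            # simulates a second attempt
second = replay_ctx.run("charge-payment", charge)            # served from the journal
print(first, second, calls)  # {'payment_id': 'pay-1'} {'payment_id': 'pay-1'} 1
print(replay_ctx.events)     # ['replay step 0: charge-payment']
```

Steps are matched by position, so a handler must call run in the same order on every attempt. If the step order changes between deployments, a step would be handed another step's journaled result.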

Where business-table updates happen

The order business logic lives in examples/durable-mini/backend/app/handlers.py. Each durable step does more than return a result. It also updates business_orders:

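```python
def charge_payment(ctx, order):
    # Load the order row through ctx, so changes share the engine's session
    business_order = _get_order(ctx, order["order_id"])
    # Skipped on replay, so no second payment ID is ever generated
    payment_id = _now_id("pay")
    business_order.payment_id = payment_id
    business_order.status = "PAID"
    ctx.record_event("business order PAID")
    # The returned dict becomes the step's journaled result
    return {"payment_id": payment_id, "order_id": business_order.order_id}
```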

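Note that the handler does not call commit() itself. In the teaching version, the business-table update, the runtime event, and the journal entry are committed together through the same SQLAlchemy session. This makes the following changes observable: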

| Durable step | Change to business_orders |
| --- | --- |
| create-order | Inserts the order, status=CREATED |
| charge-payment | Writes payment_id, status=PAID |
| reserve-inventory | Writes reservation_id, status=INVENTORY_RESERVED |
| mark-settlement-waiting | status=WAITING_SETTLEMENT |
| send-receipt | Writes receipt_id, status=COMPLETED |

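During replay, a durable step that already completed returns its result straight from the Journal. It therefore neither updates the order table again nor generates a new payment ID.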
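The all-or-nothing property of a shared transaction can be demonstrated without SQLAlchemy. This sketch substitutes the standard-library sqlite3 module and a hand-written, hypothetical two-table schema. It shows that a crash before commit leaves neither the business change nor the journal entry behind. The project's runtime events table would join the same transaction in the same way.

```python
import json
import sqlite3

# Hypothetical schema: business table and journal table in one database
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE business_orders (order_id TEXT PRIMARY KEY, status TEXT, payment_id TEXT);
    CREATE TABLE journal (
        invocation_id TEXT, step_index INTEGER, name TEXT, result TEXT,
        PRIMARY KEY (invocation_id, step_index)
    );
    INSERT INTO business_orders VALUES ('A1', 'CREATED', NULL);
    """
)


def charge_payment_step(invocation_id: str, step_index: int, order_id: str,
                        payment_id: str, crash: bool = False) -> None:
    # `with conn:` commits if the block succeeds and rolls back if it raises
    with conn:
        conn.execute(
            "UPDATE business_orders SET status = 'PAID', payment_id = ? WHERE order_id = ?",
            (payment_id, order_id),
        )
        if crash:
            raise RuntimeError("simulated crash before commit")
        conn.execute(
            "INSERT INTO journal VALUES (?, ?, ?, ?)",
            (invocation_id, step_index, "charge-payment", json.dumps({"payment_id": payment_id})),
        )


try:
    charge_payment_step("inv-1", 1, "A1", "pay-1", crash=True)
except RuntimeError:
    pass
print(conn.execute("SELECT status, payment_id FROM business_orders").fetchone())  # ('CREATED', None)
print(conn.execute("SELECT COUNT(*) FROM journal").fetchone())  # (0,)

charge_payment_step("inv-1", 1, "A1", "pay-2")
print(conn.execute("SELECT status, payment_id FROM business_orders").fetchone())  # ('PAID', 'pay-2')
```

A crash after the commit is also harmless, because the step is already journaled and replay returns it. What a shared transaction cannot cover is a side effect outside the database, such as a call to a real payment provider.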

ctx.sleep

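```python
from datetime import datetime, timedelta

def sleep(self, name: str, seconds: int) -> None:
    step_index = self._next_step_index()
    existing = self._find_step(step_index)
    if existing is None:
        # First visit: journal the wake-up time, then suspend instead of blocking
        fire_at = utcnow() + timedelta(seconds=seconds)
        self._append_step(step_index, name, "PENDING_TIMER", {"fire_at": fire_at.isoformat()})
        raise SuspendExecution()

    # Retried before the timer fired: suspend again
    # (fromisoformat parses the string produced by isoformat() above)
    if utcnow() < datetime.fromisoformat(existing.result["fire_at"]):
        raise SuspendExecution()

    # Timer has fired: mark the step done and let the handler continue
    existing.status = "COMPLETED"
    self.session.commit()
```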

Note that this is not a blocking wait. sleep writes the intent to wait into the Journal and then suspends, which puts the invocation into WAITING_TIMER.
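A self-contained way to observe a durable timer is to inject the clock. TimerContext, the dict-based journal, the attempt helper, and the step name "settlement-window" are all hypothetical. The now callable replaces the project's utcnow so that time can be advanced by hand.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Callable


class SuspendExecution(Exception):
    """Raised to park the invocation until the worker tries it again."""


class TimerContext:
    """Hypothetical sketch: journal and clock are injected for testing."""

    def __init__(self, journal: list[dict[str, Any]], now: Callable[[], datetime]) -> None:
        self.journal = journal
        self.now = now
        self._cursor = 0

    def sleep(self, name: str, seconds: int) -> None:
        step_index = self._cursor
        self._cursor += 1
        existing = self.journal[step_index] if step_index < len(self.journal) else None
        if existing is None:
            # First visit: persist the wake-up time, then suspend instead of blocking
            fire_at = self.now() + timedelta(seconds=seconds)
            self.journal.append({"name": name, "status": "PENDING_TIMER",
                                 "fire_at": fire_at.isoformat()})
            raise SuspendExecution()
        if self.now() < datetime.fromisoformat(existing["fire_at"]):
            raise SuspendExecution()  # retried too early: park again
        existing["status"] = "COMPLETED"


clock = [datetime(2024, 1, 1, tzinfo=timezone.utc)]
journal: list[dict[str, Any]] = []


def attempt() -> str:
    ctx = TimerContext(journal, now=lambda: clock[0])
    try:
        ctx.sleep("settlement-window", 60)
    except SuspendExecution:
        return "WAITING_TIMER"
    return "RESUMED"


print(attempt())  # WAITING_TIMER (timer journaled)
clock[0] += timedelta(seconds=30)
print(attempt())  # WAITING_TIMER (not due yet)
clock[0] += timedelta(seconds=30)
print(attempt())  # RESUMED
```

Because the wake-up time lives in the journal and not in a process's memory, a worker restart between these attempts would change nothing.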

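Fault injection

Business handlers support a crash_after option in the payload:

```python
# Raises an injected crash here if the payload's crash_after targets this step
maybe_crash(ctx, payload, "charge-payment")
```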

The crash fires only on the first attempt. This lets you watch the second attempt replay the steps that have already completed.
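The crash-then-replay sequence can be reproduced end to end with a few assumptions. The attempt counter on the context, the InjectedCrash exception, and the body of maybe_crash are guesses at the semantics described above, not the project's code. Ctx and handler are likewise hypothetical stand-ins.

```python
import itertools
from typing import Any, Callable

JsonDict = dict[str, Any]
_ids = itertools.count(1)


class InjectedCrash(Exception):
    pass


class Ctx:
    """Hypothetical minimal context: journal persists, cursor resets per attempt."""

    def __init__(self, journal: list[JsonDict], attempt: int) -> None:
        self.journal, self.attempt, self._cursor = journal, attempt, 0

    def run(self, name: str, fn: Callable[[], JsonDict]) -> JsonDict:
        i = self._cursor
        self._cursor += 1
        if i < len(self.journal):
            return self.journal[i]  # replay the journaled result
        result = fn()
        self.journal.append(result)
        return result


def maybe_crash(ctx: Ctx, payload: JsonDict, step: str) -> None:
    # Assumed semantics: crash once, on attempt 1, after the step named in crash_after
    if ctx.attempt == 1 and payload.get("crash_after") == step:
        raise InjectedCrash(step)


def handler(ctx: Ctx, payload: JsonDict) -> JsonDict:
    payment = ctx.run("charge-payment", lambda: {"payment_id": f"pay-{next(_ids)}"})
    maybe_crash(ctx, payload, "charge-payment")
    receipt = ctx.run("send-receipt", lambda: {"receipt_id": f"rcpt-{next(_ids)}"})
    return {**payment, **receipt}


journal: list[JsonDict] = []
payload = {"order_id": "A1", "crash_after": "charge-payment"}
outcomes: list[Any] = []
for attempt in (1, 2):
    try:
        outcomes.append(handler(Ctx(journal, attempt), payload))
    except InjectedCrash as exc:
        outcomes.append(f"crashed after {exc}")
print(outcomes)
# ['crashed after charge-payment', {'payment_id': 'pay-1', 'receipt_id': 'rcpt-2'}]
```

The second attempt keeps pay-1 because charge-payment is served from the journal. Only send-receipt actually executes.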

Production gaps

The teaching version is enough to explain the principles, but it is still a long way from a production system:

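| Gap | Production direction |
| --- | --- |
| Multiple workers competing | Row-level locks, leases, epoch fencing |
| Window for external side effects | External idempotency keys, outbox, compensation |
| Log and materialized views not separated | Append-only log + materializer |
| No service protocol | Standalone SDK, bidirectional streaming, action acks |
| No partitioning | Key hashing, partition processors, cross-partition delivery |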
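The first row can be made concrete with an in-memory sketch of leases combined with epoch fencing. LeaseTable, Lease, and StaleEpoch are hypothetical names, and time is passed in as plain seconds. A worker appends to the journal only with the epoch it received when it acquired the lease. A worker that stalls past its lease expiry is rejected once another worker has taken over. In a database, the epoch check and the append would have to happen in the same transaction, for example as a conditional UPDATE.

```python
from dataclasses import dataclass
from typing import Optional


class StaleEpoch(Exception):
    pass


@dataclass
class Lease:
    owner: str
    epoch: int
    expires_at: float


class LeaseTable:
    """Hypothetical per-invocation leases with epoch fencing on journal writes."""

    def __init__(self) -> None:
        self.leases: dict[str, Lease] = {}
        self.journal: dict[str, list[str]] = {}

    def acquire(self, inv: str, worker: str, now: float, ttl: float) -> Optional[int]:
        current = self.leases.get(inv)
        if current and current.expires_at > now:
            if current.owner != worker:
                return None  # another worker holds a live lease
            current.expires_at = now + ttl  # renewal keeps the same epoch
            return current.epoch
        # Free or expired: take over and bump the epoch to fence the old owner
        epoch = current.epoch + 1 if current else 1
        self.leases[inv] = Lease(worker, epoch, now + ttl)
        return epoch

    def append(self, inv: str, epoch: int, entry: str) -> None:
        # Fencing: writes carrying a superseded epoch are rejected
        current = self.leases[inv].epoch
        if epoch != current:
            raise StaleEpoch(f"epoch {epoch} superseded by {current}")
        self.journal.setdefault(inv, []).append(entry)


table = LeaseTable()
e1 = table.acquire("inv-1", "worker-a", now=0.0, ttl=10.0)       # 1
blocked = table.acquire("inv-1", "worker-b", now=5.0, ttl=10.0)  # None
e2 = table.acquire("inv-1", "worker-b", now=11.0, ttl=10.0)      # 2, a's lease expired
table.append("inv-1", e2, "charge-payment")
try:
    table.append("inv-1", e1, "charge-payment")  # worker-a resumes too late
except StaleEpoch as exc:
    print("rejected:", exc)  # rejected: epoch 1 superseded by 2
```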

Teaching project inspired by Restate's public architecture and documentation.