nofx/docs/architecture/X402_STREAMING_PAYMENT.md
2026-03-16 13:53:27 +08:00

# x402 Streaming Payment Architecture
## Overview
NOFX calls AI models (DeepSeek, GPT, Claude, etc.) through the claw402 gateway, using the [x402 protocol](https://github.com/coinbase/x402) to pay per request with USDC on Base L2.
This document describes the full implementation of the SSE streaming call mode, including client, server, and billing logic.
## Why Streaming Is Needed
```
NOFX (client) ──→ Cloudflare (100s idle limit) ──→ claw402 (gateway) ──→ AI upstream
```
- DeepSeek inference takes 60–180 seconds (up to 5 minutes)
- Cloudflare enforces a **100-second hard limit** on idle connections, returning 520/EOF on timeout
- Non-streaming mode: the client receives no data until inference completes — Cloudflare disconnects after 100 seconds
- Streaming mode: the first byte arrives within seconds, subsequent chunks flow continuously, keeping Cloudflare alive
## End-to-End Request Flow
```
                     NOFX Client                        claw402 Gateway                     AI Upstream
                          │                                    │                                 │
── Phase 1: Payment ──────────────────────────────────────────────────────────────────────────────
                          │                                    │                                 │
1. POST /api/v1/ai/...    │ ─── body + stream:true ──────────→ │                                 │
   (no payment header)    │                                    │                                 │
                          │ ←── 402 + Payment-Required ─────── │                                 │
                          │   (base64 JSON: price/chain/asset) │                                 │
                          │                                    │                                 │
2. EIP-712 signing        │                                    │                                 │
   (USDC TransferWithAuth)│                                    │                                 │
                          │                                    │                                 │
3. POST + X-Payment hdr   │ ─── body + signature ────────────→ │                                 │
                          │                                    │ ── verify signature ─→ Facilitator
                          │                                    │ ←─ OK ──────────────── Facilitator
                          │                                    │ ── settle USDC ──────→ Facilitator
                          │                                    │ ←─ tx hash ─────────── Facilitator
                          │                                    │                                 │
── Phase 2: Streaming Response ───────────────────────────────────────────────────────────────────
                          │                                    │                                 │
                          │ ←── 200 OK ─────────────────────── │ ─── POST stream:true ─────────→ │
                          │ ←── data: {"choices":[...]} ────── │ ←── SSE chunk ───────────────── │
                          │ ←── data: {"choices":[...]} ────── │ ←── SSE chunk ───────────────── │
                          │ ←── ... (continuous) ───────────── │ ←── ... ─────────────────────── │
                          │ ←── data: [DONE] ───────────────── │ ←── data: [DONE] ────────────── │
```
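The two-request handshake in Phase 1 can be sketched with the standard library alone. This is a minimal illustration, not the NOFX implementation: `doPaidRequest`, `signFunc`, and `newFakeGateway` are hypothetical names, the signer is a stub rather than real EIP-712 signing, and standard (not URL-safe) base64 is assumed for the `Payment-Required` header.

```go
package main

import (
	"bytes"
	"encoding/base64"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// signFunc is a hypothetical stand-in for the EIP-712 signer: it receives the
// decoded Payment-Required JSON and returns the X-Payment header value.
type signFunc func(requirements []byte) (string, error)

// doPaidRequest sends body once without payment. On a 402 it signs the
// advertised requirements and retries once with X-Payment set.
// The caller owns the returned response and must close its body.
func doPaidRequest(c *http.Client, url string, body []byte, sign signFunc) (*http.Response, error) {
	resp, err := c.Post(url, "application/json", bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	if resp.StatusCode != http.StatusPaymentRequired {
		return resp, nil
	}
	hdr := resp.Header.Get("Payment-Required")
	io.Copy(io.Discard, resp.Body) // drain so the connection can be reused
	resp.Body.Close()

	requirements, err := base64.StdEncoding.DecodeString(hdr)
	if err != nil {
		return nil, fmt.Errorf("decode Payment-Required: %w", err)
	}
	payment, err := sign(requirements)
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Payment", payment)
	return c.Do(req)
}

// newFakeGateway returns a test server that answers 402 until X-Payment is present.
func newFakeGateway() *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Header.Get("X-Payment") == "" {
			quote := base64.StdEncoding.EncodeToString([]byte(`{"amount":"3000"}`))
			w.Header().Set("Payment-Required", quote)
			w.WriteHeader(http.StatusPaymentRequired)
			return
		}
		fmt.Fprint(w, "data: [DONE]\n\n")
	}))
}

func main() {
	srv := newFakeGateway()
	defer srv.Close()
	stubSigner := func(requirements []byte) (string, error) {
		return "signed:" + string(requirements), nil // placeholder, not a real signature
	}
	resp, err := doPaidRequest(srv.Client(), srv.URL, []byte(`{"stream":true}`), stubSigner)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("status after payment:", resp.StatusCode)
}
```

The response is returned with its body still open, which is what lets the caller start reading SSE chunks as soon as the paid request is accepted.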
## Client Implementation (NOFX)
### File Structure
| File | Responsibility |
|------|----------------|
| `mcp/payment/claw402.go` | Claw402Client — model routing, wallet management |
| `mcp/payment/x402.go` | x402 payment flow core — DoX402RequestStream, X402CallStream |
| `mcp/client.go` | ParseSSEStream — shared SSE parsing function |
### Call Chain
```
Claw402Client.Call()
└→ X402CallStream()                         // x402.go:380
   ├→ Build request body + inject stream:true
   ├→ DoX402RequestStream()                 // x402.go:239
   │  ├→ Send initial request (no payment header)
   │  ├→ Receive 402 → parse Payment-Required header
   │  ├→ signFn() → EIP-712 signature
   │  └→ Send retry request with X-Payment header → return open *http.Response
   ├→ Start idle timeout watchdog (90s with no data → disconnect)
   ├→ TeeReader: simultaneous SSE parsing + raw byte buffering
   ├→ ParseSSEStream()                      // client.go:703
   │  ├→ bufio.Scanner line-by-line read
   │  ├→ Parse "data: {...}" → OpenAI chunk format
   │  └→ Accumulate text + call onChunk callback
   └→ Fallback: if SSE yields nothing, try JSON parsing on buffered bodyBuf
```
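The `ParseSSEStream` step of the chain (scan lines, decode `data:` payloads, accumulate text) might look roughly like the following. This is a simplified sketch, not the code in `client.go`: `parseSSE`, `parseSSEString`, and `sseChunk` are hypothetical names, and only the `choices[].delta.content` field of the OpenAI chunk format is modeled.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// sseChunk models only the part of an OpenAI-style streaming chunk used here.
type sseChunk struct {
	Choices []struct {
		Delta struct {
			Content string `json:"content"`
		} `json:"delta"`
	} `json:"choices"`
}

// parseSSE reads r line by line, decodes each "data: {...}" payload, appends
// the delta text, and calls onChunk (if non-nil) for every non-empty piece.
func parseSSE(r io.Reader, onChunk func(string)) (string, error) {
	var text strings.Builder
	sc := bufio.NewScanner(r)
	sc.Buffer(make([]byte, 0, 64*1024), 1<<20) // allow data lines up to 1 MiB
	for sc.Scan() {
		line := sc.Text()
		if !strings.HasPrefix(line, "data:") {
			continue // blank separators, comments, event names
		}
		payload := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
		if payload == "[DONE]" {
			break
		}
		var c sseChunk
		if err := json.Unmarshal([]byte(payload), &c); err != nil {
			continue // skip malformed chunks instead of aborting the stream
		}
		for _, choice := range c.Choices {
			if choice.Delta.Content == "" {
				continue
			}
			text.WriteString(choice.Delta.Content)
			if onChunk != nil {
				onChunk(choice.Delta.Content)
			}
		}
	}
	return text.String(), sc.Err()
}

// parseSSEString is a convenience wrapper for in-memory input.
func parseSSEString(s string) (string, error) {
	return parseSSE(strings.NewReader(s), nil)
}

func main() {
	stream := "data: {\"choices\":[{\"delta\":{\"content\":\"Hel\"}}]}\n\n" +
		"data: {\"choices\":[{\"delta\":{\"content\":\"lo\"}}]}\n\n" +
		"data: [DONE]\n\n"
	text, err := parseSSE(strings.NewReader(stream), func(piece string) {
		fmt.Printf("chunk: %q\n", piece)
	})
	fmt.Println(text, err)
}
```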
### Request Identification
Every request carries an `X-Client-ID: nofx` header (`x402.go:473`), allowing claw402 to identify the request source for logging and monitoring.
### Model Routing
`claw402ModelEndpoints` maps user-friendly model names to API paths:
```go
var claw402ModelEndpoints = map[string]string{
	"deepseek":    "/api/v1/ai/deepseek/chat",
	"gpt-5.4":     "/api/v1/ai/openai/chat/5.4",
	"claude-opus": "/api/v1/ai/anthropic/messages/opus",
	"qwen-max":    "/api/v1/ai/qwen/chat/max",
	// ... more
}
```
Anthropic endpoints (containing `/anthropic/`) automatically switch to the Messages API wire format.
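A lookup that also applies the `/anthropic/` rule could be sketched as follows. `resolveEndpoint` and `modelEndpoints` are hypothetical names used for illustration, and the map holds only the four entries listed in this document.

```go
package main

import (
	"fmt"
	"strings"
)

// modelEndpoints mirrors the entries shown in this document (illustrative subset).
var modelEndpoints = map[string]string{
	"deepseek":    "/api/v1/ai/deepseek/chat",
	"gpt-5.4":     "/api/v1/ai/openai/chat/5.4",
	"claude-opus": "/api/v1/ai/anthropic/messages/opus",
	"qwen-max":    "/api/v1/ai/qwen/chat/max",
}

// resolveEndpoint returns the API path for a model name and reports whether
// the path is an Anthropic endpoint (and so needs the Messages API format).
func resolveEndpoint(model string) (string, bool, error) {
	path, ok := modelEndpoints[model]
	if !ok {
		return "", false, fmt.Errorf("unknown model %q", model)
	}
	return path, strings.Contains(path, "/anthropic/"), nil
}

func main() {
	for _, model := range []string{"deepseek", "claude-opus"} {
		path, anthropic, err := resolveEndpoint(model)
		fmt.Println(model, path, anthropic, err)
	}
}
```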
## Server Implementation (claw402)
### Core Problem: ginmw Is Incompatible with SSE
Coinbase's standard Gin middleware `ginmw.PaymentMiddlewareFromConfig` internally works as follows:
```
1. Wrap c.Writer with responseCapture (all writes go to buffer)
2. c.Next() — handler runs, SSE chunks all go into buffer
3. Settle payment after handler completes
4. Write buffered content to client only after successful settlement
```
Problems:
- SSE chunks are buffered — the client receives no data for minutes
- Cloudflare disconnects after 100 seconds → 520 error
- Handler runs too long (5 min), settlement context expires
### Solution: streamAwareX402Middleware
Dual-path design (`internal/gateway/x402.go`):
```go
// Simplified: parameter and return types are omitted.
func streamAwareX402Middleware(streamServer, standardMW) {
	return func(c *gin.Context) {
		if !isStreamingBody(c) {
			standardMW(c) // Non-streaming → standard ginmw (battle-tested)
			return
		}
		// Streaming → custom path
	}
}
```
#### Non-Streaming Path
Delegates entirely to `ginmw.PaymentMiddlewareFromConfig` with no custom logic.
#### Streaming Path (Pre-Settlement)
```
1. isStreamingBody(c): read body to check for {"stream": true}, restore body
2. streamServer.RequiresPayment(reqCtx): does this route require payment?
3. streamServer.ProcessHTTPRequest(): verify X-Payment signature
4. handleStreamingPayment():
   a. ProcessSettlement(): settle USDC on-chain (collect payment first)
   b. c.Next(): pass to HandleAPIKeyStream
   c. SSE chunks write directly to c.Writer (no responseCapture buffer)
```
Key differences:
| | Standard ginmw (non-streaming) | Custom path (streaming) |
|---|---|---|
| Settlement timing | **After** handler completes | **Before** handler starts |
| Response buffer | `responseCapture` buffers everything | No buffer, writes directly to client |
| Timeout risk | Slow handler causes context expiry | Settlement uses `context.Background()` |
| SSE compatible | No | Yes |
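Step 1 of the streaming path (peek at the body for `"stream": true`, then restore it for downstream handlers) can be illustrated with plain `net/http`. This is a sketch under assumptions: the real `isStreamingBody` takes a `*gin.Context`, while the hypothetical `isStreamingRequest` below works on `*http.Request` to stay dependency-free, and `newJSONRequest` and `bodyString` are demo helpers.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// isStreamingRequest reports whether the JSON body sets "stream": true.
// It always replaces r.Body with a fresh reader over the bytes it consumed,
// so later handlers still see the body that was read.
func isStreamingRequest(r *http.Request) bool {
	if r.Body == nil {
		return false
	}
	raw, err := io.ReadAll(r.Body)
	r.Body.Close()
	r.Body = io.NopCloser(bytes.NewReader(raw)) // restore for downstream readers
	if err != nil {
		return false
	}
	var probe struct {
		Stream bool `json:"stream"`
	}
	if err := json.Unmarshal(raw, &probe); err != nil {
		return false
	}
	return probe.Stream
}

// newJSONRequest builds a POST request for the demo (the URL is a placeholder).
func newJSONRequest(body string) *http.Request {
	r, err := http.NewRequest(http.MethodPost, "http://gateway.invalid/api", strings.NewReader(body))
	if err != nil {
		panic(err)
	}
	return r
}

// bodyString drains the request body and returns it as a string.
func bodyString(r *http.Request) string {
	raw, _ := io.ReadAll(r.Body)
	return string(raw)
}

func main() {
	r := newJSONRequest(`{"model":"deepseek","stream":true}`)
	fmt.Println(isStreamingRequest(r)) // streaming flag detected
	fmt.Println(bodyString(r))         // body is still readable afterwards
}
```

Reading the whole body into memory is acceptable for chat-style request payloads; a gateway that accepts very large uploads would likely want a size cap before this check.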
## Billing Logic
### x402 Protocol Flow
x402 is an HTTP 402 payment protocol proposed by Coinbase. Core roles:
- **Resource Server** (claw402) — provides paid APIs
- **Client** (NOFX) — consumer, holds an EVM wallet
- **Facilitator** (Coinbase CDP) — verifies signatures, executes on-chain settlement
### Payment Signing (EIP-712)
Client signature type: USDC `TransferWithAuthorization`
```
1. Receive Payment-Required header from 402 response (base64 JSON)
2. Decode to get:
   - scheme: "exact"
   - network: "eip155:8453" (Base L2)
   - amount: USDC amount (e.g., "3000" = $0.003)
   - asset: USDC contract address
   - payTo: claw402 recipient address
3. Sign with wallet private key using EIP-712, authorizing USDC transfer from user wallet to payTo
4. Place signature in X-Payment + Payment-Signature headers
```
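Steps 1 and 2 (decode the header, read the amount) can be sketched as below. The flat JSON layout with exactly these five field names is an assumption made for illustration; the real header may nest them differently. `decodeRequirements`, `formatUSDC`, and `encodeHeader` are hypothetical helpers, and the addresses are placeholders. The conversion relies on USDC using 6 decimal places, which is why `"3000"` corresponds to $0.003.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"math/big"
)

// paymentRequirements assumes a flat JSON object with the fields listed above.
type paymentRequirements struct {
	Scheme  string `json:"scheme"`
	Network string `json:"network"`
	Amount  string `json:"amount"`
	Asset   string `json:"asset"`
	PayTo   string `json:"payTo"`
}

// decodeRequirements base64-decodes the header value and parses the JSON.
func decodeRequirements(header string) (paymentRequirements, error) {
	var req paymentRequirements
	raw, err := base64.StdEncoding.DecodeString(header)
	if err != nil {
		return req, fmt.Errorf("base64: %w", err)
	}
	if err := json.Unmarshal(raw, &req); err != nil {
		return req, fmt.Errorf("json: %w", err)
	}
	return req, nil
}

// formatUSDC converts an atomic-unit amount string into a decimal USDC
// string with 6 fractional digits, using big integers to avoid float error.
func formatUSDC(atomic string) (string, error) {
	n, ok := new(big.Int).SetString(atomic, 10)
	if !ok || n.Sign() < 0 {
		return "", fmt.Errorf("invalid amount %q", atomic)
	}
	whole, frac := new(big.Int).QuoRem(n, big.NewInt(1_000_000), new(big.Int))
	return fmt.Sprintf("%s.%06d", whole.String(), frac.Int64()), nil
}

// encodeHeader builds a header value for the demo.
func encodeHeader(jsonBody string) string {
	return base64.StdEncoding.EncodeToString([]byte(jsonBody))
}

func main() {
	// Placeholder addresses, not real contracts or wallets.
	header := encodeHeader(`{"scheme":"exact","network":"eip155:8453","amount":"3000","asset":"0xUSDC","payTo":"0xPAYEE"}`)
	req, err := decodeRequirements(header)
	if err != nil {
		panic(err)
	}
	usd, err := formatUSDC(req.Amount)
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Scheme, req.Network, usd)
}
```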
### Pricing Models
Each AI model route has its own price configured in claw402:
| Mode | Description | Example |
|------|-------------|---------|
| Fixed price | Specified directly via `user_price` field | `$0.003` per request |
| Token-based dynamic pricing | Calculated from request token count | `$0.001` per 1K tokens |
| Dispatch fallback | Default price for SDK-compatible routes | `$0.01` per request |
```go
// Fixed price
price := fmt.Sprintf("$%s", route.UserPrice)
// Dynamic pricing
price = DynamicPriceFunc(func(ctx, reqCtx) (Price, error) {
	return resolveDynamicPrice(ctx, reqCtx, rule)
})
```
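To make the token-based row concrete, here is one way a per-1K-token rate could be turned into a price string. This is only an arithmetic sketch: how `resolveDynamicPrice` actually counts tokens and rounds is not described here, so `tokenPrice`, the micro-USDC rate parameter, and the round-up rule are all assumptions.

```go
package main

import "fmt"

// microPerUSDC is the number of atomic units in one USDC (6 decimals).
const microPerUSDC = 1_000_000

// tokenPrice returns a "$"-prefixed price for the given token count, where
// microPerThousand is the rate in micro-USDC per 1,000 tokens. Partial
// micro-units are rounded up (an assumption, so a request is never free).
func tokenPrice(tokens, microPerThousand int64) string {
	if tokens < 0 {
		tokens = 0
	}
	micro := (tokens*microPerThousand + 999) / 1000 // ceiling division
	return fmt.Sprintf("$%d.%06d", micro/microPerUSDC, micro%microPerUSDC)
}

func main() {
	// $0.001 per 1K tokens is 1000 micro-USDC per 1,000 tokens.
	fmt.Println(tokenPrice(1500, 1000))
	fmt.Println(tokenPrice(250_000, 1000))
}
```

Working in integer micro-units avoids the rounding drift that floating-point dollars would introduce when amounts are later converted to on-chain units.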
### Retry Logic and Double-Charge Prevention
```go
const X402MaxPaymentRetries = 5
const X402RetryBaseWait = 3 * time.Second
```
- **5xx errors** → retry with linearly increasing backoff (3s, 6s, 9s..., i.e. attempt number × `X402RetryBaseWait`), no re-signing (same payment authorization)
- **Another 402** → previous signature expired, re-sign and retry (on-chain authorization auto-invalidates, **no double charge**)
- **4xx (non-402)** → non-retryable, fail immediately
- Outer retry is set to 1 (`WithMaxRetries(1)`) to prevent outer retries from causing duplicate payments
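The status-code rules above can be sketched as a small retry loop. This is an illustration of the policy as described, not the code in `x402.go`: `callWithRetries`, `scriptedAttempt`, and `counterSigner` are hypothetical names, the wait is taken as attempt number × base wait to match the 3s, 6s, 9s sequence, and transport errors are treated as fatal, which the document does not specify.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"time"
)

// callWithRetries runs attempt up to maxAttempts times. 5xx responses reuse
// the same payment after a growing wait, a 402 triggers a fresh signature,
// and any other non-2xx status fails immediately.
func callWithRetries(
	attempt func(payment string) (int, error),
	sign func() (string, error),
	maxAttempts int,
	baseWait time.Duration,
) error {
	payment, err := sign()
	if err != nil {
		return err
	}
	for i := 1; i <= maxAttempts; i++ {
		status, err := attempt(payment)
		if err != nil {
			return err // assumption: transport errors are not retried here
		}
		switch {
		case status >= 200 && status < 300:
			return nil
		case status == http.StatusPaymentRequired:
			// Previous authorization was rejected or expired: sign again.
			if payment, err = sign(); err != nil {
				return err
			}
		case status >= 500:
			if i < maxAttempts {
				time.Sleep(time.Duration(i) * baseWait) // 1x, 2x, 3x, ...
			}
		default:
			return fmt.Errorf("non-retryable status %d", status)
		}
	}
	return errors.New("payment retries exhausted")
}

// scriptedAttempt replays fixed status codes and records each payment used.
func scriptedAttempt(statuses []int, seen *[]string) func(string) (int, error) {
	i := 0
	return func(payment string) (int, error) {
		*seen = append(*seen, payment)
		status := statuses[i]
		i++
		return status, nil
	}
}

// counterSigner returns sig-1, sig-2, ... so re-signing is visible in output.
func counterSigner() func() (string, error) {
	n := 0
	return func() (string, error) {
		n++
		return fmt.Sprintf("sig-%d", n), nil
	}
}

func main() {
	var seen []string
	err := callWithRetries(scriptedAttempt([]int{503, 402, 200}, &seen), counterSigner(), 5, time.Millisecond)
	fmt.Println(seen, err)
}
```

In the demo run, the 503 is retried with the original signature and only the 402 causes a second call to the signer.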
### Settlement Timing: Streaming vs Non-Streaming
| | Non-Streaming | Streaming |
|---|---|---|
| Settlement timing | After receiving full response | Before streaming begins |
| Risk | Low (content confirmed before charge) | Slightly higher (charge before seeing content) |
| Necessity | Standard mode | Must charge first, otherwise SSE is buffered |
## Timeout Configuration
| Location | Timeout | Purpose |
|----------|---------|---------|
| NOFX `X402Timeout` | 5 min | HTTP client overall timeout |
| NOFX `x402StreamIdleTimeout` | 90s | SSE idle disconnect (prevent hangs) |
| NOFX `CallWithRequestStream` idle | 60s | Idle timeout for non-x402 streaming |
| claw402 `ResponseHeaderTimeout` | 120s | Wait for first byte from AI upstream |
| claw402 `streamingHTTP.Timeout` | 0 (unlimited) | SSE stream can last indefinitely |
| claw402 `standardMW WithTimeout` | 10 min | Non-streaming ginmw overall timeout |
| claw402 `x402PaymentTimeout` | 30s | Payment verification/settlement timeout |
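The two claw402 upstream rows map naturally onto Go's `http.Client` and `http.Transport` fields. The sketch below shows that pairing; `newStreamingUpstreamClient` and `headerTimeout` are hypothetical names, and the actual construction of `streamingHTTP` in claw402 may differ.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// newStreamingUpstreamClient builds a client suited to long-lived SSE
// responses: no overall deadline, but a bound on the wait for headers.
func newStreamingUpstreamClient() *http.Client {
	return &http.Client{
		// Client.Timeout also covers reading the body, so any non-zero
		// value would cut off a long stream. Zero means no limit.
		Timeout: 0,
		Transport: &http.Transport{
			// Fail if the upstream sends no response headers in time.
			ResponseHeaderTimeout: 120 * time.Second,
		},
	}
}

// headerTimeout returns the ResponseHeaderTimeout of c's transport, or zero
// if the client does not use *http.Transport.
func headerTimeout(c *http.Client) time.Duration {
	if t, ok := c.Transport.(*http.Transport); ok {
		return t.ResponseHeaderTimeout
	}
	return 0
}

func main() {
	c := newStreamingUpstreamClient()
	fmt.Println(c.Timeout, headerTimeout(c))
}
```

With no overall deadline, something else has to detect a stalled stream, which is the role the idle timeouts in the table play.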
## SSE Fault Tolerance
### TeeReader Dual Parsing
```go
var bodyBuf bytes.Buffer
tee := io.TeeReader(resp.Body, &bodyBuf)
text, sseErr := ParseSSEStream(tee, onChunk, onLine)
if text != "" {
	return text, nil // SSE succeeded
}
// SSE yielded nothing → try JSON parsing on bodyBuf (server may have returned non-streaming JSON)
jsonText, _ := ParseMCPResponse(bodyBuf.Bytes())
```
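A self-contained version of the same idea is sketched below, so the fallback can be run in isolation. It is not the NOFX code: `readStreamOrJSON` and the `completion` type are hypothetical, the inline scanner stands in for `ParseSSEStream`, and the non-streaming body is assumed to follow the OpenAI `choices[].message.content` shape.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// completion covers both streaming chunks (delta) and full responses (message).
type completion struct {
	Choices []struct {
		Delta   struct{ Content string `json:"content"` } `json:"delta"`
		Message struct{ Content string `json:"content"` } `json:"message"`
	} `json:"choices"`
}

// text joins whatever content fields are populated.
func (c completion) text() string {
	var b strings.Builder
	for _, ch := range c.Choices {
		b.WriteString(ch.Delta.Content)
		b.WriteString(ch.Message.Content)
	}
	return b.String()
}

// readStreamOrJSON parses body as SSE while a TeeReader copies every byte
// into bodyBuf. If no SSE text was found, the buffered bytes are parsed as
// a single JSON document instead.
func readStreamOrJSON(body io.Reader) (string, error) {
	var bodyBuf bytes.Buffer
	tee := io.TeeReader(body, &bodyBuf)

	var text strings.Builder
	sc := bufio.NewScanner(tee)
	sc.Buffer(make([]byte, 0, 64*1024), 1<<20)
	for sc.Scan() {
		line := sc.Text()
		if !strings.HasPrefix(line, "data:") {
			continue
		}
		var c completion
		// "[DONE]" and malformed payloads fail to decode and are skipped.
		if json.Unmarshal([]byte(strings.TrimPrefix(line, "data:")), &c) == nil {
			text.WriteString(c.text())
		}
	}
	if text.Len() > 0 {
		return text.String(), nil
	}
	if err := sc.Err(); err != nil {
		return "", err
	}
	var c completion
	if err := json.Unmarshal(bodyBuf.Bytes(), &c); err != nil {
		return "", fmt.Errorf("neither SSE nor JSON: %w", err)
	}
	return c.text(), nil
}

// readStreamOrJSONString is a convenience wrapper for in-memory input.
func readStreamOrJSONString(s string) (string, error) {
	return readStreamOrJSON(strings.NewReader(s))
}

func main() {
	sse := "data: {\"choices\":[{\"delta\":{\"content\":\"Hello\"}}]}\n\ndata: [DONE]\n\n"
	plain := `{"choices":[{"message":{"content":"Hi"}}]}`
	fmt.Println(readStreamOrJSONString(sse))
	fmt.Println(readStreamOrJSONString(plain))
}
```

The fallback only works because the scanner reads the tee to EOF before giving up, so by then `bodyBuf` holds the complete body.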
### Idle Timeout Watchdog
```go
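// Assumes ctx is the request context paired with cancel.
go func() {
	t := time.NewTimer(90 * time.Second)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			cancel() // timeout → cancel context → close TCP → body.Read() returns error
			return   // stop the watchdog once the request is cancelled
		case <-resetCh:
			t.Reset(90 * time.Second) // received SSE line → reset timer
		case <-ctx.Done():
			return // stream finished or was cancelled elsewhere
		}
	}
}()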
```
Every incoming SSE line resets the timer. If no data arrives for 90 seconds, the context is cancelled and the TCP connection is closed, preventing indefinite blocking.
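A runnable variant of the watchdog, with short durations so its behavior can be observed, might look like this. It is a sketch rather than the NOFX code: `startIdleWatchdog` and `demoWatchdog` are hypothetical names, and the non-blocking reset and timer drain are design choices added here for illustration.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// startIdleWatchdog calls cancel if the returned reset function is not
// invoked for longer than idle. The goroutine exits when ctx is done.
func startIdleWatchdog(ctx context.Context, cancel context.CancelFunc, idle time.Duration) func() {
	resetCh := make(chan struct{}, 1)
	go func() {
		t := time.NewTimer(idle)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				cancel() // no data within idle: abort the request
				return
			case <-resetCh:
				if !t.Stop() {
					select { // drain a tick that fired concurrently
					case <-t.C:
					default:
					}
				}
				t.Reset(idle)
			case <-ctx.Done():
				return // request finished or was cancelled elsewhere
			}
		}
	}()
	// Non-blocking send: a pending reset already covers this one.
	return func() {
		select {
		case resetCh <- struct{}{}:
		default:
		}
	}
}

// demoWatchdog simulates three SSE lines followed by silence. It reports
// whether the context survived the active phase and whether it was
// cancelled once the stream went idle.
func demoWatchdog() (aliveWhileActive, cancelledWhenIdle bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	reset := startIdleWatchdog(ctx, cancel, 300*time.Millisecond)
	for i := 0; i < 3; i++ {
		time.Sleep(50 * time.Millisecond) // a line arrives well within the idle limit
		reset()
	}
	aliveWhileActive = ctx.Err() == nil
	select {
	case <-ctx.Done():
		cancelledWhenIdle = true
	case <-time.After(3 * time.Second):
	}
	return aliveWhileActive, cancelledWhenIdle
}

func main() {
	alive, cancelled := demoWatchdog()
	fmt.Println("alive while active:", alive, "cancelled when idle:", cancelled)
}
```

Making `reset` non-blocking keeps the SSE read loop from stalling if the watchdog goroutine is momentarily busy.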
## Related Files
### NOFX (Client)
- `mcp/payment/claw402.go` — Claw402Client entry point
- `mcp/payment/x402.go` — x402 payment flow (DoX402Request, DoX402RequestStream, X402CallStream)
- `mcp/payment/x402_sign.go` — EIP-712 signing implementation
- `mcp/client.go` — ParseSSEStream, CallWithRequestStream
### claw402 (Server)
- `internal/gateway/x402.go` — x402 middleware (streamAwareX402Middleware)
- `internal/gateway/proxy/stream.go` — SSE proxy (HandleAPIKeyStream)
- `internal/config/` — Route configuration (pricing, model mapping)