diff --git a/mcp/client.go b/mcp/client.go index d4923b38..b8b4f750 100644 --- a/mcp/client.go +++ b/mcp/client.go @@ -687,15 +687,27 @@ func (client *Client) CallWithRequestStream(req *Request, onChunk func(string)) return "", fmt.Errorf("API error (status %d): %s", resp.StatusCode, string(body)) } - var accumulated strings.Builder - scanner := bufio.NewScanner(resp.Body) - - for scanner.Scan() { - // Ping the watchdog: we received a line, reset the idle timer. + return ParseSSEStream(resp.Body, onChunk, func() { select { case resetCh <- struct{}{}: default: } + }) +} + +// ParseSSEStream reads an SSE response body, accumulates text deltas, +// and calls onChunk with the full accumulated text after each chunk. +// If onLine is non-nil, it is called after each raw SSE line is scanned +// (useful for resetting idle-timeout watchdogs). +// Returns the complete accumulated text. +func ParseSSEStream(body io.Reader, onChunk func(string), onLine func()) (string, error) { + var accumulated strings.Builder + scanner := bufio.NewScanner(body) + + for scanner.Scan() { + if onLine != nil { + onLine() + } line := scanner.Text() if !strings.HasPrefix(line, "data: ") { @@ -706,7 +718,6 @@ func (client *Client) CallWithRequestStream(req *Request, onChunk func(string)) break } - // Parse the SSE JSON chunk var chunk struct { Choices []struct { Delta struct { diff --git a/mcp/payment/claw402.go b/mcp/payment/claw402.go index 623d70fb..9dc4a6b8 100644 --- a/mcp/payment/claw402.go +++ b/mcp/payment/claw402.go @@ -125,7 +125,7 @@ func (c *Claw402Client) resolveEndpoint() string { func (c *Claw402Client) SetAuthHeader(h http.Header) { X402SetAuthHeader(h) } func (c *Claw402Client) Call(systemPrompt, userPrompt string) (string, error) { - return X402Call(c.Client, c.signPayment, "Claw402", systemPrompt, userPrompt) + return X402CallStream(c.Client, c.signPayment, "Claw402", systemPrompt, userPrompt, nil) } func (c *Claw402Client) CallWithRequestFull(req *mcp.Request) (*mcp.LLMResponse, error) { diff --git a/mcp/payment/x402.go b/mcp/payment/x402.go index ef424c4d..7649de11 100644 --- a/mcp/payment/x402.go +++ b/mcp/payment/x402.go @@ -2,6 +2,7 @@ package payment import ( "bytes" + "context" "crypto/ecdsa" "encoding/base64" "encoding/json" @@ -229,6 +230,239 @@ func DoX402Request( return body, nil } +// DoX402RequestStream executes an HTTP request with x402 v2 payment flow and +// returns the open *http.Response for streaming. The caller is responsible for +// reading and closing the response body. +// The provided ctx is attached to the final successful HTTP request so that +// cancelling ctx will immediately close the underlying connection and unblock +// any pending body reads. +func DoX402RequestStream( + ctx context.Context, + httpClient *http.Client, + buildReqFn func() (*http.Request, error), + signFn X402SignFunc, + providerTag string, + logger mcp.Logger, +) (*http.Response, error) { + // Initial request — use background context (no idle timeout yet). + req, err := buildReqFn() + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + resp, err := httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to send request: %w", err) + } + + // Non-402 initial response + if resp.StatusCode != http.StatusPaymentRequired { + if resp.StatusCode == http.StatusOK { + return resp, nil + } + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + return nil, fmt.Errorf("%s API error (status %d): %s", providerTag, resp.StatusCode, string(body)) + } + + // 402 — extract payment header and sign + paymentHeader := resp.Header.Get("Payment-Required") + if paymentHeader == "" { + paymentHeader = resp.Header.Get("X-Payment-Required") + } + if paymentHeader == "" { + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + return nil, fmt.Errorf("received 402 but no Payment-Required header found. Body: %s", string(body)) + } + _, _ = io.Copy(io.Discard, resp.Body) + resp.Body.Close() + + paymentSig, err := signFn(paymentHeader) + if err != nil { + return nil, fmt.Errorf("failed to sign x402 payment: %w", err) + } + + // Retry loop for the payment-signed request. + // Attach ctx to these requests so the caller can cancel body reads. + var lastStatus int + var lastBody []byte + for attempt := 1; attempt <= X402MaxPaymentRetries; attempt++ { + req2, err := buildReqFn() + if err != nil { + return nil, fmt.Errorf("failed to build retry request: %w", err) + } + req2 = req2.WithContext(ctx) + req2.Header.Set("X-Payment", paymentSig) + req2.Header.Set("Payment-Signature", paymentSig) + + resp2, err := httpClient.Do(req2) + if err != nil { + if attempt < X402MaxPaymentRetries { + wait := X402RetryBaseWait * time.Duration(attempt) + logger.Warnf("⚠️ [%s] Payment request failed: %v, retrying in %v (%d/%d)...", + providerTag, err, wait, attempt+1, X402MaxPaymentRetries) + time.Sleep(wait) + continue + } + return nil, fmt.Errorf("failed to send payment retry: %w", err) + } + + if resp2.StatusCode == http.StatusOK { + if txHash := resp2.Header.Get("Payment-Response"); txHash != "" { + logger.Infof("💰 [%s] Payment tx: %s", providerTag, txHash) + } + if attempt > 1 { + logger.Infof("✅ [%s] Payment retry succeeded on attempt %d", providerTag, attempt) + } + return resp2, nil // caller reads and closes body + } + + // Non-200: read body for error handling / re-sign + body2, readErr := io.ReadAll(resp2.Body) + resp2.Body.Close() + if readErr != nil { + return nil, fmt.Errorf("failed to read payment retry response: %w", readErr) + } + + lastBody = body2 + lastStatus = resp2.StatusCode + + retryable := resp2.StatusCode >= 500 || resp2.StatusCode == http.StatusPaymentRequired + + if retryable && attempt < X402MaxPaymentRetries { + wait := X402RetryBaseWait * time.Duration(attempt) + + if resp2.StatusCode == http.StatusPaymentRequired { + newHeader := resp2.Header.Get("Payment-Required") + if newHeader == "" { + newHeader = resp2.Header.Get("X-Payment-Required") + } + if newHeader != "" { + newSig, signErr := signFn(newHeader) + if signErr == nil { + paymentSig = newSig + logger.Warnf("⚠️ [%s] Payment expired (402), re-signed and retrying in %v (%d/%d)...", + providerTag, wait, attempt+1, X402MaxPaymentRetries) + } else { + logger.Warnf("⚠️ [%s] Payment expired (402), re-sign failed: %v, retrying in %v (%d/%d)...", + providerTag, signErr, wait, attempt+1, X402MaxPaymentRetries) + } + } else { + logger.Warnf("⚠️ [%s] Got 402 but no new Payment-Required header, retrying in %v (%d/%d)...", + providerTag, wait, attempt+1, X402MaxPaymentRetries) + } + } else { + logger.Warnf("⚠️ [%s] Server error (status %d), retrying in %v (%d/%d)...", + providerTag, resp2.StatusCode, wait, attempt+1, X402MaxPaymentRetries) + } + + time.Sleep(wait) + continue + } + + break + } + + return nil, fmt.Errorf("%s payment retry failed (status %d): %s", providerTag, lastStatus, string(lastBody)) +} + +// x402StreamIdleTimeout is the idle timeout for SSE streaming through x402. +// If no SSE line arrives for this duration, the stream is considered stalled. +const x402StreamIdleTimeout = 90 * time.Second + +// X402CallStream handles the x402 payment flow with streaming for the simple Call path. +// It adds "stream": true to the request body and uses ParseSSEStream to read chunks. +// +// Robustness: uses TeeReader so the raw body is captured while parsing SSE. +// If SSE parsing yields no text (e.g. server returned plain JSON despite stream:true), +// falls back to ParseMCPResponse on the buffered body. +func X402CallStream(c *mcp.Client, signFn X402SignFunc, tag string, systemPrompt, userPrompt string, onChunk func(string)) (string, error) { + c.Log.Infof("📡 [%s] Request AI Server (stream): %s", tag, c.BaseURL) + + requestBody := c.Hooks.BuildMCPRequestBody(systemPrompt, userPrompt) + requestBody["stream"] = true + jsonData, err := c.Hooks.MarshalRequestBody(requestBody) + if err != nil { + return "", err + } + + // Idle-timeout context: cancel() closes the underlying TCP connection, + // which immediately unblocks any pending resp.Body.Read(). + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + resp, err := DoX402RequestStream(ctx, c.HTTPClient, func() (*http.Request, error) { + return c.Hooks.BuildRequest(c.Hooks.BuildUrl(), jsonData) + }, signFn, tag, c.Log) + if err != nil { + return "", err + } + defer resp.Body.Close() + + ct := resp.Header.Get("Content-Type") + c.Log.Infof("📡 [%s] Response Content-Type: %s", tag, ct) + + // Start idle-timeout watchdog AFTER the 402 dance is done. + resetCh := make(chan struct{}, 1) + go func() { + t := time.NewTimer(x402StreamIdleTimeout) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + c.Log.Warnf("⚠️ [%s] SSE idle timeout (%v), cancelling stream", tag, x402StreamIdleTimeout) + cancel() // closes the TCP connection → body.Read() returns error + return + case <-resetCh: + if !t.Stop() { + select { + case <-t.C: + default: + } + } + t.Reset(x402StreamIdleTimeout) + } + } + }() + + onLine := func() { + select { + case resetCh <- struct{}{}: + default: + } + } + + // TeeReader: body is streamed through SSE parser AND captured in bodyBuf. + // If SSE yields nothing (server returned JSON), we can still parse bodyBuf. + var bodyBuf bytes.Buffer + tee := io.TeeReader(resp.Body, &bodyBuf) + + text, sseErr := mcp.ParseSSEStream(tee, onChunk, onLine) + + if text != "" { + c.Log.Infof("📡 [%s] SSE stream complete, got %d chars", tag, len(text)) + return text, nil + } + + // SSE yielded nothing — try JSON fallback on the buffered body. + if bodyBuf.Len() > 0 { + c.Log.Infof("📡 [%s] SSE empty, trying JSON fallback on %d bytes", tag, bodyBuf.Len()) + jsonText, jsonErr := c.Hooks.ParseMCPResponse(bodyBuf.Bytes()) + if jsonErr == nil && jsonText != "" { + return jsonText, nil + } + c.Log.Warnf("⚠️ [%s] JSON fallback also failed: %v", tag, jsonErr) + } + + if sseErr != nil { + return "", fmt.Errorf("[%s] stream failed: %w", tag, sseErr) + } + return "", fmt.Errorf("[%s] no content received (SSE empty, body %d bytes)", tag, bodyBuf.Len()) +} + // X402BuildRequest creates a POST request with Content-Type but no auth header. func X402BuildRequest(url string, jsonData []byte) (*http.Request, error) { req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))