Files
nofx/agent/planner_runtime.go
2026-04-19 16:06:28 +08:00

2467 lines
80 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package agent
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"nofx/mcp"
)
const (
plannerMaxSteps = 8
plannerMaxIterations = 12
observationMaxLength = 400
)
var (
plannerCreateTimeout = 36 * time.Second
plannerReplanTimeout = 24 * time.Second
plannerReasonTimeout = 30 * time.Second
plannerFinalTimeout = 36 * time.Second
directReplyTimeout = 8 * time.Second
)
type replannerDecision struct {
Action string `json:"action"`
Goal string `json:"goal,omitempty"`
Steps []PlanStep `json:"steps,omitempty"`
Instruction string `json:"instruction,omitempty"`
Question string `json:"question,omitempty"`
}
type readFastPathRequest struct {
Kind string
ArgsJSON string
}
type directReplyDecision struct {
Action string `json:"action"`
Answer string `json:"answer,omitempty"`
}
func latestAskedQuestion(state ExecutionState) string {
if state.Waiting != nil && strings.TrimSpace(state.Waiting.Question) != "" {
return strings.TrimSpace(state.Waiting.Question)
}
for i := len(state.Steps) - 1; i >= 0; i-- {
step := state.Steps[i]
if step.Type == planStepTypeAskUser {
if q := strings.TrimSpace(step.Instruction); q != "" {
return q
}
if q := strings.TrimSpace(step.OutputSummary); q != "" {
return q
}
}
}
if state.Status == executionStatusWaitingUser {
return strings.TrimSpace(state.FinalAnswer)
}
return ""
}
func buildWaitingState(state ExecutionState, step PlanStep, question string) *WaitingState {
waiting := &WaitingState{
Question: strings.TrimSpace(question),
Intent: inferWaitingIntent(state.Goal, step, question),
PendingFields: inferPendingFields(step, question),
ConfirmationTarget: inferConfirmationTarget(state.Goal, step, question),
CreatedAt: time.Now().UTC().Format(time.RFC3339),
}
return normalizeWaitingState(waiting)
}
func inferWaitingIntent(goal string, step PlanStep, question string) string {
lowerGoal := strings.ToLower(strings.TrimSpace(goal))
lowerQuestion := strings.ToLower(strings.TrimSpace(question))
switch {
case step.RequiresConfirmation || strings.Contains(lowerQuestion, "需要我") || strings.Contains(lowerQuestion, "confirm") || strings.Contains(lowerQuestion, "确认"):
return "confirm_action"
case strings.Contains(lowerGoal, "交易员") || strings.Contains(lowerGoal, "trader"):
return "complete_trader_setup"
case strings.Contains(lowerGoal, "交易所") || strings.Contains(lowerGoal, "exchange"):
return "complete_exchange_config"
case strings.Contains(lowerGoal, "模型") || strings.Contains(lowerGoal, "model"):
return "complete_model_config"
default:
return "provide_missing_information"
}
}
func inferPendingFields(step PlanStep, question string) []string {
source := strings.ToLower(strings.TrimSpace(question))
if source == "" {
sourceBytes, _ := json.Marshal(step.ToolArgs)
source = strings.ToLower(string(sourceBytes))
}
candidates := []struct {
key string
patterns []string
}{
{key: "ai_model_id", patterns: []string{"ai_model_id", "model id", "模型id", "模型 id"}},
{key: "exchange_id", patterns: []string{"exchange_id", "exchange id", "交易所id", "交易所 id"}},
{key: "strategy_id", patterns: []string{"strategy_id", "strategy id", "策略id", "策略 id"}},
{key: "name", patterns: []string{"trader name", "name", "名字", "名称"}},
{key: "api_key", patterns: []string{"api key", "apikey", "api_key"}},
{key: "secret_key", patterns: []string{"secret key", "secret_key", "密钥"}},
{key: "passphrase", patterns: []string{"passphrase", "密码短语"}},
}
fields := make([]string, 0, len(candidates))
for _, candidate := range candidates {
for _, pattern := range candidate.patterns {
if strings.Contains(source, pattern) {
fields = append(fields, candidate.key)
break
}
}
}
return cleanStringList(fields)
}
func inferConfirmationTarget(goal string, step PlanStep, question string) string {
if step.RequiresConfirmation {
if step.ToolName != "" {
return step.ToolName
}
}
lowerGoal := strings.ToLower(strings.TrimSpace(goal))
lowerQuestion := strings.ToLower(strings.TrimSpace(question))
switch {
case strings.Contains(lowerGoal, "交易员") || strings.Contains(lowerQuestion, "交易员") || strings.Contains(lowerGoal, "trader"):
return "trader"
case strings.Contains(lowerGoal, "交易所") || strings.Contains(lowerQuestion, "交易所") || strings.Contains(lowerGoal, "exchange"):
return "exchange_config"
case strings.Contains(lowerGoal, "模型") || strings.Contains(lowerQuestion, "模型") || strings.Contains(lowerGoal, "model"):
return "model_config"
default:
return ""
}
}
func isConfigOrTraderIntent(text string) bool {
lower := strings.ToLower(strings.TrimSpace(text))
if lower == "" {
return false
}
keywords := []string{
"交易员", "trader", "exchange", "交易所", "模型", "model", "api key", "apikey",
"绑定", "配置", "setup", "configure", "deepseek", "openai", "claude", "gemini",
"okx", "binance", "bybit", "gate", "kucoin", "hyperliquid", "aster", "lighter",
}
for _, kw := range keywords {
if strings.Contains(lower, kw) {
return true
}
}
return false
}
func isStrategyIntent(text string) bool {
lower := strings.ToLower(strings.TrimSpace(text))
if lower == "" {
return false
}
keywords := []string{
"策略", "strategy", "template", "模板", "激进", "趋势跟踪", "网格策略",
"量化策略", "策略模板", "strategy studio",
}
for _, kw := range keywords {
if strings.Contains(lower, kw) {
return true
}
}
return false
}
func isRealtimeAccountIntent(text string) bool {
lower := strings.ToLower(strings.TrimSpace(text))
if lower == "" {
return false
}
keywords := []string{
"余额", "balance", "equity", "净值", "available", "available balance",
"持仓", "position", "positions", "仓位", "unrealized pnl", "浮盈", "浮亏",
"交易历史", "trade history", "history", "closed trades", "recent trades",
"订单", "order", "orders", "成交", "pnl", "profit", "loss",
}
for _, kw := range keywords {
if strings.Contains(lower, kw) {
return true
}
}
return false
}
func snapshotKindsForIntent(userText string) []string {
kinds := make([]string, 0, 6)
return uniqueStrings(kinds)
}
func uniqueStrings(values []string) []string {
if len(values) == 0 {
return nil
}
out := make([]string, 0, len(values))
seen := make(map[string]struct{}, len(values))
for _, value := range values {
if _, ok := seen[value]; ok {
continue
}
seen[value] = struct{}{}
out = append(out, value)
}
return out
}
func withPlannerStageTimeout(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
if timeout <= 0 {
return context.WithCancel(ctx)
}
if deadline, ok := ctx.Deadline(); ok {
remaining := time.Until(deadline)
if remaining <= timeout {
return context.WithCancel(ctx)
}
}
return context.WithTimeout(ctx, timeout)
}
func isPlannerTimeoutError(err error) bool {
if err == nil {
return false
}
return errors.Is(err, context.DeadlineExceeded)
}
func plannerTimeoutMessage(lang string) string {
if lang == "zh" {
return "⏱️ 当前请求处理超时,请重试一次。若持续出现,请把问题拆小一点。"
}
return "⏱️ This request timed out. Please try again, or break it into a smaller request."
}
func shouldResetExecutionStateForNewAttempt(text string, state ExecutionState) bool {
if state.SessionID == "" {
return false
}
lower := strings.ToLower(strings.TrimSpace(text))
if lower == "" {
return false
}
retrySignals := []string{
"再试", "重试", "重新", "继续", "继续创建", "我已经配置好了", "已经配置好了", "我配好了",
"我已经弄好了", "已经弄好了", "好了", "retry", "try again", "continue", "resume",
"i configured it", "i've configured it", "i already configured", "configured already",
}
for _, signal := range retrySignals {
if strings.Contains(lower, signal) {
return true
}
}
if isConfigOrTraderIntent(lower) && (state.Status == executionStatusFailed || state.Status == executionStatusCompleted) {
return true
}
if isConfigOrTraderIntent(lower) && state.Status == executionStatusWaitingUser {
return true
}
return false
}
func ensureCurrentReferences(state *ExecutionState) {
if state.CurrentReferences == nil {
state.CurrentReferences = &CurrentReferences{}
}
}
func preferReference(current **EntityReference, id, name string) {
id = strings.TrimSpace(id)
name = strings.TrimSpace(name)
if id == "" && name == "" {
return
}
if *current == nil {
*current = &EntityReference{}
}
if id != "" {
(*current).ID = id
}
if name != "" {
(*current).Name = name
}
}
func matchEntityReference(text string, candidates []EntityReference) *EntityReference {
lower := strings.ToLower(strings.TrimSpace(text))
if lower == "" {
return nil
}
var matched *EntityReference
for _, candidate := range candidates {
id := strings.ToLower(strings.TrimSpace(candidate.ID))
name := strings.ToLower(strings.TrimSpace(candidate.Name))
if id == "" && name == "" {
continue
}
if (id != "" && strings.Contains(lower, id)) || (name != "" && strings.Contains(lower, name)) {
if matched != nil {
return nil
}
copy := candidate
matched = &copy
}
}
return matched
}
func (a *Agent) refreshCurrentReferencesForUserText(storeUserID, text string, state *ExecutionState) {
if a.store == nil || strings.TrimSpace(text) == "" {
return
}
ensureCurrentReferences(state)
if strategies, err := a.store.Strategy().List(storeUserID); err == nil {
candidates := make([]EntityReference, 0, len(strategies))
for _, strategy := range strategies {
candidates = append(candidates, EntityReference{ID: strategy.ID, Name: strategy.Name})
}
if ref := matchEntityReference(text, candidates); ref != nil {
preferReference(&state.CurrentReferences.Strategy, ref.ID, ref.Name)
}
}
if traders, err := a.store.Trader().List(storeUserID); err == nil {
candidates := make([]EntityReference, 0, len(traders))
for _, trader := range traders {
candidates = append(candidates, EntityReference{ID: trader.ID, Name: trader.Name})
}
if ref := matchEntityReference(text, candidates); ref != nil {
preferReference(&state.CurrentReferences.Trader, ref.ID, ref.Name)
}
}
if models, err := a.store.AIModel().List(storeUserID); err == nil {
candidates := make([]EntityReference, 0, len(models))
for _, model := range models {
name := model.Name
if name == "" {
name = model.CustomModelName
}
if name == "" {
name = model.Provider
}
candidates = append(candidates, EntityReference{ID: model.ID, Name: name})
}
if ref := matchEntityReference(text, candidates); ref != nil {
preferReference(&state.CurrentReferences.Model, ref.ID, ref.Name)
}
}
if exchanges, err := a.store.Exchange().List(storeUserID); err == nil {
candidates := make([]EntityReference, 0, len(exchanges))
for _, exchange := range exchanges {
name := exchange.AccountName
if name == "" {
name = exchange.ExchangeType
}
candidates = append(candidates, EntityReference{ID: exchange.ID, Name: name})
}
if ref := matchEntityReference(text, candidates); ref != nil {
preferReference(&state.CurrentReferences.Exchange, ref.ID, ref.Name)
}
}
}
func updateCurrentReferencesFromToolResult(state *ExecutionState, toolName, raw string) bool {
if strings.TrimSpace(raw) == "" {
return false
}
var payload map[string]any
if err := json.Unmarshal([]byte(raw), &payload); err != nil {
return false
}
ensureCurrentReferences(state)
before, _ := json.Marshal(state.CurrentReferences)
switch toolName {
case "manage_strategy":
if item, ok := payload["strategy"].(map[string]any); ok {
preferReference(&state.CurrentReferences.Strategy, asString(item["id"]), asString(item["name"]))
}
case "manage_trader":
if item, ok := payload["trader"].(map[string]any); ok {
preferReference(&state.CurrentReferences.Trader, asString(item["id"]), asString(item["name"]))
preferReference(&state.CurrentReferences.Model, asString(item["ai_model_id"]), "")
preferReference(&state.CurrentReferences.Exchange, asString(item["exchange_id"]), "")
preferReference(&state.CurrentReferences.Strategy, asString(item["strategy_id"]), "")
}
case "manage_model_config":
if item, ok := payload["model"].(map[string]any); ok {
name := asString(item["name"])
if name == "" {
name = asString(item["provider"])
}
preferReference(&state.CurrentReferences.Model, asString(item["id"]), name)
}
case "manage_exchange_config":
if item, ok := payload["exchange"].(map[string]any); ok {
name := asString(item["account_name"])
if name == "" {
name = asString(item["exchange_type"])
}
preferReference(&state.CurrentReferences.Exchange, asString(item["id"]), name)
}
case "get_strategies":
if items, ok := payload["strategies"].([]any); ok && len(items) == 1 {
if item, ok := items[0].(map[string]any); ok {
preferReference(&state.CurrentReferences.Strategy, asString(item["id"]), asString(item["name"]))
}
}
}
state.CurrentReferences = normalizeCurrentReferences(state.CurrentReferences)
after, _ := json.Marshal(state.CurrentReferences)
return string(before) != string(after)
}
func asString(v any) string {
s, _ := v.(string)
return strings.TrimSpace(s)
}
func containsAnyKeyword(text string, keywords []string) bool {
for _, keyword := range keywords {
if strings.Contains(text, keyword) {
return true
}
}
return false
}
func detectReadFastPath(text string) *readFastPathRequest {
lower := strings.ToLower(strings.TrimSpace(text))
if lower == "" {
return nil
}
switch lower {
case "/traders":
return &readFastPathRequest{Kind: "list_traders"}
case "/strategies":
return &readFastPathRequest{Kind: "get_strategies"}
case "/models":
return &readFastPathRequest{Kind: "get_model_configs"}
case "/exchanges":
return &readFastPathRequest{Kind: "get_exchange_configs"}
case "/balance":
return &readFastPathRequest{Kind: "get_balance"}
case "/positions":
return &readFastPathRequest{Kind: "get_positions"}
case "/history", "/trades":
return &readFastPathRequest{Kind: "get_trade_history", ArgsJSON: `{"limit":10}`}
default:
return nil
}
}
func (a *Agent) tryReadFastPath(storeUserID string, userID int64, lang, text string) (string, bool) {
req := detectReadFastPath(text)
if req == nil {
return "", false
}
if a.history == nil {
a.history = newChatHistory(100)
}
a.history.Add(userID, "user", text)
raw := a.executeReadFastPath(storeUserID, userID, req)
answer := formatReadFastPathResponse(lang, req.Kind, raw)
a.history.Add(userID, "assistant", answer)
if !isEphemeralReadFastPathKind(req.Kind) {
a.maybeUpdateTaskStateIncrementally(context.Background(), userID)
a.maybeCompressHistory(context.Background(), userID)
}
return answer, true
}
func isEphemeralReadFastPathKind(kind string) bool {
switch kind {
case "get_balance", "get_positions", "get_trade_history":
return true
default:
return false
}
}
func (a *Agent) executeReadFastPath(storeUserID string, _ int64, req *readFastPathRequest) string {
switch req.Kind {
case "get_balance":
return a.toolGetBalance()
case "get_positions":
return a.toolGetPositions()
case "get_trade_history":
return a.toolGetTradeHistory(req.ArgsJSON)
case "get_strategies":
return a.toolGetStrategies(storeUserID)
case "list_traders":
return a.toolListTraders(storeUserID)
case "get_model_configs":
return a.toolGetModelConfigs(storeUserID)
case "get_exchange_configs":
return a.toolGetExchangeConfigs(storeUserID)
default:
return `{"error":"unsupported fast path"}`
}
}
func formatReadFastPathResponse(lang, kind, raw string) string {
var payload map[string]any
if err := json.Unmarshal([]byte(raw), &payload); err != nil {
return summarizeObservation(raw)
}
if errMsg, _ := payload["error"].(string); strings.TrimSpace(errMsg) != "" {
return summarizeObservation(raw)
}
switch kind {
case "get_strategies":
items, _ := payload["strategies"].([]any)
if len(items) == 0 {
if lang == "zh" {
return "当前还没有策略。"
}
return "There are no strategies yet."
}
lines := []string{"Current strategies:"}
if lang == "zh" {
lines[0] = "当前策略:"
}
for _, item := range items {
entry, ok := item.(map[string]any)
if !ok {
continue
}
name := asString(entry["name"])
if name == "" {
name = asString(entry["id"])
}
meta := make([]string, 0, 2)
if active, _ := entry["is_active"].(bool); active {
meta = append(meta, "active")
}
if isDefault, _ := entry["is_default"].(bool); isDefault {
meta = append(meta, "default")
}
if len(meta) > 0 {
lines = append(lines, fmt.Sprintf("- %s (%s)", name, strings.Join(meta, ", ")))
} else {
lines = append(lines, fmt.Sprintf("- %s", name))
}
}
return strings.Join(lines, "\n")
case "list_traders":
items, _ := payload["traders"].([]any)
if len(items) == 0 {
if lang == "zh" {
return "当前还没有交易员。"
}
return "There are no traders yet."
}
lines := []string{"Current traders:"}
if lang == "zh" {
lines[0] = "当前交易员:"
}
for _, item := range items {
entry, ok := item.(map[string]any)
if !ok {
continue
}
name := asString(entry["name"])
line := fmt.Sprintf("- %s", name)
meta := cleanStringList([]string{asString(entry["exchange_type"]), asString(entry["ai_model_id"])})
if len(meta) > 0 {
line += fmt.Sprintf(" (%s)", strings.Join(meta, ", "))
}
lines = append(lines, line)
}
return strings.Join(lines, "\n")
case "get_model_configs":
items, _ := payload["model_configs"].([]any)
if len(items) == 0 {
if lang == "zh" {
return "当前还没有模型配置。"
}
return "There are no model configs yet."
}
lines := []string{"Current model configs:"}
if lang == "zh" {
lines[0] = "当前模型配置:"
}
for _, item := range items {
entry, ok := item.(map[string]any)
if !ok {
continue
}
name := asString(entry["name"])
if name == "" {
name = asString(entry["provider"])
}
meta := make([]string, 0, 2)
if enabled, _ := entry["enabled"].(bool); enabled {
meta = append(meta, "enabled")
}
if model := asString(entry["custom_model_name"]); model != "" {
meta = append(meta, model)
}
if len(meta) > 0 {
lines = append(lines, fmt.Sprintf("- %s (%s)", name, strings.Join(meta, ", ")))
} else {
lines = append(lines, fmt.Sprintf("- %s", name))
}
}
return strings.Join(lines, "\n")
case "get_exchange_configs":
items, _ := payload["exchange_configs"].([]any)
if len(items) == 0 {
if lang == "zh" {
return "当前还没有交易所配置。"
}
return "There are no exchange configs yet."
}
lines := []string{"Current exchange configs:"}
if lang == "zh" {
lines[0] = "当前交易所配置:"
}
for _, item := range items {
entry, ok := item.(map[string]any)
if !ok {
continue
}
name := asString(entry["account_name"])
if name == "" {
name = asString(entry["exchange_type"])
}
meta := cleanStringList([]string{asString(entry["exchange_type"])})
if enabled, _ := entry["enabled"].(bool); enabled {
meta = append(meta, "enabled")
}
if len(meta) > 0 {
lines = append(lines, fmt.Sprintf("- %s (%s)", name, strings.Join(meta, ", ")))
} else {
lines = append(lines, fmt.Sprintf("- %s", name))
}
}
return strings.Join(lines, "\n")
case "get_balance":
items, _ := payload["balances"].([]any)
if len(items) == 0 {
if lang == "zh" {
return "当前没有可用的余额数据。"
}
return "No balance data is available right now."
}
lines := []string{"Current balance overview:"}
if lang == "zh" {
lines[0] = "当前余额概览:"
}
var totalEquity float64
var totalAvailable float64
for _, item := range items {
entry, ok := item.(map[string]any)
if !ok {
continue
}
equity := toFloat(entry["total_equity"])
available := toFloat(entry["available"])
totalEquity += equity
totalAvailable += available
lines = append(lines, fmt.Sprintf("- %s (%s): equity %.4f, available %.4f",
asString(entry["name"]), asString(entry["exchange"]),
equity, available))
}
if len(items) > 1 {
if lang == "zh" {
lines = append(lines, fmt.Sprintf("汇总equity %.4f, available %.4f", totalEquity, totalAvailable))
} else {
lines = append(lines, fmt.Sprintf("Total: equity %.4f, available %.4f", totalEquity, totalAvailable))
}
}
return strings.Join(lines, "\n")
case "get_positions":
items, _ := payload["positions"].([]any)
if len(items) == 0 {
if lang == "zh" {
return "当前没有持仓。"
}
return "There are no open positions right now."
}
lines := []string{"Current positions:"}
if lang == "zh" {
lines[0] = "当前持仓:"
}
for _, item := range items {
entry, ok := item.(map[string]any)
if !ok {
continue
}
lines = append(lines, fmt.Sprintf("- %s %s size %.4f, entry %.4f, pnl %.4f",
asString(entry["symbol"]), asString(entry["side"]),
toFloat(entry["size"]), toFloat(entry["entry_price"]), toFloat(entry["unrealized_pnl"])))
}
return strings.Join(lines, "\n")
case "get_trade_history":
items, _ := payload["trades"].([]any)
if len(items) == 0 {
if lang == "zh" {
return "当前没有已平仓交易历史。"
}
return "There is no closed trade history yet."
}
summary, _ := payload["summary"].(map[string]any)
head := fmt.Sprintf("Recent trades: %.0f total, win rate %s, total PnL %.4f",
toFloat(summary["total_trades"]), asString(summary["win_rate"]), toFloat(summary["total_pnl"]))
if lang == "zh" {
head = fmt.Sprintf("最近交易:共 %.0f 笔,胜率 %s总 PnL %.4f",
toFloat(summary["total_trades"]), asString(summary["win_rate"]), toFloat(summary["total_pnl"]))
}
lines := []string{head}
for idx, item := range items {
if idx >= 5 {
break
}
entry, ok := item.(map[string]any)
if !ok {
continue
}
lines = append(lines, fmt.Sprintf("- %s %s pnl %.4f (%s -> %s)",
asString(entry["symbol"]), asString(entry["side"]), toFloat(entry["pnl"]),
asString(entry["entry_time"]), asString(entry["exit_time"])))
}
return strings.Join(lines, "\n")
default:
return summarizeObservation(raw)
}
}
func (a *Agent) thinkAndAct(ctx context.Context, storeUserID string, userID int64, lang, text string) (string, error) {
if answer, ok, err := a.tryStatePriorityPath(ctx, storeUserID, userID, lang, text, nil); ok || err != nil {
return answer, err
}
if answer, ok := tryInstantDirectReply(lang, text); ok {
return answer, nil
}
if answer, ok := a.tryReadFastPath(storeUserID, userID, lang, text); ok {
return answer, nil
}
if answer, ok, err := a.tryWorkflowIntent(ctx, storeUserID, userID, lang, text, nil); ok || err != nil {
return answer, err
}
if answer, ok := a.tryHardSkill(ctx, storeUserID, userID, lang, text, nil); ok {
return answer, nil
}
if a.aiClient == nil {
return a.noAIFallback(lang, text)
}
return a.runPlannedAgent(ctx, storeUserID, userID, lang, text, nil)
}
func (a *Agent) thinkAndActStream(ctx context.Context, storeUserID string, userID int64, lang, text string, onEvent func(event, data string)) (string, error) {
if answer, ok, err := a.tryStatePriorityPath(ctx, storeUserID, userID, lang, text, onEvent); ok || err != nil {
return answer, err
}
if answer, ok := tryInstantDirectReply(lang, text); ok {
if onEvent != nil {
onEvent(StreamEventDelta, answer)
}
return answer, nil
}
if answer, ok := a.tryReadFastPath(storeUserID, userID, lang, text); ok {
if onEvent != nil {
onEvent(StreamEventTool, "read_fast_path")
onEvent(StreamEventDelta, answer)
}
return answer, nil
}
if answer, ok, err := a.tryWorkflowIntent(ctx, storeUserID, userID, lang, text, onEvent); ok || err != nil {
return answer, err
}
if answer, ok := a.tryHardSkill(ctx, storeUserID, userID, lang, text, onEvent); ok {
return answer, nil
}
if a.aiClient == nil {
return a.noAIFallback(lang, text)
}
return a.runPlannedAgent(ctx, storeUserID, userID, lang, text, onEvent)
}
func tryInstantDirectReply(lang, text string) (string, bool) {
lower := strings.ToLower(strings.TrimSpace(text))
if lower == "" {
return "", false
}
zhReplies := map[string]string{
"hi": "在,有什么我帮你看的?",
"hello": "在,有什么我帮你看的?",
"hey": "在,有什么我帮你看的?",
"你好": "在,有什么我帮你看的?",
"嗨": "在,有什么我帮你看的?",
"在吗": "在,有什么我帮你看的?",
"谢谢": "不客气。",
"多谢": "不客气。",
"谢了": "不客气。",
"ok": "好。",
"好的": "好。",
"收到": "好。",
}
enReplies := map[string]string{
"hi": "I'm here. What should we look at?",
"hello": "I'm here. What should we look at?",
"hey": "I'm here. What should we look at?",
"thanks": "You're welcome.",
"thank you": "You're welcome.",
"ok": "Okay.",
"okay": "Okay.",
"got it": "Got it.",
}
if lang == "zh" {
if reply, ok := zhReplies[lower]; ok {
return reply, true
}
if reply, ok := enReplies[lower]; ok {
return reply, true
}
return "", false
}
if reply, ok := enReplies[lower]; ok {
return reply, true
}
return "", false
}
func (a *Agent) hasActiveSkillSession(userID int64) bool {
session := a.getSkillSession(userID)
return strings.TrimSpace(session.Name) != ""
}
func hasActiveExecutionState(state ExecutionState) bool {
if strings.TrimSpace(state.SessionID) == "" {
return false
}
switch strings.TrimSpace(state.Status) {
case executionStatusPlanning, executionStatusRunning, executionStatusWaitingUser:
return true
default:
return false
}
}
func (a *Agent) tryStatePriorityPath(ctx context.Context, storeUserID string, userID int64, lang, text string, onEvent func(event, data string)) (string, bool, error) {
if workflow := a.getWorkflowSession(userID); hasActiveWorkflowSession(workflow) {
answer, handled, err := a.handleWorkflowSession(ctx, storeUserID, userID, lang, text, workflow, onEvent)
if handled || err != nil {
return answer, true, err
}
}
if session := a.getSkillSession(userID); strings.TrimSpace(session.Name) != "" {
switch a.classifySkillSessionInput(ctx, userID, lang, session, text) {
case "cancel":
a.clearSkillSession(userID)
a.clearWorkflowSession(userID)
if lang == "zh" {
return "已取消当前流程。", true, nil
}
return "Cancelled the current flow.", true, nil
case "interrupt":
a.clearSkillSession(userID)
default:
if answer, ok := a.tryHardSkill(ctx, storeUserID, userID, lang, text, onEvent); ok {
return answer, true, nil
}
}
}
state := a.getExecutionState(userID)
if hasActiveExecutionState(state) {
switch classifyExecutionStateInput(state, text) {
case "cancel":
a.clearExecutionState(userID)
if lang == "zh" {
return "已取消当前流程。", true, nil
}
return "Cancelled the current flow.", true, nil
case "interrupt":
a.clearExecutionState(userID)
default:
answer, err := a.runPlannedAgent(ctx, storeUserID, userID, lang, text, onEvent)
return answer, true, err
}
}
return "", false, nil
}
func (a *Agent) classifySkillSessionInput(ctx context.Context, userID int64, lang string, session skillSession, text string) string {
lower := strings.ToLower(strings.TrimSpace(text))
if lower == "" {
return "continue"
}
if isYesReply(text) || isNoReply(text) {
return "continue"
}
if isExplicitFlowAbort(text) {
return "cancel"
}
if shouldContinueSkillSessionByExpectedSlot(session, text) {
return "continue"
}
if decision := a.classifySkillSessionIntentWithLLM(ctx, userID, lang, session, text); decision != "" {
return decision
}
if isNewSkillRootIntent(session, text) {
return "interrupt"
}
if isSkillFlowDeflection(session, text) {
return "interrupt"
}
if belongsToSkillDomain(session.Name, text) || !looksLikeNewTopLevelIntent(text) {
return "continue"
}
return "interrupt"
}
type skillSessionIntentDecision struct {
Decision string `json:"decision"`
}
func shouldUseLLMSkillSessionClassifier(session skillSession, text string) bool {
if strings.TrimSpace(text) == "" {
return false
}
if isExplicitFlowAbort(text) || isYesReply(text) || isNoReply(text) {
return false
}
if shouldContinueSkillSessionByExpectedSlot(session, text) {
return false
}
return true
}
func shouldContinueSkillSessionByExpectedSlot(session skillSession, text string) bool {
text = strings.TrimSpace(text)
if text == "" {
return false
}
currentStep, ok := currentSkillDAGStep(session)
if !ok {
return false
}
switch currentStep.ID {
case "await_start_confirmation", "await_confirmation":
return isYesReply(text) || isNoReply(text)
case "resolve_config_value":
if fieldValue(session, "config_field") == "selected_timeframes" {
return timeframeTokenRE.MatchString(strings.ToLower(text))
}
return firstIntegerPattern.MatchString(text)
case "collect_enabled":
_, ok := parseEnabledValue(text)
return ok
case "collect_custom_api_url":
return extractURL(text) != ""
case "resolve_exchange_type":
return exchangeTypeFromText(text) != ""
case "resolve_provider":
return providerFromText(text) != ""
case "resolve_name", "collect_name", "collect_prompt", "collect_account_name", "collect_custom_model_name":
return !looksLikeNewTopLevelIntent(text)
}
for _, field := range currentStep.RequiredFields {
switch field {
case "config_value":
return firstIntegerPattern.MatchString(text)
case "enabled":
_, ok := parseEnabledValue(text)
return ok
case "custom_api_url":
return extractURL(text) != ""
}
}
return false
}
func (a *Agent) classifySkillSessionIntentWithLLM(ctx context.Context, userID int64, lang string, session skillSession, text string) string {
if a == nil || a.aiClient == nil {
return ""
}
if !shouldUseLLMSkillSessionClassifier(session, text) {
return ""
}
currentStep, _ := currentSkillDAGStep(session)
recentConversationCtx := a.buildRecentConversationContext(userID, text)
systemPrompt := `You classify one user message while a NOFXi structured management flow is active.
Return JSON only. No markdown.
Possible decisions:
- "continue": the user is still answering the current flow
- "cancel": the user wants to stop the current flow
- "interrupt": the user changed topic, wants diagnosis/query/new task, or should leave the current flow
Be conservative:
- Prefer "continue" only when the message clearly answers the current slot/question.
- Use "cancel" for explicit abandonment like "算了", "不改了", "换话题", "别弄了".
- Use "interrupt" for diagnosis, query, new requests, or topic shifts.`
userPrompt := fmt.Sprintf(
"Language: %s\nActive skill: %s\nAction: %s\nCurrent DAG step: %s\nExpected required fields: %s\nUser message: %s\n\nRecent conversation:\n%s",
lang,
session.Name,
session.Action,
currentStep.ID,
strings.Join(currentStep.RequiredFields, ", "),
text,
recentConversationCtx,
)
stageCtx, cancel := withPlannerStageTimeout(ctx, directReplyTimeout)
defer cancel()
raw, err := a.aiClient.CallWithRequest(&mcp.Request{
Messages: []mcp.Message{
mcp.NewSystemMessage(systemPrompt),
mcp.NewUserMessage(userPrompt),
},
Ctx: stageCtx,
})
if err != nil {
return ""
}
raw = strings.TrimSpace(raw)
raw = strings.TrimPrefix(raw, "```json")
raw = strings.TrimPrefix(raw, "```")
raw = strings.TrimSuffix(raw, "```")
raw = strings.TrimSpace(raw)
var decision skillSessionIntentDecision
if err := json.Unmarshal([]byte(raw), &decision); err != nil {
start := strings.Index(raw, "{")
end := strings.LastIndex(raw, "}")
if start < 0 || end <= start || json.Unmarshal([]byte(raw[start:end+1]), &decision) != nil {
return ""
}
}
switch strings.TrimSpace(decision.Decision) {
case "continue", "cancel", "interrupt":
return decision.Decision
default:
return ""
}
}
func isSkillFlowDeflection(session skillSession, text string) bool {
lower := strings.ToLower(strings.TrimSpace(text))
if lower == "" {
return false
}
if containsAny(lower, []string{
"看下报错", "看看报错", "帮我看下报错", "帮我看看报错", "报错怎么回事", "错误怎么回事",
"换话题", "聊别的", "不是这个", "先说别的", "不聊这个",
}) {
return true
}
switch strings.TrimSpace(session.Name) {
case "exchange_management":
return detectModelDiagnosisSkill(text) || detectTraderDiagnosisSkill(text) || detectStrategyDiagnosisSkill(text)
case "model_management":
return detectExchangeDiagnosisSkill(text) || detectTraderDiagnosisSkill(text) || detectStrategyDiagnosisSkill(text)
case "strategy_management":
return detectExchangeDiagnosisSkill(text) || detectTraderDiagnosisSkill(text) || detectModelDiagnosisSkill(text)
case "trader_management":
return detectExchangeDiagnosisSkill(text) || detectModelDiagnosisSkill(text) || detectStrategyDiagnosisSkill(text)
default:
return false
}
}
func isNewSkillRootIntent(session skillSession, text string) bool {
currentSkill := strings.TrimSpace(session.Name)
currentAction := strings.TrimSpace(session.Action)
if currentSkill == "" {
return false
}
switch currentSkill {
case "trader_management":
if detectCreateTraderSkill(text) && currentAction != "create" {
return true
}
if action := normalizeAtomicSkillAction("trader_management", detectManagementAction(text, "trader")); action == "create" && currentAction != "create" {
return true
}
case "strategy_management":
if action := normalizeAtomicSkillAction("strategy_management", detectManagementAction(text, "strategy")); action == "create" && currentAction != "create" {
return true
}
case "model_management":
if action := normalizeAtomicSkillAction("model_management", detectManagementAction(text, "model")); action == "create" && currentAction != "create" {
return true
}
case "exchange_management":
if action := normalizeAtomicSkillAction("exchange_management", detectManagementAction(text, "exchange")); action == "create" && currentAction != "create" {
return true
}
}
return false
}
func classifyExecutionStateInput(state ExecutionState, text string) string {
lower := strings.ToLower(strings.TrimSpace(text))
if lower == "" {
return "continue"
}
if isExplicitFlowAbort(text) {
return "cancel"
}
if isYesReply(text) || isNoReply(text) || shouldResetExecutionStateForNewAttempt(text, state) {
return "continue"
}
if state.Waiting != nil && !looksLikeNewTopLevelIntent(text) {
return "continue"
}
if looksLikeNewTopLevelIntent(text) {
return "interrupt"
}
return "continue"
}
func isExplicitFlowAbort(text string) bool {
lower := strings.ToLower(strings.TrimSpace(text))
if lower == "" {
return false
}
if isCancelSkillReply(text) {
return true
}
return containsAny(lower, []string{
"算了", "先不", "不配了", "别弄了", "不搞了", "先停", "换个话题", "换话题", "聊点别的", "聊别的",
"stop this", "drop it", "never mind", "forget it", "skip this",
})
}
func belongsToSkillDomain(skillName, text string) bool {
switch strings.TrimSpace(skillName) {
case "trader_management":
return detectCreateTraderSkill(text) || detectTraderManagementIntent(text) || detectTraderDiagnosisSkill(text)
case "strategy_management":
return detectStrategyManagementIntent(text) || detectStrategyDiagnosisSkill(text)
case "model_management":
return detectModelManagementIntent(text) || detectModelDiagnosisSkill(text)
case "exchange_management":
return detectExchangeManagementIntent(text) || detectExchangeDiagnosisSkill(text)
default:
return false
}
}
func looksLikeNewTopLevelIntent(text string) bool {
lower := strings.ToLower(strings.TrimSpace(text))
if lower == "" {
return false
}
if strings.HasPrefix(lower, "/") {
return true
}
if detectCreateTraderSkill(text) ||
detectTraderManagementIntent(text) ||
detectExchangeManagementIntent(text) ||
detectModelManagementIntent(text) ||
detectStrategyManagementIntent(text) ||
detectTraderDiagnosisSkill(text) ||
detectExchangeDiagnosisSkill(text) ||
detectModelDiagnosisSkill(text) ||
detectStrategyDiagnosisSkill(text) {
return true
}
if detectReadFastPath(text) != nil {
return true
}
return containsAny(lower, []string{
"btc", "eth", "sol", "市场", "行情", "余额", "仓位", "持仓", "订单", "账户",
"price", "market", "balance", "position", "portfolio", "account",
})
}
func (a *Agent) tryDirectAnswer(ctx context.Context, userID int64, lang, text string, onEvent func(event, data string)) (string, bool) {
if a.aiClient == nil {
return "", false
}
text = strings.TrimSpace(text)
if text == "" {
return "", false
}
recentConversationCtx := a.buildRecentConversationContext(userID, text)
taskStateCtx := buildTaskStateContext(a.getTaskState(userID))
executionState := normalizeExecutionState(a.getExecutionState(userID))
executionJSON, _ := json.Marshal(executionState)
systemPrompt := `You are the first-pass router for NOFXi.
Decide whether the assistant can answer the user's message directly without using skills, tools, or planning.
Return JSON only. Do not return markdown.
Use "direct_answer" only when a concise, self-contained answer is sufficient.
Examples that often fit direct_answer:
- greetings, thanks, small talk
- concept explanations
- open-ended advice that does not require current system state
- trading education or opinion questions that can be answered from general reasoning
Use "defer" when the message likely needs:
- a management or diagnosis skill
- tool reads
- multi-step planning
- continuation of an active execution flow that needs stateful follow-up
Rules:
- Consider Recent conversation, Task state, and Execution state JSON before deciding.
- Default to direct_answer for greetings, thanks, identity questions, and other lightweight conversational turns unless there is a clearly unfinished operational flow that the user is continuing.
- If the user is clearly continuing an unfinished operational flow, choose defer.
- If you choose direct_answer, provide the final user-facing answer in the same language as the user.
- Prefer defer when uncertain.
Return JSON with this exact shape:
{"action":"direct_answer|defer","answer":""}`
userPrompt := fmt.Sprintf("Language: %s\nUser message: %s\n\nRecent conversation:\n%s\n\nTask state:\n%s\n\nExecution state JSON:\n%s", lang, text, recentConversationCtx, taskStateCtx, string(executionJSON))
stageCtx, cancel := withPlannerStageTimeout(ctx, directReplyTimeout)
defer cancel()
raw, err := a.aiClient.CallWithRequest(&mcp.Request{
Messages: []mcp.Message{
mcp.NewSystemMessage(systemPrompt),
mcp.NewUserMessage(userPrompt),
},
Ctx: stageCtx,
})
if err != nil {
return "", false
}
decision, err := parseDirectReplyDecision(raw)
if err != nil {
return "", false
}
if decision.Action != "direct_answer" {
return "", false
}
answer := strings.TrimSpace(decision.Answer)
if answer == "" {
return "", false
}
if a.history == nil {
a.history = newChatHistory(100)
}
a.history.Add(userID, "user", text)
a.history.Add(userID, "assistant", answer)
a.maybeUpdateTaskStateIncrementally(ctx, userID)
a.maybeCompressHistory(ctx, userID)
if onEvent != nil {
onEvent(StreamEventDelta, answer)
}
return answer, true
}
func parseDirectReplyDecision(raw string) (directReplyDecision, error) {
raw = strings.TrimSpace(raw)
raw = strings.TrimPrefix(raw, "```json")
raw = strings.TrimPrefix(raw, "```")
raw = strings.TrimSuffix(raw, "```")
raw = strings.TrimSpace(raw)
var decision directReplyDecision
if err := json.Unmarshal([]byte(raw), &decision); err == nil {
return normalizeDirectReplyDecision(decision), nil
}
start := strings.Index(raw, "{")
end := strings.LastIndex(raw, "}")
if start >= 0 && end > start {
if err := json.Unmarshal([]byte(raw[start:end+1]), &decision); err == nil {
return normalizeDirectReplyDecision(decision), nil
}
}
return directReplyDecision{}, fmt.Errorf("invalid direct reply decision json")
}
func normalizeDirectReplyDecision(decision directReplyDecision) directReplyDecision {
decision.Action = strings.TrimSpace(strings.ToLower(decision.Action))
decision.Answer = strings.TrimSpace(decision.Answer)
return decision
}
func (a *Agent) runPlannedAgent(ctx context.Context, storeUserID string, userID int64, lang, text string, onEvent func(event, data string)) (string, error) {
a.history.Add(userID, "user", text)
if onEvent != nil {
onEvent(StreamEventPlanning, a.planningStatusText(lang))
}
requestStartedAt := time.Now()
state, err := a.prepareExecutionState(ctx, storeUserID, userID, lang, text)
if err != nil {
a.logPlannerTiming("", userID, "prepare_execution_state", requestStartedAt, err)
if isPlannerTimeoutError(err) {
msg := plannerTimeoutMessage(lang)
if onEvent != nil {
onEvent(StreamEventError, msg)
onEvent(StreamEventDelta, msg)
}
return msg, nil
}
a.logger.Warn("planner failed, falling back to legacy loop", "error", err, "user_id", userID)
return a.thinkAndActLegacy(ctx, userID, lang, text, onEvent)
}
a.logPlannerTiming(state.SessionID, userID, "prepare_execution_state", requestStartedAt, nil)
executionStartedAt := time.Now()
answer, err := a.executePlan(ctx, storeUserID, userID, lang, &state, onEvent)
a.logPlannerTiming(state.SessionID, userID, "execute_plan", executionStartedAt, err)
if err != nil {
if isPlannerTimeoutError(err) {
msg := plannerTimeoutMessage(lang)
if onEvent != nil {
onEvent(StreamEventError, msg)
onEvent(StreamEventDelta, msg)
}
return msg, nil
}
a.logger.Warn("plan execution failed, falling back to legacy loop", "error", err, "user_id", userID)
return a.thinkAndActLegacy(ctx, userID, lang, text, onEvent)
}
a.history.Add(userID, "assistant", answer)
a.maybeUpdateTaskStateIncrementally(ctx, userID)
a.maybeCompressHistory(ctx, userID)
a.logPlannerTiming(state.SessionID, userID, "run_planned_agent_total", requestStartedAt, nil)
return answer, nil
}
func (a *Agent) prepareExecutionState(ctx context.Context, storeUserID string, userID int64, lang, text string) (ExecutionState, error) {
existing := a.getExecutionState(userID)
if shouldResetExecutionStateForNewAttempt(text, existing) {
a.clearExecutionState(userID)
existing = ExecutionState{}
}
if existing.Status == executionStatusWaitingUser && existing.SessionID != "" {
a.refreshCurrentReferencesForUserText(storeUserID, text, &existing)
askedQuestion := latestAskedQuestion(existing)
replySummary := strings.TrimSpace(text)
if askedQuestion != "" {
replySummary = fmt.Sprintf("Answer to previous question [%s]: %s", askedQuestion, replySummary)
}
appendExecutionLog(&existing, Observation{
Kind: "user_reply",
Summary: replySummary,
CreatedAt: time.Now().UTC().Format(time.RFC3339),
})
existing.Status = executionStatusPlanning
existing.Waiting = nil
existing.FinalAnswer = ""
existing.LastError = ""
existing = a.refreshStateForDynamicRequests(storeUserID, text, existing)
existing.Steps = completedSteps(existing.Steps)
existing.CurrentStepID = ""
existing.Status = executionStatusRunning
existing.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
if err := a.saveExecutionState(existing); err != nil {
return ExecutionState{}, err
}
return existing, nil
}
state := newExecutionState(userID, text)
a.refreshCurrentReferencesForUserText(storeUserID, text, &state)
state = a.refreshStateForDynamicRequests(storeUserID, text, state)
state.Status = executionStatusRunning
if err := a.saveExecutionState(state); err != nil {
return ExecutionState{}, err
}
return state, nil
}
type nextStepDecision struct {
Goal string `json:"goal"`
Steps []PlanStep `json:"steps,omitempty"`
Step PlanStep `json:"step"`
}
func (a *Agent) decideNextStep(ctx context.Context, userID int64, lang string, state ExecutionState) (nextStepDecision, error) {
toolDefs, _ := json.Marshal(agentTools())
stateJSON, _ := json.Marshal(normalizeExecutionState(state))
obsJSON, _ := json.Marshal(buildObservationContext(state))
recentlyFetchedJSON, _ := json.Marshal(buildRecentlyFetchedData(state, time.Now().UTC()))
taskStateCtx := buildTaskStateContext(a.getTaskState(userID))
recentConversationCtx := a.buildRecentConversationContext(userID, state.Goal)
systemPrompt := `You are the step selector for NOFXi.
Return JSON only. Do not return markdown.
You are operating in ReAct mode: Thought -> Action -> Observation.
Choose the immediate next action batch. Do not generate a long multi-step execution plan.
Allowed step types:
- tool
- reason
- ask_user
- respond
Rules:
- Use all available memory layers: Execution state JSON, Observations JSON, Recent conversation, and Task state.
- Use Recently fetched data JSON as the deduplication source of truth for fresh tool results.
- Prefer the freshest evidence in this order: execution state, observations, recent conversation, then task state.
- If fresh external or system data is needed, choose a tool step.
- If the user is blocked on a missing parameter, choose ask_user.
- If there is enough information to answer now, choose respond.
- Use reason only when a short intermediate synthesis is necessary before the next action.
- Prefer tool or respond over reason whenever possible.
- Never emit the same reason step twice in a row.
- After a reason step, the next batch should usually be tool, ask_user, or respond. Do not stay in analysis loops.
- Never invent tools.
- If the task needs multiple independent tool reads, emit ALL of them together in one response.
- Parallelism rule: when multiple tool reads are mutually independent, do not split them across turns. Return them together in steps.
- Never mix ask_user/respond with additional steps in the same batch.
- Only emit multiple steps when every emitted step is a tool step.
- Avoid repeated tool calls. If a matching tool call already exists in Recently fetched data and age_seconds <= 60, do not call it again unless the user explicitly asks to refresh.
- For tool steps, set tool_name exactly to one available tool and provide tool_args as a JSON object.
- For ask_user or respond steps, put the user-facing question/response instruction in instruction.
- If the latest observation already answers the goal, prefer respond over another tool call.
- Never place a trade unless the user intent is explicit.
Return JSON with this exact shape:
{"goal":"","steps":[{"id":"step_1","type":"tool|reason|ask_user|respond","title":"","tool_name":"","tool_args":{},"instruction":"","requires_confirmation":false}]}`
userPrompt := fmt.Sprintf("Language: %s\nGoal: %s\n\nRecent conversation:\n%s\n\nAvailable tools JSON:\n%s\n\nPersistent preferences:\n%s\n\nTask state:\n%s\n\nExecution state JSON:\n%s\n\nObservations JSON:\n%s\n\nRecently fetched data JSON:\n%s", lang, state.Goal, recentConversationCtx, string(toolDefs), a.buildPersistentPreferencesContext(userID), taskStateCtx, string(stateJSON), string(obsJSON), string(recentlyFetchedJSON))
stageCtx, cancel := withPlannerStageTimeout(ctx, plannerCreateTimeout)
defer cancel()
startedAt := time.Now()
raw, err := a.aiClient.CallWithRequest(&mcp.Request{
Messages: []mcp.Message{
mcp.NewSystemMessage(systemPrompt),
mcp.NewUserMessage(userPrompt),
},
Ctx: stageCtx,
})
a.logPlannerTiming(state.SessionID, userID, "decide_next_step_llm", startedAt, err)
if err != nil {
return nextStepDecision{}, err
}
return parseNextStepDecisionJSON(raw)
}
func parseNextStepDecisionJSON(raw string) (nextStepDecision, error) {
raw = strings.TrimSpace(raw)
raw = strings.TrimPrefix(raw, "```json")
raw = strings.TrimPrefix(raw, "```")
raw = strings.TrimSuffix(raw, "```")
raw = strings.TrimSpace(raw)
var decision nextStepDecision
if err := json.Unmarshal([]byte(raw), &decision); err == nil {
return normalizeNextStepDecision(decision), nil
}
start := strings.Index(raw, "{")
end := strings.LastIndex(raw, "}")
if start >= 0 && end > start {
if err := json.Unmarshal([]byte(raw[start:end+1]), &decision); err == nil {
return normalizeNextStepDecision(decision), nil
}
}
return nextStepDecision{}, fmt.Errorf("invalid next step decision json")
}
func normalizeNextStepDecision(decision nextStepDecision) nextStepDecision {
decision.Goal = strings.TrimSpace(decision.Goal)
steps := decision.Steps
if len(steps) == 0 && decision.Step.Type != "" {
steps = []PlanStep{decision.Step}
}
if len(steps) > 0 {
steps = normalizeExecutionState(ExecutionState{Steps: steps}).Steps
}
decision.Steps = steps
if len(steps) > 0 {
decision.Step = steps[0]
}
return decision
}
func (a *Agent) refreshStateForDynamicRequests(storeUserID, userText string, state ExecutionState) ExecutionState {
kinds := snapshotKindsForIntent(userText)
if len(kinds) == 0 {
return state
}
kindsToRefresh := make(map[string]struct{}, len(kinds))
for _, kind := range kinds {
kindsToRefresh[kind] = struct{}{}
}
fresh := make([]Observation, 0, len(state.DynamicSnapshots)+3)
for _, obs := range state.DynamicSnapshots {
if _, ok := kindsToRefresh[obs.Kind]; ok {
continue
}
fresh = append(fresh, obs)
}
appendSnapshot := func(kind, raw string) {
raw = strings.TrimSpace(raw)
if raw == "" {
return
}
fresh = append(fresh, Observation{
Kind: kind,
Summary: summarizeObservation(raw),
RawJSON: raw,
CreatedAt: time.Now().UTC().Format(time.RFC3339),
})
}
for _, kind := range kinds {
switch kind {
case "current_model_configs":
appendSnapshot(kind, a.toolGetModelConfigs(storeUserID))
case "current_exchange_configs":
appendSnapshot(kind, a.toolGetExchangeConfigs(storeUserID))
case "current_traders":
appendSnapshot(kind, a.toolListTraders(storeUserID))
case "current_strategies":
appendSnapshot(kind, a.toolGetStrategies(storeUserID))
case "current_balances":
appendSnapshot(kind, a.toolGetBalance())
case "current_positions":
appendSnapshot(kind, a.toolGetPositions())
case "recent_trade_history":
appendSnapshot(kind, a.toolGetTradeHistory(`{"limit":10}`))
}
}
state.DynamicSnapshots = fresh
return state
}
func (a *Agent) buildRecentConversationContext(userID int64, currentUserText string) string {
if a.history == nil {
return ""
}
msgs := a.history.Get(userID)
if len(msgs) == 0 {
return ""
}
currentUserText = strings.TrimSpace(currentUserText)
if currentUserText != "" {
last := msgs[len(msgs)-1]
if last.Role == "user" && strings.TrimSpace(last.Content) == currentUserText {
msgs = msgs[:len(msgs)-1]
}
}
if len(msgs) == 0 {
return ""
}
if len(msgs) > recentConversationMessages {
msgs = msgs[len(msgs)-recentConversationMessages:]
}
transcript := formatChatMessagesForSummary(msgs)
if transcript == "" {
return ""
}
return transcript
}
func (a *Agent) createExecutionPlan(ctx context.Context, userID int64, lang, userText string, state ExecutionState) (executionPlan, error) {
toolDefs, _ := json.Marshal(agentTools())
stateJSON, _ := json.Marshal(normalizeExecutionState(state))
taskStateCtx := buildTaskStateContext(a.getTaskState(userID))
recentConversationCtx := a.buildRecentConversationContext(userID, userText)
if isConfigOrTraderIntent(userText) {
// Configuration and trader setup requests are especially sensitive to stale
// summaries like "this capability does not exist". Prefer fresh tool checks.
taskStateCtx = ""
}
systemPrompt := `You are the planning module for NOFXi.
Return JSON only. Do not return markdown.
Create a minimal safe execution plan using these step types only:
- tool
- reason
- ask_user
- respond
Rules:
- Use all available memory layers when planning: Execution state JSON, Recent conversation, and Task state.
- Memory priority order:
1. Execution state JSON = current operational truth for the active task.
2. Recent conversation = the best source for what was said in the last few turns.
3. Task state = compressed durable background only.
- If these memory layers conflict, prefer execution state first, then recent conversation. Do not let task state override fresher evidence.
- Do not ask the user to repeat a fact that is already explicit in execution state or recent conversation unless the inputs are contradictory.
- Use tool steps whenever fresh external data is required.
- Use ask_user if required parameters are missing.
- Never place a trade unless the user intent is explicit.
- For exchange binding or exchange credential requests, prefer get_exchange_configs/manage_exchange_config.
- For AI model binding or model credential requests, prefer get_model_configs/manage_model_config.
- For strategy template creation or editing requests, prefer get_strategies/manage_strategy.
- For trader creation or trader lifecycle requests, prefer manage_trader.
- A strategy template is independent and does not require exchange/model bindings unless the user explicitly asks to run or deploy it through a trader.
- If these tools exist, never answer that the system lacks exchange/model/trader management capability.
- When configuration, strategy, or trader creation is requested, gather missing required fields via ask_user, then call the appropriate tool.
- Before concluding that exchange/model/trader/strategy setup is impossible or missing, first inspect current state with the relevant tools.
- For high-volatility state such as balances, positions, recent trade history, or current config availability, prefer fresh tool reads over old observations.
- Keep the plan short and practical.
- End with either ask_user or respond.
- At most 8 steps.
- For tool steps, set tool_name exactly to one of the available tool names and provide tool_args as JSON object.
- For reason steps, put the reasoning task in instruction.
- For ask_user steps, put the exact follow-up question in instruction.
- For respond steps, put either a short instruction or leave instruction empty.
- If resuming after a waiting_user state, incorporate the new user reply and return a fresh full plan.
- Never invent tools.`
resumeContext := ""
if state.SessionID != "" {
if askedQuestion := latestAskedQuestion(state); askedQuestion != "" {
resumeContext = fmt.Sprintf("\n\nResume context:\n- The assistant was waiting for the user's answer to this exact question: %s\n- Interpret the new user message as the answer to that question unless the message clearly starts a new topic.", askedQuestion)
if state.Waiting != nil {
waitingJSON, _ := json.Marshal(state.Waiting)
resumeContext += fmt.Sprintf("\n- Structured waiting state JSON: %s", string(waitingJSON))
}
}
}
userPrompt := fmt.Sprintf("Language: %s\nUser request: %s%s\n\nRecent conversation:\n%s\n\nAvailable tools JSON:\n%s\n\nPersistent preferences:\n%s\n\nTask state:\n%s\n\nExecution state JSON:\n%s\n\nReturn JSON with this exact shape:\n{\"goal\":\"\",\"steps\":[{\"id\":\"step_1\",\"type\":\"tool|reason|ask_user|respond\",\"title\":\"\",\"tool_name\":\"\",\"tool_args\":{},\"instruction\":\"\",\"requires_confirmation\":false}]}", lang, userText, resumeContext, recentConversationCtx, string(toolDefs), a.buildPersistentPreferencesContext(userID), taskStateCtx, string(stateJSON))
stageCtx, cancel := withPlannerStageTimeout(ctx, plannerCreateTimeout)
defer cancel()
startedAt := time.Now()
resp, err := a.aiClient.CallWithRequest(&mcp.Request{
Messages: []mcp.Message{
mcp.NewSystemMessage(systemPrompt),
mcp.NewUserMessage(userPrompt),
},
Ctx: stageCtx,
})
a.logPlannerTiming(state.SessionID, userID, "create_execution_plan_llm", startedAt, err)
if err != nil {
return executionPlan{}, err
}
plan, err := parseExecutionPlanJSON(resp)
if err != nil {
return executionPlan{}, err
}
if len(plan.Steps) == 0 {
return executionPlan{}, fmt.Errorf("empty execution plan")
}
if len(plan.Steps) > plannerMaxSteps {
plan.Steps = plan.Steps[:plannerMaxSteps]
}
for i := range plan.Steps {
if plan.Steps[i].ID == "" {
plan.Steps[i].ID = fmt.Sprintf("step_%d", i+1)
}
if plan.Steps[i].Status == "" {
plan.Steps[i].Status = planStepStatusPending
}
if plan.Steps[i].Title == "" {
plan.Steps[i].Title = strings.ReplaceAll(plan.Steps[i].ID, "_", " ")
}
}
if strings.TrimSpace(plan.Goal) == "" {
plan.Goal = strings.TrimSpace(userText)
}
return plan, nil
}
func parseExecutionPlanJSON(raw string) (executionPlan, error) {
raw = strings.TrimSpace(raw)
raw = strings.TrimPrefix(raw, "```json")
raw = strings.TrimPrefix(raw, "```")
raw = strings.TrimSuffix(raw, "```")
raw = strings.TrimSpace(raw)
var plan executionPlan
if err := json.Unmarshal([]byte(raw), &plan); err == nil {
return plan, nil
}
start := strings.Index(raw, "{")
end := strings.LastIndex(raw, "}")
if start >= 0 && end > start {
if err := json.Unmarshal([]byte(raw[start:end+1]), &plan); err == nil {
return plan, nil
}
}
return executionPlan{}, fmt.Errorf("invalid execution plan json")
}
func (a *Agent) executePlan(ctx context.Context, storeUserID string, userID int64, lang string, state *ExecutionState, onEvent func(event, data string)) (string, error) {
if onEvent != nil && len(state.Steps) > 0 {
onEvent(StreamEventPlan, formatPlanStatus(*state, lang))
}
for i := 0; i < plannerMaxIterations; i++ {
stepIndex := nextPendingStepIndex(state.Steps)
if stepIndex < 0 {
decisionStartedAt := time.Now()
decision, err := a.decideNextStep(ctx, userID, lang, *state)
a.logPlannerTiming(state.SessionID, userID, "decide_next_step", decisionStartedAt, err)
if err != nil {
return "", err
}
steps := filterFreshDuplicateToolSteps(decision.Steps, *state, time.Now().UTC())
if len(steps) == 0 {
appendExecutionLog(state, Observation{
Kind: "decision_note",
Summary: "Skipped duplicate fresh tool calls from next-step decision",
CreatedAt: time.Now().UTC().Format(time.RFC3339),
})
state.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
if err := a.saveExecutionState(*state); err != nil {
return "", err
}
continue
}
if hasRepeatedReasonLoop(*state, steps) {
return "", fmt.Errorf("repeated reasoning loop detected")
}
if decision.Goal != "" {
state.Goal = decision.Goal
}
base := len(completedSteps(state.Steps))
for idx := range steps {
if steps[idx].Type == "" {
return "", fmt.Errorf("next step decision missing step type")
}
if steps[idx].ID == "" {
steps[idx].ID = fmt.Sprintf("step_%d", base+idx+1)
}
if steps[idx].Title == "" {
steps[idx].Title = strings.ReplaceAll(steps[idx].ID, "_", " ")
}
if steps[idx].Status == "" {
steps[idx].Status = planStepStatusPending
}
}
state.Steps = append(completedSteps(state.Steps), steps...)
state.Status = executionStatusRunning
state.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
if err := a.saveExecutionState(*state); err != nil {
return "", err
}
if onEvent != nil {
onEvent(StreamEventPlan, formatPlanStatus(*state, lang))
}
continue
}
step := &state.Steps[stepIndex]
step.Status = planStepStatusRunning
state.Status = executionStatusRunning
state.CurrentStepID = step.ID
state.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
if onEvent != nil {
onEvent(StreamEventStepStart, formatStepStatus(*step, stepIndex, len(state.Steps), lang))
}
if err := a.saveExecutionState(*state); err != nil {
return "", err
}
switch step.Type {
case planStepTypeTool:
if onEvent != nil {
onEvent(StreamEventTool, step.ToolName)
}
stepStartedAt := time.Now()
result := a.executePlanTool(ctx, storeUserID, userID, lang, *step)
a.logPlannerTiming(state.SessionID, userID, "tool:"+step.ToolName, stepStartedAt, nil)
summary := summarizeObservation(result)
referencesChanged := false
step.Status = planStepStatusCompleted
step.OutputSummary = summary
appendExecutionLog(state, Observation{
StepID: step.ID,
Kind: "tool_result",
Summary: summary,
RawJSON: result,
CreatedAt: time.Now().UTC().Format(time.RFC3339),
})
referencesChanged = updateCurrentReferencesFromToolResult(state, step.ToolName, result)
_ = referencesChanged
case planStepTypeReason:
reasonStartedAt := time.Now()
reasoning, err := a.executeReasonStep(ctx, userID, lang, state.Goal, *state, *step)
a.logPlannerTiming(state.SessionID, userID, "reason_step", reasonStartedAt, err)
if err != nil {
step.Status = planStepStatusFailed
step.Error = err.Error()
state.Status = executionStatusFailed
state.LastError = err.Error()
_ = a.saveExecutionState(*state)
return "", err
}
step.Status = planStepStatusCompleted
step.OutputSummary = reasoning
appendExecutionLog(state, Observation{
StepID: step.ID,
Kind: "reasoning",
Summary: reasoning,
CreatedAt: time.Now().UTC().Format(time.RFC3339),
})
case planStepTypeAskUser:
question := strings.TrimSpace(step.Instruction)
if question == "" {
if lang == "zh" {
question = "我还缺少一些信息,麻烦你补充一下。"
} else {
question = "I need a bit more information before I continue."
}
}
step.Status = planStepStatusCompleted
step.OutputSummary = question
state.Status = executionStatusWaitingUser
state.Waiting = buildWaitingState(*state, *step, question)
state.FinalAnswer = question
state.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
if err := a.saveExecutionState(*state); err != nil {
return "", err
}
if onEvent != nil {
onEvent(StreamEventStepComplete, formatStepCompleteStatus(*step, lang))
onEvent(StreamEventDelta, question)
}
return question, nil
case planStepTypeRespond:
respondStartedAt := time.Now()
finalText, err := a.generateFinalPlanResponse(ctx, userID, lang, *state, step.Instruction)
a.logPlannerTiming(state.SessionID, userID, "respond_step", respondStartedAt, err)
if err != nil {
return "", err
}
step.Status = planStepStatusCompleted
step.OutputSummary = finalText
state.Status = executionStatusCompleted
state.Waiting = nil
state.FinalAnswer = finalText
state.CurrentStepID = ""
state.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
if err := a.saveExecutionState(*state); err != nil {
return "", err
}
if onEvent != nil {
onEvent(StreamEventStepComplete, formatStepCompleteStatus(*step, lang))
onEvent(StreamEventDelta, finalText)
}
return finalText, nil
default:
return "", fmt.Errorf("unsupported step type: %s", step.Type)
}
state.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
if err := a.saveExecutionState(*state); err != nil {
return "", err
}
if onEvent != nil {
onEvent(StreamEventStepComplete, formatStepCompleteStatus(*step, lang))
}
}
return "", fmt.Errorf("plan execution exceeded iteration limit")
}
type fetchedToolRecord struct {
ToolName string `json:"tool_name"`
ToolArgsJSON string `json:"tool_args_json"`
FetchedAt string `json:"fetched_at"`
AgeSeconds int64 `json:"age_seconds"`
}
func buildRecentlyFetchedData(state ExecutionState, now time.Time) []fetchedToolRecord {
state = normalizeExecutionState(state)
stepByID := make(map[string]PlanStep, len(state.Steps))
for _, step := range state.Steps {
stepByID[step.ID] = step
}
latest := map[string]fetchedToolRecord{}
for _, obs := range state.ExecutionLog {
if obs.Kind != "tool_result" {
continue
}
step, ok := stepByID[obs.StepID]
if !ok || step.ToolName == "" {
continue
}
sig := toolCallSignature(step.ToolName, step.ToolArgs)
createdAt := parseRFC3339(obs.CreatedAt)
record := fetchedToolRecord{
ToolName: step.ToolName,
ToolArgsJSON: toolArgsJSONString(step.ToolArgs),
FetchedAt: obs.CreatedAt,
AgeSeconds: int64(now.Sub(createdAt).Seconds()),
}
prev, exists := latest[sig]
if !exists || prev.FetchedAt < record.FetchedAt {
latest[sig] = record
}
}
out := make([]fetchedToolRecord, 0, len(latest))
for _, record := range latest {
if record.AgeSeconds < 0 {
record.AgeSeconds = 0
}
out = append(out, record)
}
return out
}
func filterFreshDuplicateToolSteps(steps []PlanStep, state ExecutionState, now time.Time) []PlanStep {
if len(steps) == 0 {
return nil
}
fresh := make(map[string]struct{})
for _, item := range buildRecentlyFetchedData(state, now) {
if item.AgeSeconds <= 60 {
fresh[item.ToolName+"|"+item.ToolArgsJSON] = struct{}{}
}
}
out := make([]PlanStep, 0, len(steps))
for _, step := range steps {
if step.Type != planStepTypeTool {
out = append(out, step)
continue
}
sig := toolCallSignature(step.ToolName, step.ToolArgs)
if _, ok := fresh[sig]; ok {
continue
}
fresh[sig] = struct{}{}
out = append(out, step)
}
return out
}
func hasRepeatedReasonLoop(state ExecutionState, steps []PlanStep) bool {
if len(steps) == 0 {
return false
}
last := lastCompletedStep(state.Steps)
if last == nil || last.Type != planStepTypeReason {
return false
}
for _, step := range steps {
if step.Type != planStepTypeReason {
return false
}
if stepSemanticKey(*last) != stepSemanticKey(step) {
return false
}
}
return true
}
func lastCompletedStep(steps []PlanStep) *PlanStep {
for i := len(steps) - 1; i >= 0; i-- {
if steps[i].Status == planStepStatusCompleted {
return &steps[i]
}
}
return nil
}
func stepSemanticKey(step PlanStep) string {
return strings.ToLower(strings.TrimSpace(
step.Type + "|" + step.ToolName + "|" + step.Title + "|" + step.Instruction,
))
}
func toolCallSignature(toolName string, args map[string]any) string {
return strings.TrimSpace(toolName) + "|" + toolArgsJSONString(args)
}
func toolArgsJSONString(args map[string]any) string {
if len(args) == 0 {
return "{}"
}
data, err := json.Marshal(args)
if err != nil {
return "{}"
}
return string(data)
}
func parseRFC3339(value string) time.Time {
t, err := time.Parse(time.RFC3339, strings.TrimSpace(value))
if err != nil {
return time.Time{}
}
return t
}
func (a *Agent) replanAfterStep(ctx context.Context, userID int64, lang string, state ExecutionState, completedStep PlanStep) (replannerDecision, error) {
obsJSON, _ := json.Marshal(buildObservationContext(state))
stepsJSON, _ := json.Marshal(state.Steps)
systemPrompt := `You are the replanning module for NOFXi.
Return JSON only.
Decide what to do after a plan step completed.
Allowed actions:
- continue
- replace_remaining
- ask_user
- finish
Rules:
- Use continue when the current remaining steps still make sense.
- Use replace_remaining when the observations materially change the remaining plan.
- Use ask_user when execution is blocked on missing user input.
- Use finish when there is enough information to answer and remaining steps are unnecessary.
- If action=replace_remaining, return a fresh list of remaining steps only.
- Keep plans short and safe.
- Never invent tools.`
userPrompt := fmt.Sprintf("Language: %s\nGoal: %s\nCompleted step: %s (%s)\nCompleted summary: %s\n\nCurrent steps JSON:\n%s\n\nObservations JSON:\n%s\n\nPersistent preferences:\n%s\n\nTask state:\n%s\n\nReturn JSON with this exact shape:\n{\"action\":\"continue|replace_remaining|ask_user|finish\",\"goal\":\"\",\"instruction\":\"\",\"question\":\"\",\"steps\":[{\"id\":\"step_x\",\"type\":\"tool|reason|ask_user|respond\",\"title\":\"\",\"tool_name\":\"\",\"tool_args\":{},\"instruction\":\"\",\"requires_confirmation\":false}]}", lang, state.Goal, completedStep.ID, completedStep.Type, completedStep.OutputSummary, string(stepsJSON), string(obsJSON), a.buildPersistentPreferencesContext(userID), buildTaskStateContext(a.getTaskState(userID)))
stageCtx, cancel := withPlannerStageTimeout(ctx, plannerReplanTimeout)
defer cancel()
startedAt := time.Now()
raw, err := a.aiClient.CallWithRequest(&mcp.Request{
Messages: []mcp.Message{
mcp.NewSystemMessage(systemPrompt),
mcp.NewUserMessage(userPrompt),
},
Ctx: stageCtx,
MaxTokens: intPtr(500),
})
a.logPlannerTiming(state.SessionID, userID, "replan_after_step_llm", startedAt, err)
if err != nil {
return replannerDecision{}, err
}
return parseReplannerDecisionJSON(raw)
}
func parseReplannerDecisionJSON(raw string) (replannerDecision, error) {
raw = strings.TrimSpace(raw)
raw = strings.TrimPrefix(raw, "```json")
raw = strings.TrimPrefix(raw, "```")
raw = strings.TrimSuffix(raw, "```")
raw = strings.TrimSpace(raw)
var decision replannerDecision
if err := json.Unmarshal([]byte(raw), &decision); err == nil {
return normalizeReplannerDecision(decision), nil
}
start := strings.Index(raw, "{")
end := strings.LastIndex(raw, "}")
if start >= 0 && end > start {
if err := json.Unmarshal([]byte(raw[start:end+1]), &decision); err == nil {
return normalizeReplannerDecision(decision), nil
}
}
return replannerDecision{}, fmt.Errorf("invalid replanner decision json")
}
func normalizeReplannerDecision(decision replannerDecision) replannerDecision {
decision.Action = strings.TrimSpace(decision.Action)
decision.Goal = strings.TrimSpace(decision.Goal)
decision.Instruction = strings.TrimSpace(decision.Instruction)
decision.Question = strings.TrimSpace(decision.Question)
for i := range decision.Steps {
if decision.Steps[i].ID == "" {
decision.Steps[i].ID = fmt.Sprintf("step_%d", i+1)
}
if decision.Steps[i].Status == "" {
decision.Steps[i].Status = planStepStatusPending
}
decision.Steps[i].Type = strings.TrimSpace(decision.Steps[i].Type)
decision.Steps[i].Title = strings.TrimSpace(decision.Steps[i].Title)
decision.Steps[i].ToolName = strings.TrimSpace(decision.Steps[i].ToolName)
decision.Steps[i].Instruction = strings.TrimSpace(decision.Steps[i].Instruction)
}
return decision
}
func applyReplannerDecision(state *ExecutionState, decision replannerDecision) bool {
switch decision.Action {
case "", "continue":
return false
case "finish":
state.Steps = append(completedSteps(state.Steps), PlanStep{
ID: fmt.Sprintf("step_finish_%d", time.Now().UTC().UnixNano()),
Type: planStepTypeRespond,
Title: "final response",
Status: planStepStatusPending,
Instruction: decision.Instruction,
})
state.CurrentStepID = ""
if decision.Goal != "" {
state.Goal = decision.Goal
}
state.Waiting = nil
return true
case "ask_user":
question := decision.Question
if question == "" {
question = decision.Instruction
}
state.Steps = append(completedSteps(state.Steps), PlanStep{
ID: fmt.Sprintf("step_ask_%d", time.Now().UTC().UnixNano()),
Type: planStepTypeAskUser,
Title: "need user input",
Status: planStepStatusPending,
Instruction: question,
})
state.CurrentStepID = ""
if decision.Goal != "" {
state.Goal = decision.Goal
}
state.Waiting = buildWaitingState(*state, state.Steps[len(state.Steps)-1], question)
return true
case "replace_remaining":
if len(decision.Steps) == 0 {
return false
}
state.Steps = append(completedSteps(state.Steps), decision.Steps...)
state.CurrentStepID = ""
if decision.Goal != "" {
state.Goal = decision.Goal
}
state.Waiting = nil
return true
default:
return false
}
}
func shouldAttemptReplan(state ExecutionState, step PlanStep, referencesChanged bool) bool {
if step.Type != planStepTypeTool {
return false
}
if toolResultIndicatesError(step.OutputSummary) || toolResultSignalsDependencyGap(step.OutputSummary) {
return true
}
if referencesChanged {
return true
}
if !hasPendingWorkAfterStep(state.Steps) {
return false
}
switch step.ToolName {
case "manage_trader", "manage_strategy", "manage_model_config", "manage_exchange_config", "execute_trade":
return toolActionMayChangePlan(step.ToolArgs)
default:
return false
}
}
func hasPendingWorkAfterStep(steps []PlanStep) bool {
for _, step := range steps {
if step.Status == planStepStatusPending {
return true
}
}
return false
}
func toolActionMayChangePlan(args map[string]any) bool {
action, _ := args["action"].(string)
switch strings.TrimSpace(action) {
case "create", "update", "delete", "start", "stop", "activate", "duplicate":
return true
default:
return false
}
}
func toolResultIndicatesError(summary string) bool {
lower := strings.ToLower(strings.TrimSpace(summary))
return strings.Contains(lower, `"error"`) || strings.Contains(lower, `"status":"error"`) || strings.Contains(lower, "failed to ")
}
func toolResultSignalsDependencyGap(summary string) bool {
lower := strings.ToLower(strings.TrimSpace(summary))
patterns := []string{
"is required", "invalid ai_model_id", "invalid exchange_id", "invalid strategy_id",
"ai model is disabled", "exchange is disabled", "not found", "missing",
}
return containsAnyKeyword(lower, patterns)
}
func completedSteps(steps []PlanStep) []PlanStep {
out := make([]PlanStep, 0, len(steps))
for _, step := range steps {
if step.Status == planStepStatusCompleted {
out = append(out, step)
}
}
return out
}
func (a *Agent) planningStatusText(lang string) string {
if lang == "zh" {
return "🧭 正在规划执行步骤..."
}
return "🧭 Planning the next execution steps..."
}
func formatPlanStatus(state ExecutionState, lang string) string {
parts := make([]string, 0, len(state.Steps))
for i, step := range state.Steps {
label := step.Title
if label == "" {
label = step.Type
}
parts = append(parts, fmt.Sprintf("%d.%s", i+1, label))
}
if lang == "zh" {
return fmt.Sprintf("🗺️ 计划: %s", strings.Join(parts, " -> "))
}
return fmt.Sprintf("🗺️ Plan: %s", strings.Join(parts, " -> "))
}
func formatStepStatus(step PlanStep, idx, total int, lang string) string {
label := step.Title
if label == "" {
label = step.Type
}
if lang == "zh" {
return fmt.Sprintf("▶️ 步骤 %d/%d: %s", idx+1, total, label)
}
return fmt.Sprintf("▶️ Step %d/%d: %s", idx+1, total, label)
}
func formatStepCompleteStatus(step PlanStep, lang string) string {
label := step.Title
if label == "" {
label = step.Type
}
if lang == "zh" {
return fmt.Sprintf("✅ 已完成: %s", label)
}
return fmt.Sprintf("✅ Completed: %s", label)
}
func formatReplanStatus(decision replannerDecision, lang string) string {
switch decision.Action {
case "replace_remaining":
if lang == "zh" {
return "🔄 已根据新结果更新后续步骤"
}
return "🔄 Updated the remaining steps based on new results"
case "ask_user":
if lang == "zh" {
return "📝 当前流程需要用户补充信息"
}
return "📝 This flow needs more user input"
case "finish":
if lang == "zh" {
return "🏁 已提前收敛到最终回复"
}
return "🏁 Converged early to the final response"
default:
if lang == "zh" {
return "🔄 已重新评估计划"
}
return "🔄 Re-evaluated the plan"
}
}
func (a *Agent) executePlanTool(ctx context.Context, storeUserID string, userID int64, lang string, step PlanStep) string {
argsJSON := "{}"
if len(step.ToolArgs) > 0 {
if data, err := json.Marshal(step.ToolArgs); err == nil {
argsJSON = string(data)
}
}
return a.handleToolCall(ctx, storeUserID, userID, lang, mcp.ToolCall{
ID: step.ID,
Type: "function",
Function: mcp.ToolCallFunction{
Name: step.ToolName,
Arguments: argsJSON,
},
})
}
func (a *Agent) executeReasonStep(ctx context.Context, userID int64, lang, goal string, state ExecutionState, step PlanStep) (string, error) {
obsJSON, _ := json.Marshal(buildObservationContext(state))
stageCtx, cancel := withPlannerStageTimeout(ctx, plannerReasonTimeout)
defer cancel()
startedAt := time.Now()
resp, err := a.aiClient.CallWithRequest(&mcp.Request{
Messages: []mcp.Message{
mcp.NewSystemMessage("You are the reasoning module for NOFXi. Return one short paragraph only. No markdown, no bullet list."),
mcp.NewUserMessage(fmt.Sprintf("Language: %s\nGoal: %s\nReasoning task: %s\nObservations JSON: %s\nPersistent preferences: %s\nTask state: %s", lang, goal, step.Instruction, string(obsJSON), a.buildPersistentPreferencesContext(userID), buildTaskStateContext(a.getTaskState(userID)))),
},
Ctx: stageCtx,
})
a.logPlannerTiming(state.SessionID, userID, "reason_step_llm", startedAt, err)
if err != nil {
return "", err
}
return summarizeObservation(resp), nil
}
func (a *Agent) generateFinalPlanResponse(ctx context.Context, userID int64, lang string, state ExecutionState, instruction string) (string, error) {
obsJSON, _ := json.Marshal(buildObservationContext(state))
systemPrompt := a.buildSystemPrompt(lang)
if instruction == "" {
instruction = "Provide the best possible final response to the user based on the finished execution."
}
stageCtx, cancel := withPlannerStageTimeout(ctx, plannerFinalTimeout)
defer cancel()
startedAt := time.Now()
resp, err := a.aiClient.CallWithRequest(&mcp.Request{
Messages: []mcp.Message{
mcp.NewSystemMessage(systemPrompt),
mcp.NewSystemMessage("You are responding after a completed execution plan. Use the observations as the source of truth. Be concise and actionable."),
mcp.NewUserMessage(fmt.Sprintf("Goal: %s\nResponse instruction: %s\nObservations JSON: %s\nPersistent preferences: %s\nTask state: %s", state.Goal, instruction, string(obsJSON), a.buildPersistentPreferencesContext(userID), buildTaskStateContext(a.getTaskState(userID)))),
},
Ctx: stageCtx,
})
a.logPlannerTiming(state.SessionID, userID, "generate_final_response_llm", startedAt, err)
return resp, err
}
func (a *Agent) logPlannerTiming(sessionID string, userID int64, stage string, startedAt time.Time, err error) {
if stage == "" || startedAt.IsZero() {
return
}
attrs := []any{
"session_id", sessionID,
"user_id", userID,
"stage", stage,
"elapsed_ms", time.Since(startedAt).Milliseconds(),
}
if err != nil {
attrs = append(attrs, "error", err.Error())
}
a.log().Info("planner timing", attrs...)
}
func nextPendingStepIndex(steps []PlanStep) int {
for i := range steps {
if steps[i].Status == "" || steps[i].Status == planStepStatusPending {
return i
}
}
return -1
}
func summarizeObservation(value string) string {
value = strings.TrimSpace(value)
if len(value) <= observationMaxLength {
return value
}
return strings.TrimSpace(value[:observationMaxLength]) + "..."
}
func (a *Agent) thinkAndActLegacy(ctx context.Context, userID int64, lang, text string, onEvent func(event, data string)) (string, error) {
systemPrompt := a.buildSystemPrompt(lang)
enrichment := a.gatherContext(text)
preferencesCtx := a.buildPersistentPreferencesContext(userID)
userPrompt := text
if preferencesCtx != "" {
userPrompt = preferencesCtx + "\n\n---\n" + userPrompt
}
if enrichment != "" {
userPrompt = text + "\n\n---\n[NOFXi System Context - real-time data for reference]\n" + enrichment
if preferencesCtx != "" {
userPrompt = preferencesCtx + "\n\n---\n" + userPrompt
}
}
messages := []mcp.Message{mcp.NewSystemMessage(systemPrompt)}
taskStateCtx := buildTaskStateContext(a.getTaskState(userID))
if isConfigOrTraderIntent(text) {
taskStateCtx = ""
}
if taskStateCtx != "" {
messages = append(messages, mcp.NewSystemMessage(taskStateCtx))
}
history := a.history.Get(userID)
if len(history) > 0 {
history = history[:len(history)-1]
}
for _, msg := range history {
messages = append(messages, mcp.NewMessage(msg.Role, msg.Content))
}
messages = append(messages, mcp.NewUserMessage(userPrompt))
tools := agentTools()
const maxToolRounds = 5
for round := 0; round < maxToolRounds; round++ {
req := &mcp.Request{
Messages: messages,
Tools: tools,
ToolChoice: "auto",
Ctx: ctx,
}
resp, err := a.aiClient.CallWithRequestFull(req)
if err != nil {
if round == 0 {
plainResp, plainErr := a.aiClient.CallWithRequest(&mcp.Request{Messages: messages, Ctx: ctx})
if plainErr != nil {
a.logger.Warn("legacy AI plain fallback failed", "error", plainErr, "user_id", userID)
return a.aiServiceFailure(lang, plainErr)
}
if onEvent != nil {
onEvent(StreamEventDelta, plainResp)
}
return plainResp, nil
}
a.logger.Warn("legacy AI tool round failed", "error", err, "user_id", userID, "round", round)
return a.aiServiceFailure(lang, err)
}
if len(resp.ToolCalls) == 0 {
if onEvent != nil {
onEvent(StreamEventDelta, resp.Content)
}
return resp.Content, nil
}
assistantMsg := mcp.Message{Role: "assistant", ToolCalls: resp.ToolCalls}
if resp.Content != "" {
assistantMsg.Content = resp.Content
}
messages = append(messages, assistantMsg)
for _, tc := range resp.ToolCalls {
if onEvent != nil {
onEvent(StreamEventTool, tc.Function.Name)
}
result := a.handleToolCall(ctx, storeUserIDFromContext(ctx), userID, lang, tc)
messages = append(messages, mcp.Message{
Role: "tool",
Content: result,
ToolCallID: tc.ID,
})
}
}
finalResp, err := a.aiClient.CallWithRequest(&mcp.Request{Messages: messages, Ctx: ctx})
if err != nil {
a.logger.Warn("legacy AI final response failed", "error", err, "user_id", userID)
return a.aiServiceFailure(lang, err)
}
if onEvent != nil {
onEvent(StreamEventDelta, finalResp)
}
return finalResp, nil
}