From 785922697b6df457ca54709b51197eee6fcc990c Mon Sep 17 00:00:00 2001 From: tinkle-community Date: Thu, 11 Jun 2026 01:00:41 +0800 Subject: [PATCH] feat(agent): add native function-calling agentic loop as the new brain core One standard tool-use loop replaces the need for layered JSON routing: the LLM sees all 22 tools plus real multi-turn history, every tool result (including errors) returns to the loop as an observation, and the final user-facing reply is always LLM-written. Interruptions report exactly which tools already executed so side effects are never silently lost or repeated by fallback paths. Gated by NOFX_AGENT_V2 (default on). --- agent/agentic_loop.go | 193 ++++++++++++++++++++++++++ agent/agentic_loop_test.go | 275 +++++++++++++++++++++++++++++++++++++ 2 files changed, 468 insertions(+) create mode 100644 agent/agentic_loop.go create mode 100644 agent/agentic_loop_test.go diff --git a/agent/agentic_loop.go b/agent/agentic_loop.go new file mode 100644 index 00000000..3522c044 --- /dev/null +++ b/agent/agentic_loop.go @@ -0,0 +1,193 @@ +package agent + +import ( + "context" + "fmt" + "os" + "strings" + + "nofx/mcp" +) + +const ( + // agenticMaxToolRounds bounds the number of LLM round-trips in one user + // turn. Each round may execute several tool calls, so this comfortably + // covers chained operations (create → configure → start) while still + // terminating runaway loops. + agenticMaxToolRounds = 12 + + // agenticHistoryMessages is the number of recent history messages replayed + // to the LLM as real conversation turns. + agenticHistoryMessages = 12 +) + +// agentV2Enabled reports whether the native function-calling loop is the +// primary brain. Enabled by default; set NOFX_AGENT_V2=0/false/off/disabled +// to fall back to the legacy routing stack. +func agentV2Enabled() bool { + switch strings.TrimSpace(strings.ToLower(os.Getenv("NOFX_AGENT_V2"))) { + case "0", "false", "off", "disabled": + return false + } + return true +} + +// runAgenticTurn drives one user turn through a native function-calling loop: +// the LLM sees the full toolset plus recent conversation, decides which tools +// to call, receives every tool result (including errors) as observations, and +// writes the final user-facing reply itself. +// +// Returns handled=false when nothing user-visible happened and the caller +// should fall back to the legacy routing stack (e.g. the very first LLM call +// failed). Once any tool has executed, the turn is always handled so side +// effects are never silently repeated by a fallback path. +func (a *Agent) runAgenticTurn(ctx context.Context, storeUserID string, userID int64, lang, text string, onEvent func(event, data string)) (string, bool, error) { + if a.aiClient == nil { + return "", false, nil + } + + messages := []mcp.Message{mcp.NewSystemMessage(a.buildSystemPromptForStoreUser(lang, storeUserID))} + if prefs := a.buildPersistentPreferencesContext(userID); prefs != "" { + messages = append(messages, mcp.NewSystemMessage(prefs)) + } + if taskCtx := buildTaskStateContext(a.getTaskState(userID)); taskCtx != "" { + messages = append(messages, mcp.NewSystemMessage(taskCtx)) + } + messages = append(messages, a.recentHistoryMessages(userID, text)...) + messages = append(messages, mcp.NewUserMessage(text)) + + tools := agentTools() + var executedTools []string + + for round := 0; round < agenticMaxToolRounds; round++ { + resp, err := a.aiClient.CallWithRequestFull(&mcp.Request{ + Messages: messages, + Tools: tools, + ToolChoice: "auto", + Ctx: ctx, + }) + if err != nil { + a.logger.Warn("agentic turn LLM call failed", "error", err, "user_id", userID, "round", round) + if len(executedTools) == 0 { + // Nothing happened yet — safe to let the legacy stack retry. + return "", false, nil + } + reply := agenticInterruptedReply(lang, executedTools) + return a.finishAgenticTurn(userID, lang, text, reply, onEvent), true, nil + } + + if len(resp.ToolCalls) == 0 { + reply := strings.TrimSpace(resp.Content) + if reply == "" { + if len(executedTools) == 0 { + return "", false, nil + } + reply = agenticInterruptedReply(lang, executedTools) + } + return a.finishAgenticTurn(userID, lang, text, reply, onEvent), true, nil + } + + assistantMsg := mcp.Message{Role: "assistant", ToolCalls: resp.ToolCalls} + if resp.Content != "" { + assistantMsg.Content = resp.Content + } + if resp.ReasoningContent != "" { + assistantMsg.ReasoningContent = resp.ReasoningContent + } + messages = append(messages, assistantMsg) + + for _, tc := range resp.ToolCalls { + if onEvent != nil { + onEvent(StreamEventTool, tc.Function.Name) + } + executedTools = append(executedTools, tc.Function.Name) + result := a.handleToolCall(ctx, storeUserID, userID, lang, tc) + messages = append(messages, mcp.Message{ + Role: "tool", + Content: result, + ToolCallID: tc.ID, + }) + } + } + + // Round budget exhausted: ask the LLM to wrap up with what it has, without + // offering further tools. + messages = append(messages, mcp.NewSystemMessage(agenticWrapUpInstruction(lang))) + final, err := a.aiClient.CallWithRequest(&mcp.Request{Messages: messages, Ctx: ctx}) + if err != nil || strings.TrimSpace(final) == "" { + if err != nil { + a.logger.Warn("agentic wrap-up call failed", "error", err, "user_id", userID) + } + final = agenticInterruptedReply(lang, executedTools) + } + return a.finishAgenticTurn(userID, lang, text, final, onEvent), true, nil +} + +// finishAgenticTurn applies final-reply guards, records the turn in history, +// and streams the reply. +func (a *Agent) finishAgenticTurn(userID int64, lang, text, reply string, onEvent func(event, data string)) string { + if guarded, blocked := guardUnsupportedAsyncPromise(lang, reply); blocked { + reply = guarded + } + if a.history != nil { + a.history.Add(userID, "user", text) + a.history.Add(userID, "assistant", reply) + } + emitStreamText(onEvent, reply) + return reply +} + +// recentHistoryMessages replays recent conversation turns as real chat +// messages so the LLM has multi-turn context, dropping a trailing duplicate of +// the current user text if the caller already recorded it. +func (a *Agent) recentHistoryMessages(userID int64, currentText string) []mcp.Message { + if a.history == nil { + return nil + } + msgs := a.history.Get(userID) + if n := len(msgs); n > 0 && msgs[n-1].Role == "user" && + strings.TrimSpace(msgs[n-1].Content) == strings.TrimSpace(currentText) { + msgs = msgs[:n-1] + } + if len(msgs) > agenticHistoryMessages { + msgs = msgs[len(msgs)-agenticHistoryMessages:] + } + out := make([]mcp.Message, 0, len(msgs)) + for _, m := range msgs { + content := strings.TrimSpace(m.Content) + if content == "" { + continue + } + switch m.Role { + case "user": + out = append(out, mcp.NewUserMessage(content)) + case "assistant": + out = append(out, mcp.Message{Role: "assistant", Content: content}) + } + } + return out +} + +// agenticInterruptedReply tells the user exactly which tools already ran when +// a turn cannot produce an LLM-written reply, so work is never silently lost. +func agenticInterruptedReply(lang string, executedTools []string) string { + tools := strings.Join(executedTools, ", ") + if lang == "zh" { + if tools == "" { + return "刚才处理你的请求时 AI 服务中断了,已执行的操作没有丢失。请再说一次你想做什么,我接着处理。" + } + return fmt.Sprintf("处理过程中 AI 服务中断了。已执行的操作:%s。这些结果已生效,你可以让我继续下一步或查询当前状态。", tools) + } + if tools == "" { + return "The AI service was interrupted while handling your request. Nothing was lost — please tell me again what you'd like to do." + } + return fmt.Sprintf("The AI service was interrupted mid-task. Tools already executed: %s. Those results took effect — ask me to continue or check the current state.", tools) +} + +// agenticWrapUpInstruction is appended when the tool-round budget is spent. +func agenticWrapUpInstruction(lang string) string { + if lang == "zh" { + return "工具调用轮次已达上限。请基于以上已获得的全部结果,直接给用户一个完整的中文总结回复:说明已完成什么、未完成什么、建议的下一步。不要再请求调用工具。" + } + return "Tool-call round limit reached. Using everything gathered above, write the final reply for the user now: what was completed, what was not, and the suggested next step. Do not request more tool calls." +} diff --git a/agent/agentic_loop_test.go b/agent/agentic_loop_test.go new file mode 100644 index 00000000..4a5a4eac --- /dev/null +++ b/agent/agentic_loop_test.go @@ -0,0 +1,275 @@ +package agent + +import ( + "context" + "errors" + "log/slog" + "strings" + "testing" + "time" + + "nofx/mcp" +) + +// scriptedAIClient returns queued LLMResponses (or errors) in order for +// CallWithRequestFull, and queued plain strings for CallWithRequest. +type scriptedAIClient struct { + fullResponses []*mcp.LLMResponse + fullErrs []error + fullCalls int + fullRequests []*mcp.Request + + plainResponse string + plainErr error + plainCalls int +} + +func (c *scriptedAIClient) SetAPIKey(apiKey string, customURL string, customModel string) {} +func (c *scriptedAIClient) SetTimeout(timeout time.Duration) {} +func (c *scriptedAIClient) CallWithMessages(systemPrompt, userPrompt string) (string, error) { + return c.plainResponse, c.plainErr +} +func (c *scriptedAIClient) CallWithRequest(req *mcp.Request) (string, error) { + c.plainCalls++ + return c.plainResponse, c.plainErr +} +func (c *scriptedAIClient) CallWithRequestStream(req *mcp.Request, onChunk func(string)) (string, error) { + if onChunk != nil && c.plainErr == nil { + onChunk(c.plainResponse) + } + return c.plainResponse, c.plainErr +} +func (c *scriptedAIClient) CallWithRequestFull(req *mcp.Request) (*mcp.LLMResponse, error) { + idx := c.fullCalls + c.fullCalls++ + c.fullRequests = append(c.fullRequests, req) + var err error + if idx < len(c.fullErrs) { + err = c.fullErrs[idx] + } + if err != nil { + return nil, err + } + if idx < len(c.fullResponses) { + return c.fullResponses[idx], nil + } + return &mcp.LLMResponse{Content: "fallthrough"}, nil +} + +func newAgenticTestAgent(client mcp.AIClient) *Agent { + a := New(nil, nil, DefaultConfig(), slog.Default()) + a.SetAIClient(client) + return a +} + +func toolCall(id, name, args string) mcp.ToolCall { + return mcp.ToolCall{ + ID: id, + Type: "function", + Function: mcp.ToolCallFunction{ + Name: name, + Arguments: args, + }, + } +} + +func TestRunAgenticTurnDirectAnswer(t *testing.T) { + client := &scriptedAIClient{ + fullResponses: []*mcp.LLMResponse{{Content: "你好,我是 NOFX 助手。"}}, + } + a := newAgenticTestAgent(client) + + answer, handled, err := a.runAgenticTurn(context.Background(), "default", 1, "zh", "你好", nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !handled { + t.Fatal("expected turn to be handled") + } + if answer != "你好,我是 NOFX 助手。" { + t.Fatalf("answer = %q", answer) + } + if client.fullCalls != 1 { + t.Fatalf("fullCalls = %d, want 1", client.fullCalls) + } + // Tools must be offered on the request. + if len(client.fullRequests[0].Tools) == 0 { + t.Fatal("expected tools to be attached to the LLM request") + } + // Conversation must be recorded in history. + msgs := a.history.Get(1) + if len(msgs) != 2 || msgs[0].Role != "user" || msgs[1].Role != "assistant" { + t.Fatalf("history = %+v, want user+assistant turns", msgs) + } +} + +func TestRunAgenticTurnToolRoundTrip(t *testing.T) { + client := &scriptedAIClient{ + fullResponses: []*mcp.LLMResponse{ + {ToolCalls: []mcp.ToolCall{toolCall("call_1", "definitely_not_a_tool", "{}")}}, + {Content: "done"}, + }, + } + a := newAgenticTestAgent(client) + + var toolEvents []string + onEvent := func(event, data string) { + if event == StreamEventTool { + toolEvents = append(toolEvents, data) + } + } + + answer, handled, err := a.runAgenticTurn(context.Background(), "default", 2, "en", "do something", onEvent) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !handled || answer != "done" { + t.Fatalf("handled=%v answer=%q", handled, answer) + } + if client.fullCalls != 2 { + t.Fatalf("fullCalls = %d, want 2", client.fullCalls) + } + if len(toolEvents) != 1 || toolEvents[0] != "definitely_not_a_tool" { + t.Fatalf("toolEvents = %v", toolEvents) + } + + // The second request must carry the assistant tool-call message and the + // tool result message with matching ToolCallID. + second := client.fullRequests[1] + var sawAssistantToolCall, sawToolResult bool + for _, m := range second.Messages { + if m.Role == "assistant" && len(m.ToolCalls) == 1 && m.ToolCalls[0].ID == "call_1" { + sawAssistantToolCall = true + } + if m.Role == "tool" && m.ToolCallID == "call_1" { + sawToolResult = true + if !strings.Contains(m.Content, "unknown tool") { + t.Fatalf("tool result = %q, want unknown-tool error payload", m.Content) + } + } + } + if !sawAssistantToolCall || !sawToolResult { + t.Fatalf("tool round-trip messages missing: assistant=%v tool=%v", sawAssistantToolCall, sawToolResult) + } +} + +func TestRunAgenticTurnFirstCallFailureFallsBack(t *testing.T) { + client := &scriptedAIClient{ + fullErrs: []error{errors.New("upstream 500")}, + } + a := newAgenticTestAgent(client) + + _, handled, err := a.runAgenticTurn(context.Background(), "default", 3, "zh", "hi", nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if handled { + t.Fatal("expected fallback (handled=false) when the first LLM call fails") + } + if got := a.history.Get(3); len(got) != 0 { + t.Fatalf("history should stay empty on fallback, got %+v", got) + } +} + +func TestRunAgenticTurnMidLoopFailureReportsExecutedTools(t *testing.T) { + client := &scriptedAIClient{ + fullResponses: []*mcp.LLMResponse{ + {ToolCalls: []mcp.ToolCall{toolCall("call_1", "definitely_not_a_tool", "{}")}}, + }, + fullErrs: []error{nil, errors.New("upstream timeout")}, + } + a := newAgenticTestAgent(client) + + answer, handled, err := a.runAgenticTurn(context.Background(), "default", 4, "zh", "do it", nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !handled { + t.Fatal("mid-loop failure must be handled (tools already executed)") + } + if !strings.Contains(answer, "definitely_not_a_tool") { + t.Fatalf("answer should mention the executed tool, got %q", answer) + } +} + +func TestRunAgenticTurnRoundLimitTriggersWrapUp(t *testing.T) { + responses := make([]*mcp.LLMResponse, 0, agenticMaxToolRounds) + for i := 0; i < agenticMaxToolRounds; i++ { + responses = append(responses, &mcp.LLMResponse{ + ToolCalls: []mcp.ToolCall{toolCall("call_x", "definitely_not_a_tool", "{}")}, + }) + } + client := &scriptedAIClient{ + fullResponses: responses, + plainResponse: "summary of what happened", + } + a := newAgenticTestAgent(client) + + answer, handled, err := a.runAgenticTurn(context.Background(), "default", 5, "en", "loop forever", nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !handled { + t.Fatal("expected handled=true at round limit") + } + if answer != "summary of what happened" { + t.Fatalf("answer = %q, want wrap-up summary", answer) + } + if client.fullCalls != agenticMaxToolRounds { + t.Fatalf("fullCalls = %d, want %d", client.fullCalls, agenticMaxToolRounds) + } + if client.plainCalls != 1 { + t.Fatalf("plainCalls = %d, want 1 wrap-up call", client.plainCalls) + } +} + +func TestRunAgenticTurnIncludesRecentHistory(t *testing.T) { + client := &scriptedAIClient{ + fullResponses: []*mcp.LLMResponse{{Content: "answer"}}, + } + a := newAgenticTestAgent(client) + a.history.Add(6, "user", "前一个问题") + a.history.Add(6, "assistant", "前一个回答") + + if _, handled, err := a.runAgenticTurn(context.Background(), "default", 6, "zh", "新问题", nil); err != nil || !handled { + t.Fatalf("handled=%v err=%v", handled, err) + } + + req := client.fullRequests[0] + var sawPrevUser, sawPrevAssistant bool + for _, m := range req.Messages { + if m.Role == "user" && m.Content == "前一个问题" { + sawPrevUser = true + } + if m.Role == "assistant" && m.Content == "前一个回答" { + sawPrevAssistant = true + } + } + if !sawPrevUser || !sawPrevAssistant { + t.Fatalf("recent history missing from request: user=%v assistant=%v", sawPrevUser, sawPrevAssistant) + } +} + +func TestAgentV2Enabled(t *testing.T) { + cases := []struct { + value string + want bool + }{ + {"", true}, + {"1", true}, + {"true", true}, + {"on", true}, + {"0", false}, + {"false", false}, + {"off", false}, + {"disabled", false}, + } + for _, tc := range cases { + t.Run("value="+tc.value, func(t *testing.T) { + t.Setenv("NOFX_AGENT_V2", tc.value) + if got := agentV2Enabled(); got != tc.want { + t.Errorf("agentV2Enabled() with %q = %v, want %v", tc.value, got, tc.want) + } + }) + } +}