mirror of
https://github.com/NoFxAiOS/nofx.git
synced 2026-07-05 03:50:59 +08:00
feat(agent): add native function-calling agentic loop as the new brain core
One standard tool-use loop replaces the need for layered JSON routing: the LLM sees all 22 tools plus real multi-turn history, every tool result (including errors) returns to the loop as an observation, and the final user-facing reply is always LLM-written. Interruptions report exactly which tools already executed so side effects are never silently lost or repeated by fallback paths. Gated by NOFX_AGENT_V2 (default on).
This commit is contained in:
193
agent/agentic_loop.go
Normal file
193
agent/agentic_loop.go
Normal file
@@ -0,0 +1,193 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"nofx/mcp"
|
||||
)
|
||||
|
||||
const (
	// agenticHistoryMessages caps how many of the most recent stored history
	// messages are replayed to the LLM as real conversation turns.
	agenticHistoryMessages = 12

	// agenticMaxToolRounds caps the LLM round-trips spent on a single user
	// turn. A round can carry several tool calls, so the budget is enough for
	// chained operations (create → configure → start) yet still stops a
	// runaway loop.
	agenticMaxToolRounds = 12
)
|
||||
|
||||
// agentV2Enabled reports whether the native function-calling loop is the
// primary brain. It is on by default; it is off only when NOFX_AGENT_V2,
// compared case-insensitively with surrounding whitespace ignored, is one of
// 0, false, off, or disabled, so callers can fall back to the legacy routing
// stack.
func agentV2Enabled() bool {
	flag := strings.ToLower(os.Getenv("NOFX_AGENT_V2"))
	flag = strings.TrimSpace(flag)

	optedOut := flag == "0" || flag == "false" || flag == "off" || flag == "disabled"
	return !optedOut
}
|
||||
|
||||
// runAgenticTurn drives one user turn through a native function-calling loop:
|
||||
// the LLM sees the full toolset plus recent conversation, decides which tools
|
||||
// to call, receives every tool result (including errors) as observations, and
|
||||
// writes the final user-facing reply itself.
|
||||
//
|
||||
// Returns handled=false when nothing user-visible happened and the caller
|
||||
// should fall back to the legacy routing stack (e.g. the very first LLM call
|
||||
// failed). Once any tool has executed, the turn is always handled so side
|
||||
// effects are never silently repeated by a fallback path.
|
||||
func (a *Agent) runAgenticTurn(ctx context.Context, storeUserID string, userID int64, lang, text string, onEvent func(event, data string)) (string, bool, error) {
|
||||
if a.aiClient == nil {
|
||||
return "", false, nil
|
||||
}
|
||||
|
||||
messages := []mcp.Message{mcp.NewSystemMessage(a.buildSystemPromptForStoreUser(lang, storeUserID))}
|
||||
if prefs := a.buildPersistentPreferencesContext(userID); prefs != "" {
|
||||
messages = append(messages, mcp.NewSystemMessage(prefs))
|
||||
}
|
||||
if taskCtx := buildTaskStateContext(a.getTaskState(userID)); taskCtx != "" {
|
||||
messages = append(messages, mcp.NewSystemMessage(taskCtx))
|
||||
}
|
||||
messages = append(messages, a.recentHistoryMessages(userID, text)...)
|
||||
messages = append(messages, mcp.NewUserMessage(text))
|
||||
|
||||
tools := agentTools()
|
||||
var executedTools []string
|
||||
|
||||
for round := 0; round < agenticMaxToolRounds; round++ {
|
||||
resp, err := a.aiClient.CallWithRequestFull(&mcp.Request{
|
||||
Messages: messages,
|
||||
Tools: tools,
|
||||
ToolChoice: "auto",
|
||||
Ctx: ctx,
|
||||
})
|
||||
if err != nil {
|
||||
a.logger.Warn("agentic turn LLM call failed", "error", err, "user_id", userID, "round", round)
|
||||
if len(executedTools) == 0 {
|
||||
// Nothing happened yet — safe to let the legacy stack retry.
|
||||
return "", false, nil
|
||||
}
|
||||
reply := agenticInterruptedReply(lang, executedTools)
|
||||
return a.finishAgenticTurn(userID, lang, text, reply, onEvent), true, nil
|
||||
}
|
||||
|
||||
if len(resp.ToolCalls) == 0 {
|
||||
reply := strings.TrimSpace(resp.Content)
|
||||
if reply == "" {
|
||||
if len(executedTools) == 0 {
|
||||
return "", false, nil
|
||||
}
|
||||
reply = agenticInterruptedReply(lang, executedTools)
|
||||
}
|
||||
return a.finishAgenticTurn(userID, lang, text, reply, onEvent), true, nil
|
||||
}
|
||||
|
||||
assistantMsg := mcp.Message{Role: "assistant", ToolCalls: resp.ToolCalls}
|
||||
if resp.Content != "" {
|
||||
assistantMsg.Content = resp.Content
|
||||
}
|
||||
if resp.ReasoningContent != "" {
|
||||
assistantMsg.ReasoningContent = resp.ReasoningContent
|
||||
}
|
||||
messages = append(messages, assistantMsg)
|
||||
|
||||
for _, tc := range resp.ToolCalls {
|
||||
if onEvent != nil {
|
||||
onEvent(StreamEventTool, tc.Function.Name)
|
||||
}
|
||||
executedTools = append(executedTools, tc.Function.Name)
|
||||
result := a.handleToolCall(ctx, storeUserID, userID, lang, tc)
|
||||
messages = append(messages, mcp.Message{
|
||||
Role: "tool",
|
||||
Content: result,
|
||||
ToolCallID: tc.ID,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Round budget exhausted: ask the LLM to wrap up with what it has, without
|
||||
// offering further tools.
|
||||
messages = append(messages, mcp.NewSystemMessage(agenticWrapUpInstruction(lang)))
|
||||
final, err := a.aiClient.CallWithRequest(&mcp.Request{Messages: messages, Ctx: ctx})
|
||||
if err != nil || strings.TrimSpace(final) == "" {
|
||||
if err != nil {
|
||||
a.logger.Warn("agentic wrap-up call failed", "error", err, "user_id", userID)
|
||||
}
|
||||
final = agenticInterruptedReply(lang, executedTools)
|
||||
}
|
||||
return a.finishAgenticTurn(userID, lang, text, final, onEvent), true, nil
|
||||
}
|
||||
|
||||
// finishAgenticTurn applies final-reply guards, records the turn in history,
|
||||
// and streams the reply.
|
||||
func (a *Agent) finishAgenticTurn(userID int64, lang, text, reply string, onEvent func(event, data string)) string {
|
||||
if guarded, blocked := guardUnsupportedAsyncPromise(lang, reply); blocked {
|
||||
reply = guarded
|
||||
}
|
||||
if a.history != nil {
|
||||
a.history.Add(userID, "user", text)
|
||||
a.history.Add(userID, "assistant", reply)
|
||||
}
|
||||
emitStreamText(onEvent, reply)
|
||||
return reply
|
||||
}
|
||||
|
||||
// recentHistoryMessages replays recent conversation turns as real chat
|
||||
// messages so the LLM has multi-turn context, dropping a trailing duplicate of
|
||||
// the current user text if the caller already recorded it.
|
||||
func (a *Agent) recentHistoryMessages(userID int64, currentText string) []mcp.Message {
|
||||
if a.history == nil {
|
||||
return nil
|
||||
}
|
||||
msgs := a.history.Get(userID)
|
||||
if n := len(msgs); n > 0 && msgs[n-1].Role == "user" &&
|
||||
strings.TrimSpace(msgs[n-1].Content) == strings.TrimSpace(currentText) {
|
||||
msgs = msgs[:n-1]
|
||||
}
|
||||
if len(msgs) > agenticHistoryMessages {
|
||||
msgs = msgs[len(msgs)-agenticHistoryMessages:]
|
||||
}
|
||||
out := make([]mcp.Message, 0, len(msgs))
|
||||
for _, m := range msgs {
|
||||
content := strings.TrimSpace(m.Content)
|
||||
if content == "" {
|
||||
continue
|
||||
}
|
||||
switch m.Role {
|
||||
case "user":
|
||||
out = append(out, mcp.NewUserMessage(content))
|
||||
case "assistant":
|
||||
out = append(out, mcp.Message{Role: "assistant", Content: content})
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// agenticInterruptedReply builds the fallback reply used when a turn cannot
// end with an LLM-written answer. The executed tool names are joined with
// ", " and embedded in the message so the user can see what already ran; with
// no tools a generic "please repeat" message is returned instead. The text is
// Chinese when lang is "zh" and English for every other value.
func agenticInterruptedReply(lang string, executedTools []string) string {
	ran := strings.Join(executedTools, ", ")
	chinese := lang == "zh"

	switch {
	case chinese && ran == "":
		return "刚才处理你的请求时 AI 服务中断了,已执行的操作没有丢失。请再说一次你想做什么,我接着处理。"
	case chinese:
		return fmt.Sprintf("处理过程中 AI 服务中断了。已执行的操作:%s。这些结果已生效,你可以让我继续下一步或查询当前状态。", ran)
	case ran == "":
		return "The AI service was interrupted while handling your request. Nothing was lost — please tell me again what you'd like to do."
	default:
		return fmt.Sprintf("The AI service was interrupted mid-task. Tools already executed: %s. Those results took effect — ask me to continue or check the current state.", ran)
	}
}
|
||||
|
||||
// agenticWrapUpInstruction returns the instruction appended to the
// conversation once the tool-round budget is spent. It tells the LLM to write
// the final summary from the results gathered so far and to request no more
// tool calls. The text is Chinese when lang is "zh" and English otherwise.
func agenticWrapUpInstruction(lang string) string {
	const (
		zhInstruction = "工具调用轮次已达上限。请基于以上已获得的全部结果,直接给用户一个完整的中文总结回复:说明已完成什么、未完成什么、建议的下一步。不要再请求调用工具。"
		enInstruction = "Tool-call round limit reached. Using everything gathered above, write the final reply for the user now: what was completed, what was not, and the suggested next step. Do not request more tool calls."
	)

	if lang != "zh" {
		return enInstruction
	}
	return zhInstruction
}
|
||||
275
agent/agentic_loop_test.go
Normal file
275
agent/agentic_loop_test.go
Normal file
@@ -0,0 +1,275 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"log/slog"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"nofx/mcp"
|
||||
)
|
||||
|
||||
// scriptedAIClient returns queued LLMResponses (or errors) in order for
|
||||
// CallWithRequestFull, and queued plain strings for CallWithRequest.
|
||||
type scriptedAIClient struct {
|
||||
fullResponses []*mcp.LLMResponse
|
||||
fullErrs []error
|
||||
fullCalls int
|
||||
fullRequests []*mcp.Request
|
||||
|
||||
plainResponse string
|
||||
plainErr error
|
||||
plainCalls int
|
||||
}
|
||||
|
||||
func (c *scriptedAIClient) SetAPIKey(apiKey string, customURL string, customModel string) {}
|
||||
func (c *scriptedAIClient) SetTimeout(timeout time.Duration) {}
|
||||
func (c *scriptedAIClient) CallWithMessages(systemPrompt, userPrompt string) (string, error) {
|
||||
return c.plainResponse, c.plainErr
|
||||
}
|
||||
func (c *scriptedAIClient) CallWithRequest(req *mcp.Request) (string, error) {
|
||||
c.plainCalls++
|
||||
return c.plainResponse, c.plainErr
|
||||
}
|
||||
func (c *scriptedAIClient) CallWithRequestStream(req *mcp.Request, onChunk func(string)) (string, error) {
|
||||
if onChunk != nil && c.plainErr == nil {
|
||||
onChunk(c.plainResponse)
|
||||
}
|
||||
return c.plainResponse, c.plainErr
|
||||
}
|
||||
func (c *scriptedAIClient) CallWithRequestFull(req *mcp.Request) (*mcp.LLMResponse, error) {
|
||||
idx := c.fullCalls
|
||||
c.fullCalls++
|
||||
c.fullRequests = append(c.fullRequests, req)
|
||||
var err error
|
||||
if idx < len(c.fullErrs) {
|
||||
err = c.fullErrs[idx]
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if idx < len(c.fullResponses) {
|
||||
return c.fullResponses[idx], nil
|
||||
}
|
||||
return &mcp.LLMResponse{Content: "fallthrough"}, nil
|
||||
}
|
||||
|
||||
func newAgenticTestAgent(client mcp.AIClient) *Agent {
|
||||
a := New(nil, nil, DefaultConfig(), slog.Default())
|
||||
a.SetAIClient(client)
|
||||
return a
|
||||
}
|
||||
|
||||
func toolCall(id, name, args string) mcp.ToolCall {
|
||||
return mcp.ToolCall{
|
||||
ID: id,
|
||||
Type: "function",
|
||||
Function: mcp.ToolCallFunction{
|
||||
Name: name,
|
||||
Arguments: args,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunAgenticTurnDirectAnswer(t *testing.T) {
|
||||
client := &scriptedAIClient{
|
||||
fullResponses: []*mcp.LLMResponse{{Content: "你好,我是 NOFX 助手。"}},
|
||||
}
|
||||
a := newAgenticTestAgent(client)
|
||||
|
||||
answer, handled, err := a.runAgenticTurn(context.Background(), "default", 1, "zh", "你好", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if !handled {
|
||||
t.Fatal("expected turn to be handled")
|
||||
}
|
||||
if answer != "你好,我是 NOFX 助手。" {
|
||||
t.Fatalf("answer = %q", answer)
|
||||
}
|
||||
if client.fullCalls != 1 {
|
||||
t.Fatalf("fullCalls = %d, want 1", client.fullCalls)
|
||||
}
|
||||
// Tools must be offered on the request.
|
||||
if len(client.fullRequests[0].Tools) == 0 {
|
||||
t.Fatal("expected tools to be attached to the LLM request")
|
||||
}
|
||||
// Conversation must be recorded in history.
|
||||
msgs := a.history.Get(1)
|
||||
if len(msgs) != 2 || msgs[0].Role != "user" || msgs[1].Role != "assistant" {
|
||||
t.Fatalf("history = %+v, want user+assistant turns", msgs)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunAgenticTurnToolRoundTrip(t *testing.T) {
|
||||
client := &scriptedAIClient{
|
||||
fullResponses: []*mcp.LLMResponse{
|
||||
{ToolCalls: []mcp.ToolCall{toolCall("call_1", "definitely_not_a_tool", "{}")}},
|
||||
{Content: "done"},
|
||||
},
|
||||
}
|
||||
a := newAgenticTestAgent(client)
|
||||
|
||||
var toolEvents []string
|
||||
onEvent := func(event, data string) {
|
||||
if event == StreamEventTool {
|
||||
toolEvents = append(toolEvents, data)
|
||||
}
|
||||
}
|
||||
|
||||
answer, handled, err := a.runAgenticTurn(context.Background(), "default", 2, "en", "do something", onEvent)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if !handled || answer != "done" {
|
||||
t.Fatalf("handled=%v answer=%q", handled, answer)
|
||||
}
|
||||
if client.fullCalls != 2 {
|
||||
t.Fatalf("fullCalls = %d, want 2", client.fullCalls)
|
||||
}
|
||||
if len(toolEvents) != 1 || toolEvents[0] != "definitely_not_a_tool" {
|
||||
t.Fatalf("toolEvents = %v", toolEvents)
|
||||
}
|
||||
|
||||
// The second request must carry the assistant tool-call message and the
|
||||
// tool result message with matching ToolCallID.
|
||||
second := client.fullRequests[1]
|
||||
var sawAssistantToolCall, sawToolResult bool
|
||||
for _, m := range second.Messages {
|
||||
if m.Role == "assistant" && len(m.ToolCalls) == 1 && m.ToolCalls[0].ID == "call_1" {
|
||||
sawAssistantToolCall = true
|
||||
}
|
||||
if m.Role == "tool" && m.ToolCallID == "call_1" {
|
||||
sawToolResult = true
|
||||
if !strings.Contains(m.Content, "unknown tool") {
|
||||
t.Fatalf("tool result = %q, want unknown-tool error payload", m.Content)
|
||||
}
|
||||
}
|
||||
}
|
||||
if !sawAssistantToolCall || !sawToolResult {
|
||||
t.Fatalf("tool round-trip messages missing: assistant=%v tool=%v", sawAssistantToolCall, sawToolResult)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunAgenticTurnFirstCallFailureFallsBack(t *testing.T) {
|
||||
client := &scriptedAIClient{
|
||||
fullErrs: []error{errors.New("upstream 500")},
|
||||
}
|
||||
a := newAgenticTestAgent(client)
|
||||
|
||||
_, handled, err := a.runAgenticTurn(context.Background(), "default", 3, "zh", "hi", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if handled {
|
||||
t.Fatal("expected fallback (handled=false) when the first LLM call fails")
|
||||
}
|
||||
if got := a.history.Get(3); len(got) != 0 {
|
||||
t.Fatalf("history should stay empty on fallback, got %+v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunAgenticTurnMidLoopFailureReportsExecutedTools(t *testing.T) {
|
||||
client := &scriptedAIClient{
|
||||
fullResponses: []*mcp.LLMResponse{
|
||||
{ToolCalls: []mcp.ToolCall{toolCall("call_1", "definitely_not_a_tool", "{}")}},
|
||||
},
|
||||
fullErrs: []error{nil, errors.New("upstream timeout")},
|
||||
}
|
||||
a := newAgenticTestAgent(client)
|
||||
|
||||
answer, handled, err := a.runAgenticTurn(context.Background(), "default", 4, "zh", "do it", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if !handled {
|
||||
t.Fatal("mid-loop failure must be handled (tools already executed)")
|
||||
}
|
||||
if !strings.Contains(answer, "definitely_not_a_tool") {
|
||||
t.Fatalf("answer should mention the executed tool, got %q", answer)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunAgenticTurnRoundLimitTriggersWrapUp(t *testing.T) {
|
||||
responses := make([]*mcp.LLMResponse, 0, agenticMaxToolRounds)
|
||||
for i := 0; i < agenticMaxToolRounds; i++ {
|
||||
responses = append(responses, &mcp.LLMResponse{
|
||||
ToolCalls: []mcp.ToolCall{toolCall("call_x", "definitely_not_a_tool", "{}")},
|
||||
})
|
||||
}
|
||||
client := &scriptedAIClient{
|
||||
fullResponses: responses,
|
||||
plainResponse: "summary of what happened",
|
||||
}
|
||||
a := newAgenticTestAgent(client)
|
||||
|
||||
answer, handled, err := a.runAgenticTurn(context.Background(), "default", 5, "en", "loop forever", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if !handled {
|
||||
t.Fatal("expected handled=true at round limit")
|
||||
}
|
||||
if answer != "summary of what happened" {
|
||||
t.Fatalf("answer = %q, want wrap-up summary", answer)
|
||||
}
|
||||
if client.fullCalls != agenticMaxToolRounds {
|
||||
t.Fatalf("fullCalls = %d, want %d", client.fullCalls, agenticMaxToolRounds)
|
||||
}
|
||||
if client.plainCalls != 1 {
|
||||
t.Fatalf("plainCalls = %d, want 1 wrap-up call", client.plainCalls)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunAgenticTurnIncludesRecentHistory(t *testing.T) {
|
||||
client := &scriptedAIClient{
|
||||
fullResponses: []*mcp.LLMResponse{{Content: "answer"}},
|
||||
}
|
||||
a := newAgenticTestAgent(client)
|
||||
a.history.Add(6, "user", "前一个问题")
|
||||
a.history.Add(6, "assistant", "前一个回答")
|
||||
|
||||
if _, handled, err := a.runAgenticTurn(context.Background(), "default", 6, "zh", "新问题", nil); err != nil || !handled {
|
||||
t.Fatalf("handled=%v err=%v", handled, err)
|
||||
}
|
||||
|
||||
req := client.fullRequests[0]
|
||||
var sawPrevUser, sawPrevAssistant bool
|
||||
for _, m := range req.Messages {
|
||||
if m.Role == "user" && m.Content == "前一个问题" {
|
||||
sawPrevUser = true
|
||||
}
|
||||
if m.Role == "assistant" && m.Content == "前一个回答" {
|
||||
sawPrevAssistant = true
|
||||
}
|
||||
}
|
||||
if !sawPrevUser || !sawPrevAssistant {
|
||||
t.Fatalf("recent history missing from request: user=%v assistant=%v", sawPrevUser, sawPrevAssistant)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgentV2Enabled(t *testing.T) {
|
||||
cases := []struct {
|
||||
value string
|
||||
want bool
|
||||
}{
|
||||
{"", true},
|
||||
{"1", true},
|
||||
{"true", true},
|
||||
{"on", true},
|
||||
{"0", false},
|
||||
{"false", false},
|
||||
{"off", false},
|
||||
{"disabled", false},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run("value="+tc.value, func(t *testing.T) {
|
||||
t.Setenv("NOFX_AGENT_V2", tc.value)
|
||||
if got := agentV2Enabled(); got != tc.want {
|
||||
t.Errorf("agentV2Enabled() with %q = %v, want %v", tc.value, got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user