Files
nofx/telegram/agent/manager.go
tinkle-community 3168a18c0d feat(telegram): add AI agent bot with streaming and account context
- Add Telegram bot with long-polling and AI agent loop (api_call tool)
- SSE streaming with real-time message editing and a placeholder
- Account state injection at conversation start (models, exchanges,
  strategies, traders, per-trader PnL and statistics)
- Lane semaphore per chat serializes concurrent messages (60s timeout)
- Idle timeout watchdog (60s) prevents hung streaming connections
- Look-ahead buffer prevents partial <api_call> tag leaking to user
- Fix PUT /strategies/:id to merge config (read-then-merge pattern)
- Add route registry with full API schema for LLM documentation
- Add TelegramConfig store and Web UI config modal
- Add GetAnyEnabled to AIModel store for bot LLM client selection
2026-03-08 00:19:38 +08:00

79 lines
2.3 KiB
Go

package agent
import (
"nofx/logger"
"nofx/mcp"
"sync"
"time"
)
// Manager holds one Agent per Telegram chat ID.
// Messages for the same chat are serialized (OpenClaw Lane Queue pattern):
// each chat has a capacity-1 channel ("lane") that Run must acquire before
// invoking the chat's Agent.
type Manager struct {
	// mu guards the agents and lanes maps.
	mu sync.Mutex
	// agents maps a chat ID to its Agent; entries are created lazily by getOrCreate.
	agents map[int64]*Agent
	// lanes maps a chat ID to its lane channel, used by Run as a binary semaphore.
	lanes map[int64]chan struct{}
	// apiPort is forwarded to New when an Agent is created.
	apiPort int
	// botToken is forwarded to New when an Agent is created.
	botToken string
	// userID is forwarded to New and is also used by NewManager to build systemPrompt.
	userID string
	// getLLM is forwarded to New; it returns the mcp.AIClient an Agent should use.
	getLLM func() mcp.AIClient
	// systemPrompt is built once in NewManager via BuildAgentPrompt and shared by all Agents.
	systemPrompt string
}
// NewManager builds a Manager with empty per-chat agent and lane tables.
//
// apiDocs is expected to be the result of api.GetAPIDocs(), obtained by the
// caller beforehand; together with userID it is passed to BuildAgentPrompt to
// produce the system prompt shared by every Agent this Manager creates.
// userID is the database user ID the bot authenticates as. apiPort, botToken
// and getLLM are stored and handed to New whenever an Agent is created.
func NewManager(apiPort int, botToken, userID string, getLLM func() mcp.AIClient, apiDocs string) *Manager {
	mgr := new(Manager)

	// Per-chat state starts empty and is filled lazily.
	mgr.agents = make(map[int64]*Agent)
	mgr.lanes = make(map[int64]chan struct{})

	// Settings forwarded to every Agent.
	mgr.apiPort = apiPort
	mgr.botToken = botToken
	mgr.userID = userID
	mgr.getLLM = getLLM
	mgr.systemPrompt = BuildAgentPrompt(apiDocs, userID)

	return mgr
}
// Run processes a message for the given chat ID and returns the reply text.
//
// Messages for one chat are serialized through that chat's lane (a capacity-1
// channel). If the lane is already held, this call blocks until it becomes
// free or the lane wait times out (60 s), whichever comes first. On timeout a
// warning is logged and a fixed "still processing" notice is returned without
// invoking the Agent.
//
// onChunk is passed through to Agent.Run unchanged; per the original note it
// is optional and, when set, receives LLM reply chunks progressively (SSE
// streaming).
func (m *Manager) Run(chatID int64, userMessage string, onChunk func(string)) string {
	// laneWaitTimeout bounds how long a message waits for the previous one.
	const laneWaitTimeout = 60 * time.Second

	a, lane := m.getOrCreate(chatID)

	// Use an explicit timer instead of time.After so it can be stopped as soon
	// as the lane is acquired. Before Go 1.23 a time.After timer is not
	// reclaimed until it fires, so every message would pin one for 60 s.
	timer := time.NewTimer(laneWaitTimeout)
	select {
	case lane <- struct{}{}:
		timer.Stop()
	case <-timer.C:
		logger.Warnf("Agent: lane wait timeout for chat %d — previous message still processing", chatID)
		return "上一条消息仍在处理中,请稍等片刻后再试。"
	}
	// Release the lane even if Agent.Run panics.
	defer func() { <-lane }()

	return a.Run(userMessage, onChunk)
}
// Reset calls ResetMemory on the Agent of the given chat, if one exists
// (called on /start). Chats that have no Agent yet are left untouched; no
// Agent is created here.
func (m *Manager) Reset(chatID int64) {
	// Hold the lock only for the map lookup, not for the reset itself.
	m.mu.Lock()
	agent, found := m.agents[chatID]
	m.mu.Unlock()

	if !found {
		return
	}
	agent.ResetMemory()
}
// getOrCreate returns the Agent and lane channel for chatID, creating either
// one on first use. Both lookups and insertions happen under m.mu.
func (m *Manager) getOrCreate(chatID int64) (*Agent, chan struct{}) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if _, exists := m.agents[chatID]; !exists {
		m.agents[chatID] = New(m.apiPort, m.botToken, m.userID, m.getLLM, m.systemPrompt)
	}

	if _, exists := m.lanes[chatID]; !exists {
		// Capacity 1 makes the channel a binary semaphore: one message at a time per chat.
		m.lanes[chatID] = make(chan struct{}, 1)
	}

	return m.agents[chatID], m.lanes[chatID]
}