From 7302f96e8e33fbcff83691ebd66e178d857fe420 Mon Sep 17 00:00:00 2001 From: yuanshi2016 <103150111@qq.com> Date: Sun, 2 Nov 2025 14:03:13 +0800 Subject: [PATCH] =?UTF-8?q?K=E7=BA=BF=E8=8E=B7=E5=8F=96=E6=96=B9=E5=BC=8F?= =?UTF-8?q?=E6=94=B9=E4=B8=BAwebsocket=E7=BB=84=E5=90=88=E6=B5=81.=20?= =?UTF-8?q?=E5=B8=A6=E9=87=8D=E6=8B=A8=E6=9C=BA=E5=88=B6=20=E6=B5=81?= =?UTF-8?q?=E7=A8=8B=E4=B8=BA=E4=B8=8B=EF=BC=9A=201.=20=E5=90=AF=E5=8A=A8?= =?UTF-8?q?=E6=97=B6=E4=BD=BF=E7=94=A8=E6=89=80=E6=9C=89=E4=BA=A4=E6=98=93?= =?UTF-8?q?=E5=91=98=E8=AE=BE=E7=BD=AE=E7=9A=84=E5=B8=81=E7=A7=8D(?= =?UTF-8?q?=E5=8E=BB=E9=87=8D)=20=E5=A6=82=E6=9E=9C=E4=BA=A4=E6=98=93?= =?UTF-8?q?=E5=91=98=E6=9C=AA=E9=85=8D=E7=BD=AE=EF=BC=8C=E5=88=99=E4=BD=BF?= =?UTF-8?q?=E7=94=A8=E7=B3=BB=E7=BB=9F=E9=BB=98=E8=AE=A4=202.=20=E5=9C=A8?= =?UTF-8?q?=E5=86=B3=E7=AD=96=E8=8E=B7=E5=8F=96K=E7=BA=BF=E6=97=B6=20?= =?UTF-8?q?=E5=A6=82=E6=9E=9C=E6=B2=A1=E6=9C=89=E7=BC=93=E5=AD=98=20?= =?UTF-8?q?=E5=88=99=E5=85=88=E5=AE=9E=E6=97=B6=E8=8E=B7=E5=8F=96=E5=90=8E?= =?UTF-8?q?=E5=86=8D=E6=B7=BB=E5=8A=A0=E8=AE=A2=E9=98=85.=20ps:=20?= =?UTF-8?q?=E9=80=82=E7=94=A8=E4=BA=8EApi=E6=96=B9=E5=BC=8F=E7=9A=84?= =?UTF-8?q?=E5=B8=81=E7=A7=8D=E5=88=97=E8=A1=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/config.go | 2 +- config/database.go | 215 +++++++++++++++++++++++++-------------------- main.go | 29 +----- market/monitor.go | 112 +++++++++++------------ 4 files changed, 179 insertions(+), 179 deletions(-) diff --git a/config/config.go b/config/config.go index 3b736d0e..430e428c 100644 --- a/config/config.go +++ b/config/config.go @@ -54,7 +54,7 @@ type LeverageConfig struct { type Config struct { Traders []TraderConfig `json:"traders"` UseDefaultCoins bool `json:"use_default_coins"` // 是否使用默认主流币种列表 - InsideCoins bool `json:"inside_coins"` // 是否使用内置AI评分币种列表 + UseInsideCoins bool `json:"use_inside_coins"` // 是否使用内置AI评分币种列表 DefaultCoins []string `json:"default_coins"` // 默认主流币种池 APIServerPort int `json:"api_server_port"` MaxDailyLoss float64 `json:"max_daily_loss"` diff --git a/config/database.go b/config/database.go index c5eef755..1102c6fb 100644 --- a/config/database.go +++ b/config/database.go @@ -4,8 +4,11 @@ import ( "crypto/rand" "database/sql" "encoding/base32" + "encoding/json" "fmt" "log" + "nofx/market" + "slices" "strings" "time" @@ -177,17 +180,18 @@ func (d *Database) createTables() error { `ALTER TABLE exchanges ADD COLUMN aster_private_key TEXT DEFAULT ''`, `ALTER TABLE traders ADD COLUMN custom_prompt TEXT DEFAULT ''`, `ALTER TABLE traders ADD COLUMN override_base_prompt BOOLEAN DEFAULT 0`, - `ALTER TABLE traders ADD COLUMN is_cross_margin BOOLEAN DEFAULT 1`, // 默认为全仓模式 - `ALTER TABLE traders ADD COLUMN use_default_coins BOOLEAN DEFAULT 1`, // 默认使用默认币种 - `ALTER TABLE traders ADD COLUMN custom_coins TEXT DEFAULT ''`, // 自定义币种列表(JSON格式) - `ALTER TABLE traders ADD COLUMN btc_eth_leverage INTEGER DEFAULT 5`, // BTC/ETH杠杆倍数 - `ALTER TABLE traders ADD COLUMN altcoin_leverage INTEGER DEFAULT 5`, // 山寨币杠杆倍数 - `ALTER TABLE traders ADD COLUMN trading_symbols TEXT DEFAULT ''`, // 交易币种,逗号分隔 - `ALTER TABLE traders ADD COLUMN use_coin_pool BOOLEAN DEFAULT 0`, // 是否使用COIN POOL信号源 - `ALTER TABLE traders ADD COLUMN use_oi_top BOOLEAN DEFAULT 0`, // 是否使用OI TOP信号源 + `ALTER TABLE traders ADD COLUMN is_cross_margin BOOLEAN DEFAULT 1`, // 默认为全仓模式 + `ALTER TABLE traders ADD COLUMN use_default_coins BOOLEAN DEFAULT 1`, // 默认使用默认币种 + `ALTER TABLE traders ADD COLUMN custom_coins TEXT DEFAULT ''`, // 自定义币种列表(JSON格式) + `ALTER TABLE traders ADD COLUMN btc_eth_leverage INTEGER DEFAULT 5`, // BTC/ETH杠杆倍数 + `ALTER TABLE traders ADD COLUMN altcoin_leverage INTEGER DEFAULT 5`, // 山寨币杠杆倍数 + `ALTER TABLE traders ADD COLUMN trading_symbols TEXT DEFAULT ''`, // 交易币种,逗号分隔 + `ALTER TABLE traders ADD COLUMN use_coin_pool BOOLEAN DEFAULT 0`, // 是否使用COIN POOL信号源 + `ALTER TABLE traders ADD COLUMN use_oi_top BOOLEAN DEFAULT 0`, // 是否使用OI TOP信号源 + `ALTER TABLE traders ADD COLUMN use_inside_coins BOOLEAN DEFAULT 0`, // 是否使用内置AI评分信号源 `ALTER TABLE traders ADD COLUMN system_prompt_template TEXT DEFAULT 'default'`, // 系统提示词模板名称 - `ALTER TABLE ai_models ADD COLUMN custom_api_url TEXT DEFAULT ''`, // 自定义API地址 - `ALTER TABLE ai_models ADD COLUMN custom_model_name TEXT DEFAULT ''`, // 自定义模型名称 + `ALTER TABLE ai_models ADD COLUMN custom_api_url TEXT DEFAULT ''`, // 自定义API地址 + `ALTER TABLE ai_models ADD COLUMN custom_model_name TEXT DEFAULT ''`, // 自定义模型名称 } for _, query := range alterQueries { @@ -245,16 +249,16 @@ func (d *Database) initDefaultData() error { // 初始化系统配置 - 创建所有字段,设置默认值,后续由config.json同步更新 systemConfigs := map[string]string{ - "admin_mode": "true", // 默认开启管理员模式,便于首次使用 - "api_server_port": "8080", // 默认API端口 - "use_default_coins": "true", // 默认使用内置币种列表 - "default_coins": `["BTCUSDT","ETHUSDT","SOLUSDT","BNBUSDT","XRPUSDT","DOGEUSDT","ADAUSDT","HYPEUSDT"]`, // 默认币种列表(JSON格式) - "max_daily_loss": "10.0", // 最大日损失百分比 - "max_drawdown": "20.0", // 最大回撤百分比 - "stop_trading_minutes": "60", // 停止交易时间(分钟) - "btc_eth_leverage": "5", // BTC/ETH杠杆倍数 - "altcoin_leverage": "5", // 山寨币杠杆倍数 - "jwt_secret": "", // JWT密钥,默认为空,由config.json或系统生成 + "admin_mode": "true", // 默认开启管理员模式,便于首次使用 + "api_server_port": "8080", // 默认API端口 + "use_default_coins": "true", // 默认使用内置币种列表 + "default_coins": `["BTCUSDT","ETHUSDT","SOLUSDT","BNBUSDT","XRPUSDT","DOGEUSDT","ADAUSDT","HYPEUSDT"]`, // 默认币种列表(JSON格式) + "max_daily_loss": "10.0", // 最大日损失百分比 + "max_drawdown": "20.0", // 最大回撤百分比 + "stop_trading_minutes": "60", // 停止交易时间(分钟) + "btc_eth_leverage": "5", // BTC/ETH杠杆倍数 + "altcoin_leverage": "5", // 山寨币杠杆倍数 + "jwt_secret": "", // JWT密钥,默认为空,由config.json或系统生成 } for key, value := range systemConfigs { @@ -281,14 +285,14 @@ func (d *Database) migrateExchangesTable() error { if err != nil { return err } - + // 如果已经迁移过,直接返回 if count > 0 { return nil } - + log.Printf("🔄 开始迁移exchanges表...") - + // 创建新的exchanges表,使用复合主键 _, err = d.db.Exec(` CREATE TABLE exchanges_new ( @@ -313,7 +317,7 @@ func (d *Database) migrateExchangesTable() error { if err != nil { return fmt.Errorf("创建新exchanges表失败: %w", err) } - + // 复制数据到新表 _, err = d.db.Exec(` INSERT INTO exchanges_new @@ -322,19 +326,19 @@ func (d *Database) migrateExchangesTable() error { if err != nil { return fmt.Errorf("复制数据失败: %w", err) } - + // 删除旧表 _, err = d.db.Exec(`DROP TABLE exchanges`) if err != nil { return fmt.Errorf("删除旧表失败: %w", err) } - + // 重命名新表 _, err = d.db.Exec(`ALTER TABLE exchanges_new RENAME TO exchanges`) if err != nil { return fmt.Errorf("重命名表失败: %w", err) } - + // 重新创建触发器 _, err = d.db.Exec(` CREATE TRIGGER IF NOT EXISTS update_exchanges_updated_at @@ -347,20 +351,20 @@ func (d *Database) migrateExchangesTable() error { if err != nil { return fmt.Errorf("创建触发器失败: %w", err) } - + log.Printf("✅ exchanges表迁移完成") return nil } // User 用户配置 type User struct { - ID string `json:"id"` - Email string `json:"email"` - PasswordHash string `json:"-"` // 不返回到前端 - OTPSecret string `json:"-"` // 不返回到前端 - OTPVerified bool `json:"otp_verified"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` + ID string `json:"id"` + Email string `json:"email"` + PasswordHash string `json:"-"` // 不返回到前端 + OTPSecret string `json:"-"` // 不返回到前端 + OTPVerified bool `json:"otp_verified"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` } // AIModelConfig AI模型配置 @@ -379,39 +383,40 @@ type AIModelConfig struct { // ExchangeConfig 交易所配置 type ExchangeConfig struct { - ID string `json:"id"` - UserID string `json:"user_id"` - Name string `json:"name"` - Type string `json:"type"` - Enabled bool `json:"enabled"` - APIKey string `json:"apiKey"` - SecretKey string `json:"secretKey"` - Testnet bool `json:"testnet"` + ID string `json:"id"` + UserID string `json:"user_id"` + Name string `json:"name"` + Type string `json:"type"` + Enabled bool `json:"enabled"` + APIKey string `json:"apiKey"` + SecretKey string `json:"secretKey"` + Testnet bool `json:"testnet"` // Hyperliquid 特定字段 HyperliquidWalletAddr string `json:"hyperliquidWalletAddr"` // Aster 特定字段 - AsterUser string `json:"asterUser"` - AsterSigner string `json:"asterSigner"` - AsterPrivateKey string `json:"asterPrivateKey"` + AsterUser string `json:"asterUser"` + AsterSigner string `json:"asterSigner"` + AsterPrivateKey string `json:"asterPrivateKey"` CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` } // TraderRecord 交易员配置(数据库实体) type TraderRecord struct { - ID string `json:"id"` - UserID string `json:"user_id"` - Name string `json:"name"` - AIModelID string `json:"ai_model_id"` - ExchangeID string `json:"exchange_id"` - InitialBalance float64 `json:"initial_balance"` - ScanIntervalMinutes int `json:"scan_interval_minutes"` - IsRunning bool `json:"is_running"` - BTCETHLeverage int `json:"btc_eth_leverage"` // BTC/ETH杠杆倍数 - AltcoinLeverage int `json:"altcoin_leverage"` // 山寨币杠杆倍数 + ID string `json:"id"` + UserID string `json:"user_id"` + Name string `json:"name"` + AIModelID string `json:"ai_model_id"` + ExchangeID string `json:"exchange_id"` + InitialBalance float64 `json:"initial_balance"` + ScanIntervalMinutes int `json:"scan_interval_minutes"` + IsRunning bool `json:"is_running"` + BTCETHLeverage int `json:"btc_eth_leverage"` // BTC/ETH杠杆倍数 + AltcoinLeverage int `json:"altcoin_leverage"` // 山寨币杠杆倍数 TradingSymbols string `json:"trading_symbols"` // 交易币种,逗号分隔 UseCoinPool bool `json:"use_coin_pool"` // 是否使用COIN POOL信号源 UseOITop bool `json:"use_oi_top"` // 是否使用OI TOP信号源 + UseInsideCoins bool `json:"use_inside_coins"` // 是否使用内置评分信号源 CustomPrompt string `json:"custom_prompt"` // 自定义交易策略prompt OverrideBasePrompt bool `json:"override_base_prompt"` // 是否覆盖基础prompt SystemPromptTemplate string `json:"system_prompt_template"` // 系统提示词模板名称 @@ -422,12 +427,12 @@ type TraderRecord struct { // UserSignalSource 用户信号源配置 type UserSignalSource struct { - ID int `json:"id"` - UserID string `json:"user_id"` - CoinPoolURL string `json:"coin_pool_url"` - OITopURL string `json:"oi_top_url"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` + ID int `json:"id"` + UserID string `json:"user_id"` + CoinPoolURL string `json:"coin_pool_url"` + OITopURL string `json:"oi_top_url"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` } // GenerateOTPSecret 生成OTP密钥 @@ -457,12 +462,12 @@ func (d *Database) EnsureAdminUser() error { if err != nil { return err } - + // 如果已存在,直接返回 if count > 0 { return nil } - + // 创建admin用户(密码为空,因为管理员模式下不需要密码) adminUser := &User{ ID: "admin", @@ -471,7 +476,7 @@ func (d *Database) EnsureAdminUser() error { OTPSecret: "", OTPVerified: true, } - + return d.CreateUser(adminUser) } @@ -482,7 +487,7 @@ func (d *Database) GetUserByEmail(email string) (*User, error) { SELECT id, email, password_hash, otp_secret, otp_verified, created_at, updated_at FROM users WHERE email = ? `, email).Scan( - &user.ID, &user.Email, &user.PasswordHash, &user.OTPSecret, + &user.ID, &user.Email, &user.PasswordHash, &user.OTPSecret, &user.OTPVerified, &user.CreatedAt, &user.UpdatedAt, ) if err != nil { @@ -498,7 +503,7 @@ func (d *Database) GetUserByID(userID string) (*User, error) { SELECT id, email, password_hash, otp_secret, otp_verified, created_at, updated_at FROM users WHERE id = ? `, userID).Scan( - &user.ID, &user.Email, &user.PasswordHash, &user.OTPSecret, + &user.ID, &user.Email, &user.PasswordHash, &user.OTPSecret, &user.OTPVerified, &user.CreatedAt, &user.UpdatedAt, ) if err != nil { @@ -668,7 +673,7 @@ func (d *Database) GetExchanges(userID string) ([]*ExchangeConfig, error) { err := rows.Scan( &exchange.ID, &exchange.UserID, &exchange.Name, &exchange.Type, &exchange.Enabled, &exchange.APIKey, &exchange.SecretKey, &exchange.Testnet, - &exchange.HyperliquidWalletAddr, &exchange.AsterUser, + &exchange.HyperliquidWalletAddr, &exchange.AsterUser, &exchange.AsterSigner, &exchange.AsterPrivateKey, &exchange.CreatedAt, &exchange.UpdatedAt, ) @@ -684,7 +689,7 @@ func (d *Database) GetExchanges(userID string) ([]*ExchangeConfig, error) { // UpdateExchange 更新交易所配置,如果不存在则创建用户特定配置 func (d *Database) UpdateExchange(userID, id string, enabled bool, apiKey, secretKey string, testnet bool, hyperliquidWalletAddr, asterUser, asterSigner, asterPrivateKey string) error { log.Printf("🔧 UpdateExchange: userID=%s, id=%s, enabled=%v", userID, id, enabled) - + // 首先尝试更新现有的用户配置 result, err := d.db.Exec(` UPDATE exchanges SET enabled = ?, api_key = ?, secret_key = ?, testnet = ?, @@ -695,20 +700,20 @@ func (d *Database) UpdateExchange(userID, id string, enabled bool, apiKey, secre log.Printf("❌ UpdateExchange: 更新失败: %v", err) return err } - + // 检查是否有行被更新 rowsAffected, err := result.RowsAffected() if err != nil { log.Printf("❌ UpdateExchange: 获取影响行数失败: %v", err) return err } - + log.Printf("📊 UpdateExchange: 影响行数 = %d", rowsAffected) - + // 如果没有行被更新,说明用户没有这个交易所的配置,需要创建 if rowsAffected == 0 { log.Printf("💡 UpdateExchange: 没有现有记录,创建新记录") - + // 根据交易所ID确定基本信息 var name, typ string if id == "binance" { @@ -724,16 +729,16 @@ func (d *Database) UpdateExchange(userID, id string, enabled bool, apiKey, secre name = id + " Exchange" typ = "cex" } - + log.Printf("🆕 UpdateExchange: 创建新记录 ID=%s, name=%s, type=%s", id, name, typ) - + // 创建用户特定的配置,使用原始的交易所ID _, err = d.db.Exec(` INSERT INTO exchanges (id, user_id, name, type, enabled, api_key, secret_key, testnet, hyperliquid_wallet_addr, aster_user, aster_signer, aster_private_key, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now')) `, id, userID, name, typ, enabled, apiKey, secretKey, testnet, hyperliquidWalletAddr, asterUser, asterSigner, asterPrivateKey) - + if err != nil { log.Printf("❌ UpdateExchange: 创建记录失败: %v", err) } else { @@ -741,7 +746,7 @@ func (d *Database) UpdateExchange(userID, id string, enabled bool, apiKey, secre } return err } - + log.Printf("✅ UpdateExchange: 更新现有记录成功") return nil } @@ -767,9 +772,9 @@ func (d *Database) CreateExchange(userID, id, name, typ string, enabled bool, ap // CreateTrader 创建交易员 func (d *Database) CreateTrader(trader *TraderRecord) error { _, err := d.db.Exec(` - INSERT INTO traders (id, user_id, name, ai_model_id, exchange_id, initial_balance, scan_interval_minutes, is_running, btc_eth_leverage, altcoin_leverage, trading_symbols, use_coin_pool, use_oi_top, custom_prompt, override_base_prompt, system_prompt_template, is_cross_margin) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - `, trader.ID, trader.UserID, trader.Name, trader.AIModelID, trader.ExchangeID, trader.InitialBalance, trader.ScanIntervalMinutes, trader.IsRunning, trader.BTCETHLeverage, trader.AltcoinLeverage, trader.TradingSymbols, trader.UseCoinPool, trader.UseOITop, trader.CustomPrompt, trader.OverrideBasePrompt, trader.SystemPromptTemplate, trader.IsCrossMargin) + INSERT INTO traders (id, user_id, name, ai_model_id, exchange_id, initial_balance, scan_interval_minutes, is_running, btc_eth_leverage, altcoin_leverage, trading_symbols, use_coin_pool, use_oi_top, use_inside_coins, custom_prompt, override_base_prompt, system_prompt_template, is_cross_margin) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?, ?, ?, ?, ?, ?) + `, trader.ID, trader.UserID, trader.Name, trader.AIModelID, trader.ExchangeID, trader.InitialBalance, trader.ScanIntervalMinutes, trader.IsRunning, trader.BTCETHLeverage, trader.AltcoinLeverage, trader.TradingSymbols, trader.UseCoinPool, trader.UseOITop, trader.UseInsideCoins, trader.CustomPrompt, trader.OverrideBasePrompt, trader.SystemPromptTemplate, trader.IsCrossMargin) return err } @@ -779,7 +784,7 @@ func (d *Database) GetTraders(userID string) ([]*TraderRecord, error) { SELECT id, user_id, name, ai_model_id, exchange_id, initial_balance, scan_interval_minutes, is_running, COALESCE(btc_eth_leverage, 5) as btc_eth_leverage, COALESCE(altcoin_leverage, 5) as altcoin_leverage, COALESCE(trading_symbols, '') as trading_symbols, - COALESCE(use_coin_pool, 0) as use_coin_pool, COALESCE(use_oi_top, 0) as use_oi_top, + COALESCE(use_coin_pool, 0) as use_coin_pool, COALESCE(use_oi_top, 0) as use_oi_top,COALESCE(use_inside_coins, 0) as use_inside_coins, COALESCE(custom_prompt, '') as custom_prompt, COALESCE(override_base_prompt, 0) as override_base_prompt, COALESCE(system_prompt_template, 'default') as system_prompt_template, COALESCE(is_cross_margin, 1) as is_cross_margin, created_at, updated_at @@ -790,14 +795,14 @@ func (d *Database) GetTraders(userID string) ([]*TraderRecord, error) { } defer rows.Close() - var traders []*TraderRecord + var traders []*TraderRecord for rows.Next() { - var trader TraderRecord + var trader TraderRecord err := rows.Scan( &trader.ID, &trader.UserID, &trader.Name, &trader.AIModelID, &trader.ExchangeID, &trader.InitialBalance, &trader.ScanIntervalMinutes, &trader.IsRunning, &trader.BTCETHLeverage, &trader.AltcoinLeverage, &trader.TradingSymbols, - &trader.UseCoinPool, &trader.UseOITop, + &trader.UseCoinPool, &trader.UseOITop, &trader.UseInsideCoins, &trader.CustomPrompt, &trader.OverrideBasePrompt, &trader.SystemPromptTemplate, &trader.IsCrossMargin, &trader.CreatedAt, &trader.UpdatedAt, @@ -847,18 +852,13 @@ func (d *Database) DeleteTrader(userID, id string) error { // GetTraderConfig 获取交易员完整配置(包含AI模型和交易所信息) func (d *Database) GetTraderConfig(userID, traderID string) (*TraderRecord, *AIModelConfig, *ExchangeConfig, error) { - var trader TraderRecord + var trader TraderRecord var aiModel AIModelConfig var exchange ExchangeConfig err := d.db.QueryRow(` SELECT - t.id, t.user_id, t.name, t.ai_model_id, t.exchange_id, t.initial_balance, t.scan_interval_minutes, t.is_running, - COALESCE(t.btc_eth_leverage, 5) as btc_eth_leverage, COALESCE(t.altcoin_leverage, 5) as altcoin_leverage, - COALESCE(t.trading_symbols, '') as trading_symbols, COALESCE(t.use_coin_pool, 0) as use_coin_pool, - COALESCE(t.use_oi_top, 0) as use_oi_top, COALESCE(t.custom_prompt, '') as custom_prompt, - COALESCE(t.override_base_prompt, 0) as override_base_prompt, COALESCE(t.is_cross_margin, 1) as is_cross_margin, - t.created_at, t.updated_at, + t.id, t.user_id, t.name, t.ai_model_id, t.exchange_id, t.initial_balance, t.scan_interval_minutes, t.is_running, t.created_at, t.updated_at, a.id, a.user_id, a.name, a.provider, a.enabled, a.api_key, a.created_at, a.updated_at, e.id, e.user_id, e.name, e.type, e.enabled, e.api_key, e.secret_key, e.testnet, COALESCE(e.hyperliquid_wallet_addr, '') as hyperliquid_wallet_addr, @@ -873,8 +873,6 @@ func (d *Database) GetTraderConfig(userID, traderID string) (*TraderRecord, *AIM `, traderID, userID).Scan( &trader.ID, &trader.UserID, &trader.Name, &trader.AIModelID, &trader.ExchangeID, &trader.InitialBalance, &trader.ScanIntervalMinutes, &trader.IsRunning, - &trader.BTCETHLeverage, &trader.AltcoinLeverage, &trader.TradingSymbols, &trader.UseCoinPool, - &trader.UseOITop, &trader.CustomPrompt, &trader.OverrideBasePrompt, &trader.IsCrossMargin, &trader.CreatedAt, &trader.UpdatedAt, &aiModel.ID, &aiModel.UserID, &aiModel.Name, &aiModel.Provider, &aiModel.Enabled, &aiModel.APIKey, &aiModel.CreatedAt, &aiModel.UpdatedAt, @@ -940,7 +938,36 @@ func (d *Database) UpdateUserSignalSource(userID, coinPoolURL, oiTopURL string) return err } +// GetCustomCoins 获取所有交易员自定义币种 / Get all trader-customized currencies +func (d *Database) GetCustomCoins() []string { + var symbol string + var symbols []string + _ = d.db.QueryRow(` + SELECT GROUP_CONCAT(custom_coins , ',') as symbol + FROM main.traders where custom_coins != '' + `).Scan(&symbol) + // 检测用户是否未配置币种 - 兼容性 + if symbol == "" { + symbolJSON, _ := d.GetSystemConfig("default_coins") + if err := json.Unmarshal([]byte(symbolJSON), &symbols); err != nil { + log.Printf("⚠️ 解析default_coins配置失败: %v,使用硬编码默认值", err) + symbols = []string{"BTCUSDT", "ETHUSDT", "SOLUSDT", "BNBUSDT"} + } + } + // filter Symbol + for _, s := range strings.Split(symbol, ",") { + if s == "" { + continue + } + coin := market.Normalize(s) + if !slices.Contains(symbols, coin) { + symbols = append(symbols, coin) + } + } + return symbols +} + // Close 关闭数据库连接 func (d *Database) Close() error { return d.db.Close() -} \ No newline at end of file +} diff --git a/main.go b/main.go index 7929072b..1ffc562e 100644 --- a/main.go +++ b/main.go @@ -15,7 +15,6 @@ import ( "strconv" "strings" "syscall" - "time" ) // LeverageConfig 杠杆配置 @@ -30,9 +29,9 @@ type ConfigFile struct { APIServerPort int `json:"api_server_port"` UseDefaultCoins bool `json:"use_default_coins"` DefaultCoins []string `json:"default_coins"` - InsideCoins bool `json:"inside_coins"` CoinPoolAPIURL string `json:"coin_pool_api_url"` OITopAPIURL string `json:"oi_top_api_url"` + InsideCoins bool `json:"inside_coins"` MaxDailyLoss float64 `json:"max_daily_loss"` MaxDrawdown float64 `json:"max_drawdown"` StopTradingMinutes int `json:"stop_trading_minutes"` @@ -68,9 +67,9 @@ func syncConfigToDatabase(database *config.Database) error { "admin_mode": fmt.Sprintf("%t", configFile.AdminMode), "api_server_port": strconv.Itoa(configFile.APIServerPort), "use_default_coins": fmt.Sprintf("%t", configFile.UseDefaultCoins), - "inside_coins": fmt.Sprintf("%t", configFile.InsideCoins), "coin_pool_api_url": configFile.CoinPoolAPIURL, "oi_top_api_url": configFile.OITopAPIURL, + "inside_coins": fmt.Sprintf("%t", configFile.InsideCoins), "max_daily_loss": fmt.Sprintf("%.1f", configFile.MaxDailyLoss), "max_drawdown": fmt.Sprintf("%.1f", configFile.MaxDrawdown), "stop_trading_minutes": strconv.Itoa(configFile.StopTradingMinutes), @@ -137,8 +136,6 @@ func main() { // 获取系统配置 useDefaultCoinsStr, _ := database.GetSystemConfig("use_default_coins") useDefaultCoins := useDefaultCoinsStr == "true" - InsideCoinsStr, _ := database.GetSystemConfig("inside_coins") - insideCoins := InsideCoinsStr == "true" apiPortStr, _ := database.GetSystemConfig("api_server_port") // 获取管理员模式配置 @@ -186,26 +183,6 @@ func main() { } pool.SetDefaultCoins(defaultCoins) - - //内置AI评分 - if insideCoins { - log.Printf("✓ 启用内置AI评分币种列表") - monitor := market.NewWSMonitor(150) - go func() { - monitor.Start() - // 定时器设置默认的币种列表 - 覆蓋defaultCoins设置 - for { - if len(monitor.FilterSymbol) > 0 { - for _, coin := range defaultCoins { - monitor.FilterSymbol = append(monitor.FilterSymbol, coin) - } - pool.SetDefaultCoins(monitor.FilterSymbol) - monitor.FilterSymbol = nil - } - time.Sleep(1 * time.Minute) - } - }() - } // 设置是否使用默认主流币种 pool.SetUseDefaultCoins(useDefaultCoins) if useDefaultCoins { @@ -286,6 +263,8 @@ func main() { } }() + // 启动流行情数据 - 默认使用所有交易员设置的币种 如果没有设置币种 则优先使用系统默认 + go market.NewWSMonitor(150).Start(database.GetCustomCoins()) // 设置优雅退出 sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) diff --git a/market/monitor.go b/market/monitor.go index 9837623e..b751dfe8 100644 --- a/market/monitor.go +++ b/market/monitor.go @@ -35,6 +35,7 @@ type SymbolStats struct { } var WSMonitorCli *WSMonitor +var subKlineTime = []string{"3m", "4h"} // 管理订阅流的K线周期 func NewWSMonitor(batchSize int) *WSMonitor { WSMonitorCli = &WSMonitor{ @@ -47,23 +48,27 @@ func NewWSMonitor(batchSize int) *WSMonitor { return WSMonitorCli } -func (m *WSMonitor) Initialize() error { +func (m *WSMonitor) Initialize(coins []string) error { log.Println("初始化WebSocket监控器...") - // 获取交易对信息 apiClient := NewAPIClient() - exchangeInfo, err := apiClient.GetExchangeInfo() - if err != nil { - return err + // 如果不指定交易对,则使用market市场的所有交易对币种 + if len(coins) == 0 { + exchangeInfo, err := apiClient.GetExchangeInfo() + if err != nil { + return err + } + // 筛选永续合约交易对 --仅测试时使用 + //exchangeInfo.Symbols = exchangeInfo.Symbols[0:2] + for _, symbol := range exchangeInfo.Symbols { + if symbol.Status == "TRADING" && symbol.ContractType == "PERPETUAL" && strings.ToUpper(symbol.Symbol[len(symbol.Symbol)-4:]) == "USDT" { + m.symbols = append(m.symbols, symbol.Symbol) + } + } + } else { + m.symbols = coins } - // 筛选永续合约交易对 --仅测试时使用 - //exchangeInfo.Symbols = exchangeInfo.Symbols[0:2] - for _, symbol := range exchangeInfo.Symbols { - if symbol.Status == "TRADING" && symbol.ContractType == "PERPETUAL" { - m.symbols = append(m.symbols, Normalize(symbol.Symbol)) - } - } log.Printf("找到 %d 个交易对", len(m.symbols)) // 初始化历史数据 if err := m.initializeHistoricalData(); err != nil { @@ -114,10 +119,10 @@ func (m *WSMonitor) initializeHistoricalData() error { return nil } -func (m *WSMonitor) Start() { +func (m *WSMonitor) Start(coins []string) { log.Printf("启动WebSocket实时监控...") // 初始化交易对 - err := m.Initialize() + err := m.Initialize(coins) if err != nil { log.Fatalf("❌ 初始化币种: %v", err) return @@ -129,42 +134,43 @@ func (m *WSMonitor) Start() { return } // 启动警报处理器 - go m.handleAlerts() + //go m.handleAlerts() // 启动定期清理任务 - go m.cleanupInactiveSymbols() + //go m.cleanupInactiveSymbols() // 输出监控统计 - 评分前十名 - go m.printFilterStats(50) + //go m.printFilterStats(20) // 订阅所有交易对 err = m.subscribeAll() - if err != nil { log.Fatalf("❌ 订阅币种交易对: %v", err) return } } +// subscribeSymbol 注册监听 +func (m *WSMonitor) subscribeSymbol(symbol, st string) []string { + var streams []string + stream := fmt.Sprintf("%s@kline_%s", strings.ToLower(symbol), st) + ch := m.combinedClient.AddSubscriber(stream, 100) + streams = append(streams, stream) + go m.handleKlineData(symbol, ch, st) + + return streams +} func (m *WSMonitor) subscribeAll() error { // 执行批量订阅 log.Println("开始订阅所有交易对...") for _, symbol := range m.symbols { - stream3m := fmt.Sprintf("%s@kline_3m", strings.ToLower(symbol)) - ch3m := m.combinedClient.AddSubscriber(stream3m, 100) - go m.handleKlineData(symbol, ch3m, "3m") - - stream4h := fmt.Sprintf("%s@kline_4h", strings.ToLower(symbol)) - ch4h := m.combinedClient.AddSubscriber(stream4h, 100) - go m.handleKlineData(symbol, ch4h, "4h") + for _, st := range subKlineTime { + m.subscribeSymbol(symbol, st) + } } - - err := m.combinedClient.BatchSubscribeKlines(m.symbols, "3m") - if err != nil { - log.Fatalf("❌ 订阅3m K线: %v", err) - return err - } - err = m.combinedClient.BatchSubscribeKlines(m.symbols, "4h") - if err != nil { - log.Fatalf("❌ 订阅4h K线: %v", err) - return err + for _, st := range subKlineTime { + err := m.combinedClient.BatchSubscribeKlines(m.symbols, st) + if err != nil { + log.Fatalf("❌ 订阅3m K线: %v", err) + return err + } } log.Println("所有交易对订阅完成") return nil @@ -181,34 +187,14 @@ func (m *WSMonitor) handleKlineData(symbol string, ch <-chan []byte, _time strin } } -func (m *WSMonitor) handleTickerData(symbol string, ch <-chan []byte) { - for data := range ch { - var tickerData TickerWSData - if err := json.Unmarshal(data, &tickerData); err != nil { - log.Printf("解析Ticker数据失败: %v", err) - continue - } - - m.processTickerUpdate(symbol, tickerData) - } -} -func (m *WSMonitor) handleTickerDatas(ch <-chan []byte) { - for data := range ch { - var tickerData []TickerWSData - if err := json.Unmarshal(data, &tickerData); err != nil { - log.Printf("解析Ticker数据失败: %v", err) - continue - } - log.Fatalln(tickerData) - //m.processTickerUpdate(symbol, tickerData) - } -} func (m *WSMonitor) getKlineDataMap(_time string) *sync.Map { var klineDataMap *sync.Map if _time == "3m" { klineDataMap = &m.klineDataMap3m - } else { + } else if _time == "4h" { klineDataMap = &m.klineDataMap4h + } else { + klineDataMap = &sync.Map{} } return klineDataMap } @@ -310,11 +296,19 @@ func (m *WSMonitor) handleAlerts() { } func (m *WSMonitor) GetCurrentKlines(symbol string, _time string) ([]Kline, error) { + // 对每一个进来的symbol检测是否存在内类 是否的话就订阅它 value, exists := m.getKlineDataMap(_time).Load(symbol) if !exists { // 如果Ws数据未初始化完成时,单独使用api获取 - 兼容性代码 (防止在未初始化完成是,已经有交易员运行) apiClient := NewAPIClient() - klines, err := apiClient.GetKlines(symbol, _time, 40) + klines, err := apiClient.GetKlines(symbol, _time, 100) + m.getKlineDataMap(_time).Store(strings.ToUpper(symbol), klines) //动态缓存进缓存 + subStr := m.subscribeSymbol(symbol, _time) + subErr := m.combinedClient.subscribeStreams(subStr) + log.Printf("动态订阅流: %v", subStr) + if subErr != nil { + return nil, fmt.Errorf("动态订阅%v分钟K线失败: %v", _time, subErr) + } if err != nil { return nil, fmt.Errorf("获取%v分钟K线失败: %v", _time, err) }