K线获取方式改为websocket组合流. 带重拨机制

流程为下:
1. 启动时使用所有交易员设置的币种(去重) 如果交易员未配置,则使用系统默认
2. 在决策获取K线时 如果没有缓存 则先实时获取后再添加订阅. ps: 适用于Api方式的币种列表
This commit is contained in:
yuanshi2016
2025-11-02 14:03:13 +08:00
parent 1862223528
commit 3b1db6f64f
4 changed files with 179 additions and 179 deletions

View File

@@ -54,7 +54,7 @@ type LeverageConfig struct {
type Config struct { type Config struct {
Traders []TraderConfig `json:"traders"` Traders []TraderConfig `json:"traders"`
UseDefaultCoins bool `json:"use_default_coins"` // 是否使用默认主流币种列表 UseDefaultCoins bool `json:"use_default_coins"` // 是否使用默认主流币种列表
InsideCoins bool `json:"inside_coins"` // 是否使用内置AI评分币种列表 UseInsideCoins bool `json:"use_inside_coins"` // 是否使用内置AI评分币种列表
DefaultCoins []string `json:"default_coins"` // 默认主流币种池 DefaultCoins []string `json:"default_coins"` // 默认主流币种池
APIServerPort int `json:"api_server_port"` APIServerPort int `json:"api_server_port"`
MaxDailyLoss float64 `json:"max_daily_loss"` MaxDailyLoss float64 `json:"max_daily_loss"`

View File

@@ -4,8 +4,11 @@ import (
"crypto/rand" "crypto/rand"
"database/sql" "database/sql"
"encoding/base32" "encoding/base32"
"encoding/json"
"fmt" "fmt"
"log" "log"
"nofx/market"
"slices"
"strings" "strings"
"time" "time"
@@ -177,17 +180,18 @@ func (d *Database) createTables() error {
`ALTER TABLE exchanges ADD COLUMN aster_private_key TEXT DEFAULT ''`, `ALTER TABLE exchanges ADD COLUMN aster_private_key TEXT DEFAULT ''`,
`ALTER TABLE traders ADD COLUMN custom_prompt TEXT DEFAULT ''`, `ALTER TABLE traders ADD COLUMN custom_prompt TEXT DEFAULT ''`,
`ALTER TABLE traders ADD COLUMN override_base_prompt BOOLEAN DEFAULT 0`, `ALTER TABLE traders ADD COLUMN override_base_prompt BOOLEAN DEFAULT 0`,
`ALTER TABLE traders ADD COLUMN is_cross_margin BOOLEAN DEFAULT 1`, // 默认为全仓模式 `ALTER TABLE traders ADD COLUMN is_cross_margin BOOLEAN DEFAULT 1`, // 默认为全仓模式
`ALTER TABLE traders ADD COLUMN use_default_coins BOOLEAN DEFAULT 1`, // 默认使用默认币种 `ALTER TABLE traders ADD COLUMN use_default_coins BOOLEAN DEFAULT 1`, // 默认使用默认币种
`ALTER TABLE traders ADD COLUMN custom_coins TEXT DEFAULT ''`, // 自定义币种列表JSON格式 `ALTER TABLE traders ADD COLUMN custom_coins TEXT DEFAULT ''`, // 自定义币种列表JSON格式
`ALTER TABLE traders ADD COLUMN btc_eth_leverage INTEGER DEFAULT 5`, // BTC/ETH杠杆倍数 `ALTER TABLE traders ADD COLUMN btc_eth_leverage INTEGER DEFAULT 5`, // BTC/ETH杠杆倍数
`ALTER TABLE traders ADD COLUMN altcoin_leverage INTEGER DEFAULT 5`, // 山寨币杠杆倍数 `ALTER TABLE traders ADD COLUMN altcoin_leverage INTEGER DEFAULT 5`, // 山寨币杠杆倍数
`ALTER TABLE traders ADD COLUMN trading_symbols TEXT DEFAULT ''`, // 交易币种,逗号分隔 `ALTER TABLE traders ADD COLUMN trading_symbols TEXT DEFAULT ''`, // 交易币种,逗号分隔
`ALTER TABLE traders ADD COLUMN use_coin_pool BOOLEAN DEFAULT 0`, // 是否使用COIN POOL信号源 `ALTER TABLE traders ADD COLUMN use_coin_pool BOOLEAN DEFAULT 0`, // 是否使用COIN POOL信号源
`ALTER TABLE traders ADD COLUMN use_oi_top BOOLEAN DEFAULT 0`, // 是否使用OI TOP信号源 `ALTER TABLE traders ADD COLUMN use_oi_top BOOLEAN DEFAULT 0`, // 是否使用OI TOP信号源
`ALTER TABLE traders ADD COLUMN use_inside_coins BOOLEAN DEFAULT 0`, // 是否使用内置AI评分信号源
`ALTER TABLE traders ADD COLUMN system_prompt_template TEXT DEFAULT 'default'`, // 系统提示词模板名称 `ALTER TABLE traders ADD COLUMN system_prompt_template TEXT DEFAULT 'default'`, // 系统提示词模板名称
`ALTER TABLE ai_models ADD COLUMN custom_api_url TEXT DEFAULT ''`, // 自定义API地址 `ALTER TABLE ai_models ADD COLUMN custom_api_url TEXT DEFAULT ''`, // 自定义API地址
`ALTER TABLE ai_models ADD COLUMN custom_model_name TEXT DEFAULT ''`, // 自定义模型名称 `ALTER TABLE ai_models ADD COLUMN custom_model_name TEXT DEFAULT ''`, // 自定义模型名称
} }
for _, query := range alterQueries { for _, query := range alterQueries {
@@ -245,16 +249,16 @@ func (d *Database) initDefaultData() error {
// 初始化系统配置 - 创建所有字段设置默认值后续由config.json同步更新 // 初始化系统配置 - 创建所有字段设置默认值后续由config.json同步更新
systemConfigs := map[string]string{ systemConfigs := map[string]string{
"admin_mode": "true", // 默认开启管理员模式,便于首次使用 "admin_mode": "true", // 默认开启管理员模式,便于首次使用
"api_server_port": "8080", // 默认API端口 "api_server_port": "8080", // 默认API端口
"use_default_coins": "true", // 默认使用内置币种列表 "use_default_coins": "true", // 默认使用内置币种列表
"default_coins": `["BTCUSDT","ETHUSDT","SOLUSDT","BNBUSDT","XRPUSDT","DOGEUSDT","ADAUSDT","HYPEUSDT"]`, // 默认币种列表JSON格式 "default_coins": `["BTCUSDT","ETHUSDT","SOLUSDT","BNBUSDT","XRPUSDT","DOGEUSDT","ADAUSDT","HYPEUSDT"]`, // 默认币种列表JSON格式
"max_daily_loss": "10.0", // 最大日损失百分比 "max_daily_loss": "10.0", // 最大日损失百分比
"max_drawdown": "20.0", // 最大回撤百分比 "max_drawdown": "20.0", // 最大回撤百分比
"stop_trading_minutes": "60", // 停止交易时间(分钟) "stop_trading_minutes": "60", // 停止交易时间(分钟)
"btc_eth_leverage": "5", // BTC/ETH杠杆倍数 "btc_eth_leverage": "5", // BTC/ETH杠杆倍数
"altcoin_leverage": "5", // 山寨币杠杆倍数 "altcoin_leverage": "5", // 山寨币杠杆倍数
"jwt_secret": "", // JWT密钥默认为空由config.json或系统生成 "jwt_secret": "", // JWT密钥默认为空由config.json或系统生成
} }
for key, value := range systemConfigs { for key, value := range systemConfigs {
@@ -354,13 +358,13 @@ func (d *Database) migrateExchangesTable() error {
// User 用户配置 // User 用户配置
type User struct { type User struct {
ID string `json:"id"` ID string `json:"id"`
Email string `json:"email"` Email string `json:"email"`
PasswordHash string `json:"-"` // 不返回到前端 PasswordHash string `json:"-"` // 不返回到前端
OTPSecret string `json:"-"` // 不返回到前端 OTPSecret string `json:"-"` // 不返回到前端
OTPVerified bool `json:"otp_verified"` OTPVerified bool `json:"otp_verified"`
CreatedAt time.Time `json:"created_at"` CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"` UpdatedAt time.Time `json:"updated_at"`
} }
// AIModelConfig AI模型配置 // AIModelConfig AI模型配置
@@ -379,39 +383,40 @@ type AIModelConfig struct {
// ExchangeConfig 交易所配置 // ExchangeConfig 交易所配置
type ExchangeConfig struct { type ExchangeConfig struct {
ID string `json:"id"` ID string `json:"id"`
UserID string `json:"user_id"` UserID string `json:"user_id"`
Name string `json:"name"` Name string `json:"name"`
Type string `json:"type"` Type string `json:"type"`
Enabled bool `json:"enabled"` Enabled bool `json:"enabled"`
APIKey string `json:"apiKey"` APIKey string `json:"apiKey"`
SecretKey string `json:"secretKey"` SecretKey string `json:"secretKey"`
Testnet bool `json:"testnet"` Testnet bool `json:"testnet"`
// Hyperliquid 特定字段 // Hyperliquid 特定字段
HyperliquidWalletAddr string `json:"hyperliquidWalletAddr"` HyperliquidWalletAddr string `json:"hyperliquidWalletAddr"`
// Aster 特定字段 // Aster 特定字段
AsterUser string `json:"asterUser"` AsterUser string `json:"asterUser"`
AsterSigner string `json:"asterSigner"` AsterSigner string `json:"asterSigner"`
AsterPrivateKey string `json:"asterPrivateKey"` AsterPrivateKey string `json:"asterPrivateKey"`
CreatedAt time.Time `json:"created_at"` CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"` UpdatedAt time.Time `json:"updated_at"`
} }
// TraderRecord 交易员配置(数据库实体) // TraderRecord 交易员配置(数据库实体)
type TraderRecord struct { type TraderRecord struct {
ID string `json:"id"` ID string `json:"id"`
UserID string `json:"user_id"` UserID string `json:"user_id"`
Name string `json:"name"` Name string `json:"name"`
AIModelID string `json:"ai_model_id"` AIModelID string `json:"ai_model_id"`
ExchangeID string `json:"exchange_id"` ExchangeID string `json:"exchange_id"`
InitialBalance float64 `json:"initial_balance"` InitialBalance float64 `json:"initial_balance"`
ScanIntervalMinutes int `json:"scan_interval_minutes"` ScanIntervalMinutes int `json:"scan_interval_minutes"`
IsRunning bool `json:"is_running"` IsRunning bool `json:"is_running"`
BTCETHLeverage int `json:"btc_eth_leverage"` // BTC/ETH杠杆倍数 BTCETHLeverage int `json:"btc_eth_leverage"` // BTC/ETH杠杆倍数
AltcoinLeverage int `json:"altcoin_leverage"` // 山寨币杠杆倍数 AltcoinLeverage int `json:"altcoin_leverage"` // 山寨币杠杆倍数
TradingSymbols string `json:"trading_symbols"` // 交易币种,逗号分隔 TradingSymbols string `json:"trading_symbols"` // 交易币种,逗号分隔
UseCoinPool bool `json:"use_coin_pool"` // 是否使用COIN POOL信号源 UseCoinPool bool `json:"use_coin_pool"` // 是否使用COIN POOL信号源
UseOITop bool `json:"use_oi_top"` // 是否使用OI TOP信号源 UseOITop bool `json:"use_oi_top"` // 是否使用OI TOP信号源
UseInsideCoins bool `json:"use_inside_coins"` // 是否使用内置评分信号源
CustomPrompt string `json:"custom_prompt"` // 自定义交易策略prompt CustomPrompt string `json:"custom_prompt"` // 自定义交易策略prompt
OverrideBasePrompt bool `json:"override_base_prompt"` // 是否覆盖基础prompt OverrideBasePrompt bool `json:"override_base_prompt"` // 是否覆盖基础prompt
SystemPromptTemplate string `json:"system_prompt_template"` // 系统提示词模板名称 SystemPromptTemplate string `json:"system_prompt_template"` // 系统提示词模板名称
@@ -422,12 +427,12 @@ type TraderRecord struct {
// UserSignalSource 用户信号源配置 // UserSignalSource 用户信号源配置
type UserSignalSource struct { type UserSignalSource struct {
ID int `json:"id"` ID int `json:"id"`
UserID string `json:"user_id"` UserID string `json:"user_id"`
CoinPoolURL string `json:"coin_pool_url"` CoinPoolURL string `json:"coin_pool_url"`
OITopURL string `json:"oi_top_url"` OITopURL string `json:"oi_top_url"`
CreatedAt time.Time `json:"created_at"` CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"` UpdatedAt time.Time `json:"updated_at"`
} }
// GenerateOTPSecret 生成OTP密钥 // GenerateOTPSecret 生成OTP密钥
@@ -767,9 +772,9 @@ func (d *Database) CreateExchange(userID, id, name, typ string, enabled bool, ap
// CreateTrader 创建交易员 // CreateTrader 创建交易员
func (d *Database) CreateTrader(trader *TraderRecord) error { func (d *Database) CreateTrader(trader *TraderRecord) error {
_, err := d.db.Exec(` _, err := d.db.Exec(`
INSERT INTO traders (id, user_id, name, ai_model_id, exchange_id, initial_balance, scan_interval_minutes, is_running, btc_eth_leverage, altcoin_leverage, trading_symbols, use_coin_pool, use_oi_top, custom_prompt, override_base_prompt, system_prompt_template, is_cross_margin) INSERT INTO traders (id, user_id, name, ai_model_id, exchange_id, initial_balance, scan_interval_minutes, is_running, btc_eth_leverage, altcoin_leverage, trading_symbols, use_coin_pool, use_oi_top, use_inside_coins, custom_prompt, override_base_prompt, system_prompt_template, is_cross_margin)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?, ?, ?, ?, ?, ?)
`, trader.ID, trader.UserID, trader.Name, trader.AIModelID, trader.ExchangeID, trader.InitialBalance, trader.ScanIntervalMinutes, trader.IsRunning, trader.BTCETHLeverage, trader.AltcoinLeverage, trader.TradingSymbols, trader.UseCoinPool, trader.UseOITop, trader.CustomPrompt, trader.OverrideBasePrompt, trader.SystemPromptTemplate, trader.IsCrossMargin) `, trader.ID, trader.UserID, trader.Name, trader.AIModelID, trader.ExchangeID, trader.InitialBalance, trader.ScanIntervalMinutes, trader.IsRunning, trader.BTCETHLeverage, trader.AltcoinLeverage, trader.TradingSymbols, trader.UseCoinPool, trader.UseOITop, trader.UseInsideCoins, trader.CustomPrompt, trader.OverrideBasePrompt, trader.SystemPromptTemplate, trader.IsCrossMargin)
return err return err
} }
@@ -779,7 +784,7 @@ func (d *Database) GetTraders(userID string) ([]*TraderRecord, error) {
SELECT id, user_id, name, ai_model_id, exchange_id, initial_balance, scan_interval_minutes, is_running, SELECT id, user_id, name, ai_model_id, exchange_id, initial_balance, scan_interval_minutes, is_running,
COALESCE(btc_eth_leverage, 5) as btc_eth_leverage, COALESCE(altcoin_leverage, 5) as altcoin_leverage, COALESCE(btc_eth_leverage, 5) as btc_eth_leverage, COALESCE(altcoin_leverage, 5) as altcoin_leverage,
COALESCE(trading_symbols, '') as trading_symbols, COALESCE(trading_symbols, '') as trading_symbols,
COALESCE(use_coin_pool, 0) as use_coin_pool, COALESCE(use_oi_top, 0) as use_oi_top, COALESCE(use_coin_pool, 0) as use_coin_pool, COALESCE(use_oi_top, 0) as use_oi_top,COALESCE(use_inside_coins, 0) as use_inside_coins,
COALESCE(custom_prompt, '') as custom_prompt, COALESCE(override_base_prompt, 0) as override_base_prompt, COALESCE(custom_prompt, '') as custom_prompt, COALESCE(override_base_prompt, 0) as override_base_prompt,
COALESCE(system_prompt_template, 'default') as system_prompt_template, COALESCE(system_prompt_template, 'default') as system_prompt_template,
COALESCE(is_cross_margin, 1) as is_cross_margin, created_at, updated_at COALESCE(is_cross_margin, 1) as is_cross_margin, created_at, updated_at
@@ -790,14 +795,14 @@ func (d *Database) GetTraders(userID string) ([]*TraderRecord, error) {
} }
defer rows.Close() defer rows.Close()
var traders []*TraderRecord var traders []*TraderRecord
for rows.Next() { for rows.Next() {
var trader TraderRecord var trader TraderRecord
err := rows.Scan( err := rows.Scan(
&trader.ID, &trader.UserID, &trader.Name, &trader.AIModelID, &trader.ExchangeID, &trader.ID, &trader.UserID, &trader.Name, &trader.AIModelID, &trader.ExchangeID,
&trader.InitialBalance, &trader.ScanIntervalMinutes, &trader.IsRunning, &trader.InitialBalance, &trader.ScanIntervalMinutes, &trader.IsRunning,
&trader.BTCETHLeverage, &trader.AltcoinLeverage, &trader.TradingSymbols, &trader.BTCETHLeverage, &trader.AltcoinLeverage, &trader.TradingSymbols,
&trader.UseCoinPool, &trader.UseOITop, &trader.UseCoinPool, &trader.UseOITop, &trader.UseInsideCoins,
&trader.CustomPrompt, &trader.OverrideBasePrompt, &trader.SystemPromptTemplate, &trader.CustomPrompt, &trader.OverrideBasePrompt, &trader.SystemPromptTemplate,
&trader.IsCrossMargin, &trader.IsCrossMargin,
&trader.CreatedAt, &trader.UpdatedAt, &trader.CreatedAt, &trader.UpdatedAt,
@@ -847,18 +852,13 @@ func (d *Database) DeleteTrader(userID, id string) error {
// GetTraderConfig 获取交易员完整配置包含AI模型和交易所信息 // GetTraderConfig 获取交易员完整配置包含AI模型和交易所信息
func (d *Database) GetTraderConfig(userID, traderID string) (*TraderRecord, *AIModelConfig, *ExchangeConfig, error) { func (d *Database) GetTraderConfig(userID, traderID string) (*TraderRecord, *AIModelConfig, *ExchangeConfig, error) {
var trader TraderRecord var trader TraderRecord
var aiModel AIModelConfig var aiModel AIModelConfig
var exchange ExchangeConfig var exchange ExchangeConfig
err := d.db.QueryRow(` err := d.db.QueryRow(`
SELECT SELECT
t.id, t.user_id, t.name, t.ai_model_id, t.exchange_id, t.initial_balance, t.scan_interval_minutes, t.is_running, t.id, t.user_id, t.name, t.ai_model_id, t.exchange_id, t.initial_balance, t.scan_interval_minutes, t.is_running, t.created_at, t.updated_at,
COALESCE(t.btc_eth_leverage, 5) as btc_eth_leverage, COALESCE(t.altcoin_leverage, 5) as altcoin_leverage,
COALESCE(t.trading_symbols, '') as trading_symbols, COALESCE(t.use_coin_pool, 0) as use_coin_pool,
COALESCE(t.use_oi_top, 0) as use_oi_top, COALESCE(t.custom_prompt, '') as custom_prompt,
COALESCE(t.override_base_prompt, 0) as override_base_prompt, COALESCE(t.is_cross_margin, 1) as is_cross_margin,
t.created_at, t.updated_at,
a.id, a.user_id, a.name, a.provider, a.enabled, a.api_key, a.created_at, a.updated_at, a.id, a.user_id, a.name, a.provider, a.enabled, a.api_key, a.created_at, a.updated_at,
e.id, e.user_id, e.name, e.type, e.enabled, e.api_key, e.secret_key, e.testnet, e.id, e.user_id, e.name, e.type, e.enabled, e.api_key, e.secret_key, e.testnet,
COALESCE(e.hyperliquid_wallet_addr, '') as hyperliquid_wallet_addr, COALESCE(e.hyperliquid_wallet_addr, '') as hyperliquid_wallet_addr,
@@ -873,8 +873,6 @@ func (d *Database) GetTraderConfig(userID, traderID string) (*TraderRecord, *AIM
`, traderID, userID).Scan( `, traderID, userID).Scan(
&trader.ID, &trader.UserID, &trader.Name, &trader.AIModelID, &trader.ExchangeID, &trader.ID, &trader.UserID, &trader.Name, &trader.AIModelID, &trader.ExchangeID,
&trader.InitialBalance, &trader.ScanIntervalMinutes, &trader.IsRunning, &trader.InitialBalance, &trader.ScanIntervalMinutes, &trader.IsRunning,
&trader.BTCETHLeverage, &trader.AltcoinLeverage, &trader.TradingSymbols, &trader.UseCoinPool,
&trader.UseOITop, &trader.CustomPrompt, &trader.OverrideBasePrompt, &trader.IsCrossMargin,
&trader.CreatedAt, &trader.UpdatedAt, &trader.CreatedAt, &trader.UpdatedAt,
&aiModel.ID, &aiModel.UserID, &aiModel.Name, &aiModel.Provider, &aiModel.Enabled, &aiModel.APIKey, &aiModel.ID, &aiModel.UserID, &aiModel.Name, &aiModel.Provider, &aiModel.Enabled, &aiModel.APIKey,
&aiModel.CreatedAt, &aiModel.UpdatedAt, &aiModel.CreatedAt, &aiModel.UpdatedAt,
@@ -940,6 +938,35 @@ func (d *Database) UpdateUserSignalSource(userID, coinPoolURL, oiTopURL string)
return err return err
} }
// GetCustomCoins 获取所有交易员自定义币种 / Get all trader-customized currencies
func (d *Database) GetCustomCoins() []string {
var symbol string
var symbols []string
_ = d.db.QueryRow(`
SELECT GROUP_CONCAT(custom_coins , ',') as symbol
FROM main.traders where custom_coins != ''
`).Scan(&symbol)
// 检测用户是否未配置币种 - 兼容性
if symbol == "" {
symbolJSON, _ := d.GetSystemConfig("default_coins")
if err := json.Unmarshal([]byte(symbolJSON), &symbols); err != nil {
log.Printf("⚠️ 解析default_coins配置失败: %v使用硬编码默认值", err)
symbols = []string{"BTCUSDT", "ETHUSDT", "SOLUSDT", "BNBUSDT"}
}
}
// filter Symbol
for _, s := range strings.Split(symbol, ",") {
if s == "" {
continue
}
coin := market.Normalize(s)
if !slices.Contains(symbols, coin) {
symbols = append(symbols, coin)
}
}
return symbols
}
// Close 关闭数据库连接 // Close 关闭数据库连接
func (d *Database) Close() error { func (d *Database) Close() error {
return d.db.Close() return d.db.Close()

29
main.go
View File

@@ -15,7 +15,6 @@ import (
"strconv" "strconv"
"strings" "strings"
"syscall" "syscall"
"time"
) )
// LeverageConfig 杠杆配置 // LeverageConfig 杠杆配置
@@ -30,9 +29,9 @@ type ConfigFile struct {
APIServerPort int `json:"api_server_port"` APIServerPort int `json:"api_server_port"`
UseDefaultCoins bool `json:"use_default_coins"` UseDefaultCoins bool `json:"use_default_coins"`
DefaultCoins []string `json:"default_coins"` DefaultCoins []string `json:"default_coins"`
InsideCoins bool `json:"inside_coins"`
CoinPoolAPIURL string `json:"coin_pool_api_url"` CoinPoolAPIURL string `json:"coin_pool_api_url"`
OITopAPIURL string `json:"oi_top_api_url"` OITopAPIURL string `json:"oi_top_api_url"`
InsideCoins bool `json:"inside_coins"`
MaxDailyLoss float64 `json:"max_daily_loss"` MaxDailyLoss float64 `json:"max_daily_loss"`
MaxDrawdown float64 `json:"max_drawdown"` MaxDrawdown float64 `json:"max_drawdown"`
StopTradingMinutes int `json:"stop_trading_minutes"` StopTradingMinutes int `json:"stop_trading_minutes"`
@@ -68,9 +67,9 @@ func syncConfigToDatabase(database *config.Database) error {
"admin_mode": fmt.Sprintf("%t", configFile.AdminMode), "admin_mode": fmt.Sprintf("%t", configFile.AdminMode),
"api_server_port": strconv.Itoa(configFile.APIServerPort), "api_server_port": strconv.Itoa(configFile.APIServerPort),
"use_default_coins": fmt.Sprintf("%t", configFile.UseDefaultCoins), "use_default_coins": fmt.Sprintf("%t", configFile.UseDefaultCoins),
"inside_coins": fmt.Sprintf("%t", configFile.InsideCoins),
"coin_pool_api_url": configFile.CoinPoolAPIURL, "coin_pool_api_url": configFile.CoinPoolAPIURL,
"oi_top_api_url": configFile.OITopAPIURL, "oi_top_api_url": configFile.OITopAPIURL,
"inside_coins": fmt.Sprintf("%t", configFile.InsideCoins),
"max_daily_loss": fmt.Sprintf("%.1f", configFile.MaxDailyLoss), "max_daily_loss": fmt.Sprintf("%.1f", configFile.MaxDailyLoss),
"max_drawdown": fmt.Sprintf("%.1f", configFile.MaxDrawdown), "max_drawdown": fmt.Sprintf("%.1f", configFile.MaxDrawdown),
"stop_trading_minutes": strconv.Itoa(configFile.StopTradingMinutes), "stop_trading_minutes": strconv.Itoa(configFile.StopTradingMinutes),
@@ -137,8 +136,6 @@ func main() {
// 获取系统配置 // 获取系统配置
useDefaultCoinsStr, _ := database.GetSystemConfig("use_default_coins") useDefaultCoinsStr, _ := database.GetSystemConfig("use_default_coins")
useDefaultCoins := useDefaultCoinsStr == "true" useDefaultCoins := useDefaultCoinsStr == "true"
InsideCoinsStr, _ := database.GetSystemConfig("inside_coins")
insideCoins := InsideCoinsStr == "true"
apiPortStr, _ := database.GetSystemConfig("api_server_port") apiPortStr, _ := database.GetSystemConfig("api_server_port")
// 获取管理员模式配置 // 获取管理员模式配置
@@ -186,26 +183,6 @@ func main() {
} }
pool.SetDefaultCoins(defaultCoins) pool.SetDefaultCoins(defaultCoins)
//内置AI评分
if insideCoins {
log.Printf("✓ 启用内置AI评分币种列表")
monitor := market.NewWSMonitor(150)
go func() {
monitor.Start()
// 定时器设置默认的币种列表 - 覆蓋defaultCoins设置
for {
if len(monitor.FilterSymbol) > 0 {
for _, coin := range defaultCoins {
monitor.FilterSymbol = append(monitor.FilterSymbol, coin)
}
pool.SetDefaultCoins(monitor.FilterSymbol)
monitor.FilterSymbol = nil
}
time.Sleep(1 * time.Minute)
}
}()
}
// 设置是否使用默认主流币种 // 设置是否使用默认主流币种
pool.SetUseDefaultCoins(useDefaultCoins) pool.SetUseDefaultCoins(useDefaultCoins)
if useDefaultCoins { if useDefaultCoins {
@@ -286,6 +263,8 @@ func main() {
} }
}() }()
// 启动流行情数据 - 默认使用所有交易员设置的币种 如果没有设置币种 则优先使用系统默认
go market.NewWSMonitor(150).Start(database.GetCustomCoins())
// 设置优雅退出 // 设置优雅退出
sigChan := make(chan os.Signal, 1) sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

View File

@@ -35,6 +35,7 @@ type SymbolStats struct {
} }
var WSMonitorCli *WSMonitor var WSMonitorCli *WSMonitor
var subKlineTime = []string{"3m", "4h"} // 管理订阅流的K线周期
func NewWSMonitor(batchSize int) *WSMonitor { func NewWSMonitor(batchSize int) *WSMonitor {
WSMonitorCli = &WSMonitor{ WSMonitorCli = &WSMonitor{
@@ -47,23 +48,27 @@ func NewWSMonitor(batchSize int) *WSMonitor {
return WSMonitorCli return WSMonitorCli
} }
func (m *WSMonitor) Initialize() error { func (m *WSMonitor) Initialize(coins []string) error {
log.Println("初始化WebSocket监控器...") log.Println("初始化WebSocket监控器...")
// 获取交易对信息 // 获取交易对信息
apiClient := NewAPIClient() apiClient := NewAPIClient()
exchangeInfo, err := apiClient.GetExchangeInfo() // 如果不指定交易对则使用market市场的所有交易对币种
if err != nil { if len(coins) == 0 {
return err exchangeInfo, err := apiClient.GetExchangeInfo()
if err != nil {
return err
}
// 筛选永续合约交易对 --仅测试时使用
//exchangeInfo.Symbols = exchangeInfo.Symbols[0:2]
for _, symbol := range exchangeInfo.Symbols {
if symbol.Status == "TRADING" && symbol.ContractType == "PERPETUAL" && strings.ToUpper(symbol.Symbol[len(symbol.Symbol)-4:]) == "USDT" {
m.symbols = append(m.symbols, symbol.Symbol)
}
}
} else {
m.symbols = coins
} }
// 筛选永续合约交易对 --仅测试时使用
//exchangeInfo.Symbols = exchangeInfo.Symbols[0:2]
for _, symbol := range exchangeInfo.Symbols {
if symbol.Status == "TRADING" && symbol.ContractType == "PERPETUAL" {
m.symbols = append(m.symbols, Normalize(symbol.Symbol))
}
}
log.Printf("找到 %d 个交易对", len(m.symbols)) log.Printf("找到 %d 个交易对", len(m.symbols))
// 初始化历史数据 // 初始化历史数据
if err := m.initializeHistoricalData(); err != nil { if err := m.initializeHistoricalData(); err != nil {
@@ -114,10 +119,10 @@ func (m *WSMonitor) initializeHistoricalData() error {
return nil return nil
} }
func (m *WSMonitor) Start() { func (m *WSMonitor) Start(coins []string) {
log.Printf("启动WebSocket实时监控...") log.Printf("启动WebSocket实时监控...")
// 初始化交易对 // 初始化交易对
err := m.Initialize() err := m.Initialize(coins)
if err != nil { if err != nil {
log.Fatalf("❌ 初始化币种: %v", err) log.Fatalf("❌ 初始化币种: %v", err)
return return
@@ -129,42 +134,43 @@ func (m *WSMonitor) Start() {
return return
} }
// 启动警报处理器 // 启动警报处理器
go m.handleAlerts() //go m.handleAlerts()
// 启动定期清理任务 // 启动定期清理任务
go m.cleanupInactiveSymbols() //go m.cleanupInactiveSymbols()
// 输出监控统计 - 评分前十名 // 输出监控统计 - 评分前十名
go m.printFilterStats(50) //go m.printFilterStats(20)
// 订阅所有交易对 // 订阅所有交易对
err = m.subscribeAll() err = m.subscribeAll()
if err != nil { if err != nil {
log.Fatalf("❌ 订阅币种交易对: %v", err) log.Fatalf("❌ 订阅币种交易对: %v", err)
return return
} }
} }
// subscribeSymbol 注册监听
func (m *WSMonitor) subscribeSymbol(symbol, st string) []string {
var streams []string
stream := fmt.Sprintf("%s@kline_%s", strings.ToLower(symbol), st)
ch := m.combinedClient.AddSubscriber(stream, 100)
streams = append(streams, stream)
go m.handleKlineData(symbol, ch, st)
return streams
}
func (m *WSMonitor) subscribeAll() error { func (m *WSMonitor) subscribeAll() error {
// 执行批量订阅 // 执行批量订阅
log.Println("开始订阅所有交易对...") log.Println("开始订阅所有交易对...")
for _, symbol := range m.symbols { for _, symbol := range m.symbols {
stream3m := fmt.Sprintf("%s@kline_3m", strings.ToLower(symbol)) for _, st := range subKlineTime {
ch3m := m.combinedClient.AddSubscriber(stream3m, 100) m.subscribeSymbol(symbol, st)
go m.handleKlineData(symbol, ch3m, "3m") }
stream4h := fmt.Sprintf("%s@kline_4h", strings.ToLower(symbol))
ch4h := m.combinedClient.AddSubscriber(stream4h, 100)
go m.handleKlineData(symbol, ch4h, "4h")
} }
for _, st := range subKlineTime {
err := m.combinedClient.BatchSubscribeKlines(m.symbols, "3m") err := m.combinedClient.BatchSubscribeKlines(m.symbols, st)
if err != nil { if err != nil {
log.Fatalf("❌ 订阅3m K线: %v", err) log.Fatalf("❌ 订阅3m K线: %v", err)
return err return err
} }
err = m.combinedClient.BatchSubscribeKlines(m.symbols, "4h")
if err != nil {
log.Fatalf("❌ 订阅4h K线: %v", err)
return err
} }
log.Println("所有交易对订阅完成") log.Println("所有交易对订阅完成")
return nil return nil
@@ -181,34 +187,14 @@ func (m *WSMonitor) handleKlineData(symbol string, ch <-chan []byte, _time strin
} }
} }
func (m *WSMonitor) handleTickerData(symbol string, ch <-chan []byte) {
for data := range ch {
var tickerData TickerWSData
if err := json.Unmarshal(data, &tickerData); err != nil {
log.Printf("解析Ticker数据失败: %v", err)
continue
}
m.processTickerUpdate(symbol, tickerData)
}
}
func (m *WSMonitor) handleTickerDatas(ch <-chan []byte) {
for data := range ch {
var tickerData []TickerWSData
if err := json.Unmarshal(data, &tickerData); err != nil {
log.Printf("解析Ticker数据失败: %v", err)
continue
}
log.Fatalln(tickerData)
//m.processTickerUpdate(symbol, tickerData)
}
}
func (m *WSMonitor) getKlineDataMap(_time string) *sync.Map { func (m *WSMonitor) getKlineDataMap(_time string) *sync.Map {
var klineDataMap *sync.Map var klineDataMap *sync.Map
if _time == "3m" { if _time == "3m" {
klineDataMap = &m.klineDataMap3m klineDataMap = &m.klineDataMap3m
} else { } else if _time == "4h" {
klineDataMap = &m.klineDataMap4h klineDataMap = &m.klineDataMap4h
} else {
klineDataMap = &sync.Map{}
} }
return klineDataMap return klineDataMap
} }
@@ -310,11 +296,19 @@ func (m *WSMonitor) handleAlerts() {
} }
func (m *WSMonitor) GetCurrentKlines(symbol string, _time string) ([]Kline, error) { func (m *WSMonitor) GetCurrentKlines(symbol string, _time string) ([]Kline, error) {
// 对每一个进来的symbol检测是否存在内类 是否的话就订阅它
value, exists := m.getKlineDataMap(_time).Load(symbol) value, exists := m.getKlineDataMap(_time).Load(symbol)
if !exists { if !exists {
// 如果Ws数据未初始化完成时,单独使用api获取 - 兼容性代码 (防止在未初始化完成是,已经有交易员运行) // 如果Ws数据未初始化完成时,单独使用api获取 - 兼容性代码 (防止在未初始化完成是,已经有交易员运行)
apiClient := NewAPIClient() apiClient := NewAPIClient()
klines, err := apiClient.GetKlines(symbol, _time, 40) klines, err := apiClient.GetKlines(symbol, _time, 100)
m.getKlineDataMap(_time).Store(strings.ToUpper(symbol), klines) //动态缓存进缓存
subStr := m.subscribeSymbol(symbol, _time)
subErr := m.combinedClient.subscribeStreams(subStr)
log.Printf("动态订阅流: %v", subStr)
if subErr != nil {
return nil, fmt.Errorf("动态订阅%v分钟K线失败: %v", _time, subErr)
}
if err != nil { if err != nil {
return nil, fmt.Errorf("获取%v分钟K线失败: %v", _time, err) return nil, fmt.Errorf("获取%v分钟K线失败: %v", _time, err)
} }