修改Kline获取方式为Websocket缓存。

This commit is contained in:
yuanshi2016
2025-11-02 17:59:19 +08:00
parent 7302f96e8e
commit d8582475d3
8 changed files with 21 additions and 495 deletions

View File

@@ -5,7 +5,6 @@
"altcoin_leverage": 5
},
"use_default_coins": true,
"inside_coins": true,
"default_coins": [
"BTCUSDT",
"ETHUSDT",

View File

@@ -54,7 +54,6 @@ type LeverageConfig struct {
type Config struct {
Traders []TraderConfig `json:"traders"`
UseDefaultCoins bool `json:"use_default_coins"` // 是否使用默认主流币种列表
UseInsideCoins bool `json:"use_inside_coins"` // 是否使用内置AI评分币种列表
DefaultCoins []string `json:"default_coins"` // 默认主流币种池
APIServerPort int `json:"api_server_port"`
MaxDailyLoss float64 `json:"max_daily_loss"`

View File

@@ -51,7 +51,12 @@ nofx/
├── market/ # Market data fetching
│ └── data.go # Market data & technical indicators (TA-Lib)
└── api_client.go # Market data acquisition API
│ └── websocket_client.go # Market data acquisition WebSocket interface
│ └── combined_streams.go # Market data acquisition: Combined streaming (single link to subscribe to multiple cryptocurrencies)
│ └── monitor.go # Market data cache
│ └── types.go # market structure
├── pool/ # Coin pool management
│ └── coin_pool.go # AI500 + OI Top merged pool

View File

@@ -51,6 +51,11 @@ nofx/
├── market/ # 市场数据获取
│ └── data.go # 市场数据与技术指标TA-Lib
│ └── api_client.go # 行情获取 Api接口
│ └── websocket_client.go # 行情获取 Websocket接口
│ └── combined_streams.go # 行情获取 组合流式(单链接订阅多个币种)
│ └── monitor.go # 行情数据缓存
│ └── types.go # market结构体
├── pool/ # 币种池管理
│ └── coin_pool.go # AI500 + OI Top 合并池

10
go.mod
View File

@@ -8,7 +8,8 @@ require (
github.com/gin-gonic/gin v1.11.0
github.com/golang-jwt/jwt/v5 v5.2.0
github.com/google/uuid v1.6.0
github.com/mattn/go-sqlite3 v1.14.32
github.com/gorilla/websocket v1.5.3
github.com/mattn/go-sqlite3 v1.14.16
github.com/pquerna/otp v1.4.0
github.com/sonirico/go-hyperliquid v0.17.0
golang.org/x/crypto v0.42.0
@@ -26,6 +27,7 @@ require (
github.com/crate-crypto/go-eth-kzg v1.4.0 // indirect
github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/elastic/go-sysinfo v1.15.4 // indirect
github.com/elastic/go-windows v1.0.2 // indirect
github.com/ethereum/c-kzg-4844/v2 v2.1.5 // indirect
@@ -37,7 +39,6 @@ require (
github.com/go-playground/validator/v10 v10.27.0 // indirect
github.com/goccy/go-json v0.10.4 // indirect
github.com/goccy/go-yaml v1.18.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/holiman/uint256 v1.3.2 // indirect
github.com/joho/godotenv v1.5.1 // indirect
github.com/josharian/intern v1.0.0 // indirect
@@ -55,6 +56,7 @@ require (
github.com/prometheus/procfs v0.17.0 // indirect
github.com/quic-go/qpack v0.5.1 // indirect
github.com/quic-go/quic-go v0.54.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/rs/zerolog v1.34.0 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
github.com/sonirico/vago v0.9.0 // indirect
@@ -78,4 +80,8 @@ require (
golang.org/x/tools v0.36.0 // indirect
google.golang.org/protobuf v1.36.9 // indirect
howett.net/plist v1.0.1 // indirect
modernc.org/libc v1.37.6 // indirect
modernc.org/mathutil v1.6.0 // indirect
modernc.org/memory v1.7.2 // indirect
modernc.org/sqlite v1.28.0 // indirect
)

View File

@@ -265,6 +265,7 @@ func main() {
// 启动流行情数据 - 默认使用所有交易员设置的币种 如果没有设置币种 则优先使用系统默认
go market.NewWSMonitor(150).Start(database.GetCustomCoins())
//go market.NewWSMonitor(150).Start([]string{}) //这里是一个使用方式 传入空的话 则使用market市场的所有币种
// 设置优雅退出
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

View File

@@ -1,229 +0,0 @@
package market
import (
"fmt"
"math"
"time"
)
type FeatureEngine struct {
alertThresholds AlertThresholds
}
func NewFeatureEngine(thresholds AlertThresholds) *FeatureEngine {
return &FeatureEngine{
alertThresholds: thresholds,
}
}
func (e *FeatureEngine) CalculateFeatures(symbol string, klines []Kline) *SymbolFeatures {
if len(klines) < 20 {
return nil
}
features := &SymbolFeatures{
Symbol: symbol,
Timestamp: time.Now(),
}
// 提取价格和交易量数据
closes := make([]float64, len(klines))
volumes := make([]float64, len(klines))
highs := make([]float64, len(klines))
lows := make([]float64, len(klines))
for i, k := range klines {
closes[i] = k.Close
volumes[i] = k.Volume
highs[i] = k.High
lows[i] = k.Low
}
// 价格特征
features.Price = closes[len(closes)-1]
features.PriceChange15Min = (closes[len(closes)-1] - closes[len(closes)-2]) / closes[len(closes)-2]
if len(closes) >= 5 {
features.PriceChange1H = (closes[len(closes)-1] - closes[len(closes)-5]) / closes[len(closes)-5]
}
if len(closes) >= 17 {
features.PriceChange4H = (closes[len(closes)-1] - closes[len(closes)-17]) / closes[len(closes)-17]
}
// 交易量特征
currentVolume := volumes[len(volumes)-1]
features.Volume = currentVolume
// 5周期平均交易量
if len(volumes) >= 6 {
avgVolume5 := e.calculateAverage(volumes[len(volumes)-6 : len(volumes)-1])
features.VolumeRatio5 = currentVolume / avgVolume5
}
// 20周期平均交易量
if len(volumes) >= 21 {
avgVolume20 := e.calculateAverage(volumes[len(volumes)-21 : len(volumes)-1])
features.VolumeRatio20 = currentVolume / avgVolume20
}
// 交易量趋势
if features.VolumeRatio20 > 0 {
features.VolumeTrend = features.VolumeRatio5 / features.VolumeRatio20
}
// 技术指标
features.RSI14 = e.calculateRSI(closes, 14)
features.SMA5 = e.calculateSMA(closes, 5)
features.SMA10 = e.calculateSMA(closes, 10)
features.SMA20 = e.calculateSMA(closes, 20)
// 波动特征
currentHigh := highs[len(highs)-1]
currentLow := lows[len(lows)-1]
features.HighLowRatio = (currentHigh - currentLow) / features.Price
features.Volatility20 = e.calculateVolatility(closes, 20)
// 价格在区间中的位置
if currentHigh != currentLow {
features.PositionInRange = (features.Price - currentLow) / (currentHigh - currentLow)
} else {
features.PositionInRange = 0.5
}
return features
}
func (e *FeatureEngine) calculateAverage(values []float64) float64 {
sum := 0.0
for _, v := range values {
sum += v
}
return sum / float64(len(values))
}
func (e *FeatureEngine) calculateSMA(prices []float64, period int) float64 {
if len(prices) < period {
return 0
}
return e.calculateAverage(prices[len(prices)-period:])
}
func (e *FeatureEngine) calculateRSI(prices []float64, period int) float64 {
if len(prices) <= period {
return 50
}
gains := make([]float64, 0)
losses := make([]float64, 0)
for i := 1; i < len(prices); i++ {
change := prices[i] - prices[i-1]
if change > 0 {
gains = append(gains, change)
losses = append(losses, 0)
} else {
gains = append(gains, 0)
losses = append(losses, -change)
}
}
// 只取最近period个数据点
if len(gains) > period {
gains = gains[len(gains)-period:]
losses = losses[len(losses)-period:]
}
avgGain := e.calculateAverage(gains)
avgLoss := e.calculateAverage(losses)
if avgLoss == 0 {
return 100
}
rs := avgGain / avgLoss
return 100 - (100 / (1 + rs))
}
func (e *FeatureEngine) calculateVolatility(prices []float64, period int) float64 {
if len(prices) < period {
return 0
}
periodPrices := prices[len(prices)-period:]
mean := e.calculateAverage(periodPrices)
variance := 0.0
for _, price := range periodPrices {
variance += math.Pow(price-mean, 2)
}
variance /= float64(len(periodPrices))
return math.Sqrt(variance) / mean
}
func (e *FeatureEngine) DetectAlerts(features *SymbolFeatures) []Alert {
var alerts []Alert
// 交易量放大检测
if features.VolumeRatio5 > e.alertThresholds.VolumeSpike {
alerts = append(alerts, Alert{
Type: "VOLUME_SPIKE",
Symbol: features.Symbol,
Value: features.VolumeRatio5,
Threshold: e.alertThresholds.VolumeSpike,
Message: fmt.Sprintf("%s 交易量放大 %.2f 倍", features.Symbol, features.VolumeRatio5),
Timestamp: time.Now(),
})
}
// 15分钟价格异动
if math.Abs(features.PriceChange15Min) > e.alertThresholds.PriceChange15Min {
direction := "上涨"
if features.PriceChange15Min < 0 {
direction = "下跌"
}
alerts = append(alerts, Alert{
Type: "PRICE_CHANGE_15MIN",
Symbol: features.Symbol,
Value: features.PriceChange15Min,
Threshold: e.alertThresholds.PriceChange15Min,
Message: fmt.Sprintf("%s 15分钟%s %.2f%%", features.Symbol, direction, features.PriceChange15Min*100),
Timestamp: time.Now(),
})
}
// 交易量趋势
if features.VolumeTrend > e.alertThresholds.VolumeTrend {
alerts = append(alerts, Alert{
Type: "VOLUME_TREND",
Symbol: features.Symbol,
Value: features.VolumeTrend,
Threshold: e.alertThresholds.VolumeTrend,
Message: fmt.Sprintf("%s 交易量趋势增强 %.2f 倍", features.Symbol, features.VolumeTrend),
Timestamp: time.Now(),
})
}
// RSI超买超卖
if features.RSI14 > e.alertThresholds.RSIOverbought {
alerts = append(alerts, Alert{
Type: "RSI_OVERBOUGHT",
Symbol: features.Symbol,
Value: features.RSI14,
Threshold: e.alertThresholds.RSIOverbought,
Message: fmt.Sprintf("%s RSI超买: %.2f", features.Symbol, features.RSI14),
Timestamp: time.Now(),
})
} else if features.RSI14 < e.alertThresholds.RSIOversold {
alerts = append(alerts, Alert{
Type: "RSI_OVERSOLD",
Symbol: features.Symbol,
Value: features.RSI14,
Threshold: e.alertThresholds.RSIOversold,
Message: fmt.Sprintf("%s RSI超卖: %.2f", features.Symbol, features.RSI14),
Timestamp: time.Now(),
})
}
return alerts
}

View File

@@ -4,8 +4,6 @@ import (
"encoding/json"
"fmt"
"log"
"math"
"sort"
"strings"
"sync"
"time"
@@ -14,7 +12,6 @@ import (
type WSMonitor struct {
wsClient *WSClient
combinedClient *CombinedStreamsClient
featureEngine *FeatureEngine
symbols []string
featuresMap sync.Map
alertsChan chan Alert
@@ -41,7 +38,6 @@ func NewWSMonitor(batchSize int) *WSMonitor {
WSMonitorCli = &WSMonitor{
wsClient: NewWSClient(),
combinedClient: NewCombinedStreamsClient(batchSize),
featureEngine: NewFeatureEngine(config.AlertThresholds),
alertsChan: make(chan Alert, 1000),
batchSize: batchSize,
}
@@ -63,6 +59,7 @@ func (m *WSMonitor) Initialize(coins []string) error {
for _, symbol := range exchangeInfo.Symbols {
if symbol.Status == "TRADING" && symbol.ContractType == "PERPETUAL" && strings.ToUpper(symbol.Symbol[len(symbol.Symbol)-4:]) == "USDT" {
m.symbols = append(m.symbols, symbol.Symbol)
m.filterSymbols.Store(symbol.Symbol, true)
}
}
} else {
@@ -133,12 +130,6 @@ func (m *WSMonitor) Start(coins []string) {
log.Fatalf("❌ 批量订阅流: %v", err)
return
}
// 启动警报处理器
//go m.handleAlerts()
// 启动定期清理任务
//go m.cleanupInactiveSymbols()
// 输出监控统计 - 评分前十名
//go m.printFilterStats(20)
// 订阅所有交易对
err = m.subscribeAll()
if err != nil {
@@ -239,60 +230,6 @@ func (m *WSMonitor) processKlineUpdate(symbol string, wsData KlineWSData, _time
}
klineDataMap.Store(symbol, klines)
// 计算特征并检测警报
if len(klines) >= 20 {
features := m.featureEngine.CalculateFeatures(symbol, klines)
if features != nil {
m.featuresMap.Store(symbol, features)
alerts := m.featureEngine.DetectAlerts(features)
hasAlert := len(alerts) > 0
// 更新统计信息
m.updateSymbolStats(symbol, features, hasAlert)
for _, alert := range alerts {
m.alertsChan <- alert
}
// 实时日志输出重要特征
if len(alerts) > 0 || features.VolumeRatio5 > 2.0 || math.Abs(features.PriceChange15Min) > 0.02 {
//log.Printf("📊 %s - 价格: %.4f, 15分钟变动: %.2f%%, 交易量倍数: %.2f, RSI: %.1f",
// symbol, features.Price, features.PriceChange15Min*100,
// features.VolumeRatio5, features.RSI14)
}
}
}
}
func (m *WSMonitor) processTickerUpdate(symbol string, tickerData TickerWSData) {
// 存储ticker数据
m.tickerDataMap.Store(symbol, tickerData)
}
func (m *WSMonitor) handleAlerts() {
alertCounts := make(map[string]int)
lastReset := time.Now()
for alert := range m.alertsChan {
// 重置计数器(每小时)
if time.Since(lastReset) > time.Hour {
alertCounts = make(map[string]int)
lastReset = time.Now()
}
// 警报去重和频率控制
alertKey := fmt.Sprintf("%s_%s", alert.Symbol, alert.Type)
alertCounts[alertKey]++
m.filterSymbols.Store(alert.Symbol, true)
//log.Printf("✅ 自动添加监控: %s (因警报: %s)", alert.Symbol, alert.Message)
if alertCounts[alertKey] <= 3 { // 每小时最多3次相同警报
//log.Printf("🚨 实时警报: %s", alert.Message)
// 这里可以添加其他警报处理逻辑
}
}
}
func (m *WSMonitor) GetCurrentKlines(symbol string, _time string) ([]Kline, error) {
@@ -317,204 +254,7 @@ func (m *WSMonitor) GetCurrentKlines(symbol string, _time string) ([]Kline, erro
return value.([]Kline), nil
}
func (m *WSMonitor) GetCurrentFeatures(symbol string) (*SymbolFeatures, bool) {
value, exists := m.featuresMap.Load(symbol)
if !exists {
return nil, false
}
return value.(*SymbolFeatures), true
}
func (m *WSMonitor) GetAllFeatures() map[string]*SymbolFeatures {
features := make(map[string]*SymbolFeatures)
m.featuresMap.Range(func(key, value interface{}) bool {
features[key.(string)] = value.(*SymbolFeatures)
return true
})
return features
}
func (m *WSMonitor) Close() {
m.wsClient.Close()
close(m.alertsChan)
}
func (m *WSMonitor) printFilterStats(nember int) {
ticker := time.NewTicker(2 * time.Minute)
defer ticker.Stop()
for range ticker.C {
var monitoredSymbols []string
m.filterSymbols.Range(func(key, value interface{}) bool {
monitoredSymbols = append(monitoredSymbols, key.(string))
return true
})
log.Printf("🎯 监控统计 - 总数: %d, 币种: %v",
len(monitoredSymbols), monitoredSymbols)
// 打印前5个评分最高的币种
type symbolScore struct {
symbol string
score float64
}
var topScores []symbolScore
m.symbolStats.Range(func(key, value interface{}) bool {
symbol := key.(string)
stats := value.(*SymbolStats)
topScores = append(topScores, symbolScore{symbol, stats.Score})
return true
})
// 按评分排序
sort.Slice(topScores, func(i, j int) bool {
return topScores[i].score > topScores[j].score
})
m.FilterSymbol = nil
if len(topScores) > 0 {
log.Printf("🏆 评分TOP%v:", nember)
for i := 0; i < len(topScores) && i < nember; i++ {
m.FilterSymbol = append(m.FilterSymbol, topScores[i].symbol)
log.Printf(" %d. %s: %.1f分", i+1, topScores[i].symbol, topScores[i].score)
}
}
}
}
// evaluateSymbolScore 评估币种得分,决定是否保留
func (m *WSMonitor) evaluateSymbolScore(symbol string, features *SymbolFeatures) float64 {
score := 0.0
// 交易量活跃度评分 (权重: 40%)
if features.VolumeRatio5 > 1.5 {
score += 40 * math.Min(features.VolumeRatio5/5.0, 1.0)
}
// 价格波动评分 (权重: 30%)
volatilityScore := math.Abs(features.PriceChange15Min) * 1000 // 放大系数
score += 30 * math.Min(volatilityScore/10.0, 1.0) // 最大10%波动得满分
// RSI活跃度评分 (权重: 20%)
if features.RSI14 < 30 || features.RSI14 > 70 {
score += 20 // RSI在极端区域
} else if features.RSI14 < 40 || features.RSI14 > 60 {
score += 10 // RSI在活跃区域
}
// 交易量趋势评分 (权重: 10%)
if features.VolumeTrend > 1.2 {
score += 10 * math.Min(features.VolumeTrend/3.0, 1.0)
}
return score
}
// shouldRemoveFromFilter 判断是否应该从FilterSymbols中移除
func (m *WSMonitor) shouldRemoveFromFilter(symbol string) bool {
value, exists := m.symbolStats.Load(symbol)
if !exists {
return true // 没有统计信息,移除
}
stats := value.(*SymbolStats)
// 规则1: 超过30分钟没有活跃迹象
if time.Since(stats.LastActiveTime) > 30*time.Minute {
log.Printf("🔻 %s 因长时间不活跃被移除", symbol)
return true
}
// 规则2: 评分持续低于阈值 (最近5次评分平均)
if stats.Score < 15 { // 调整这个阈值
log.Printf("🔻 %s 因评分过低(%.1f)被移除", symbol, stats.Score)
return true
}
// 规则3: 超过2小时没有产生警报
if time.Since(stats.LastAlertTime) > 2*time.Hour && stats.AlertCount > 0 {
log.Printf("🔻 %s 因长时间无新警报被移除", symbol)
return true
}
return false
}
// updateSymbolStats 更新币种统计信息
func (m *WSMonitor) updateSymbolStats(symbol string, features *SymbolFeatures, hasAlert bool) {
now := time.Now()
value, exists := m.symbolStats.Load(symbol)
var stats *SymbolStats
if !exists {
stats = &SymbolStats{
LastActiveTime: now,
Score: m.evaluateSymbolScore(symbol, features),
}
} else {
stats = value.(*SymbolStats)
stats.LastActiveTime = now
// 平滑更新评分 (指数移动平均)
newScore := m.evaluateSymbolScore(symbol, features)
stats.Score = 0.7*stats.Score + 0.3*newScore
}
if hasAlert {
stats.AlertCount++
stats.LastAlertTime = now
}
if features.VolumeRatio5 > 2.0 {
stats.VolumeSpikeCount++
}
m.symbolStats.Store(symbol, stats)
}
// removeFromFilter 从FilterSymbols中移除币种
func (m *WSMonitor) removeFromFilter(symbol string) {
// 从filterSymbols中移除
m.filterSymbols.Delete(symbol)
m.symbolStats.Delete(symbol)
log.Printf("🗑️ 已移除币种监控: %s", symbol)
}
// cleanupInactiveSymbols 定期清理不活跃的币种
func (m *WSMonitor) cleanupInactiveSymbols() {
ticker := time.NewTicker(5 * time.Minute) // 每5分钟检查一次
defer ticker.Stop()
for range ticker.C {
var symbolsToRemove []string
// 收集需要移除的币种
m.filterSymbols.Range(func(key, value interface{}) bool {
symbol := key.(string)
if m.shouldRemoveFromFilter(symbol) {
symbolsToRemove = append(symbolsToRemove, symbol)
}
return true
})
// 执行移除操作
for _, symbol := range symbolsToRemove {
m.removeFromFilter(symbol)
}
if len(symbolsToRemove) > 0 {
log.Printf("🧹 清理完成,移除了 %d 个不活跃币种", len(symbolsToRemove))
}
}
}
// getSymbolScore 获取币种当前评分
func (m *WSMonitor) getSymbolScore(symbol string) float64 {
value, exists := m.symbolStats.Load(symbol)
if !exists {
return 0
}
return value.(*SymbolStats).Score
}