feat: migrate to CoinAnk API and improve chart UI

- Chart improvements: professional styling, popular symbols quick selection, simplified B/S legend
- Data source migration: use CoinAnk API exclusively for all kline data
- Code cleanup: remove Binance WebSocket cache and related code (websocket_client.go, combined_streams.go, monitor.go)
- Log optimization: reduce hook spam, suppress 404 errors, increase P&L diff threshold
- Lighter integration: add order sync functionality, fix market order precision
- Remove ticker merge logic for simplicity
This commit is contained in:
tinkle-community
2025-12-26 00:58:12 +08:00
parent 54b24167a7
commit 1744e7f38e
38 changed files with 6498 additions and 964 deletions

View File

@@ -1,202 +0,0 @@
package market
import (
"encoding/json"
"fmt"
"log"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
)
type CombinedStreamsClient struct {
conn *websocket.Conn
mu sync.RWMutex
subscribers map[string]chan []byte
reconnect bool
done chan struct{}
batchSize int // Number of streams per batch subscription
}
func NewCombinedStreamsClient(batchSize int) *CombinedStreamsClient {
return &CombinedStreamsClient{
subscribers: make(map[string]chan []byte),
reconnect: true,
done: make(chan struct{}),
batchSize: batchSize,
}
}
func (c *CombinedStreamsClient) Connect() error {
dialer := websocket.Dialer{
HandshakeTimeout: 10 * time.Second,
}
// Combined streams use a different endpoint
conn, _, err := dialer.Dial("wss://fstream.binance.com/stream", nil)
if err != nil {
return fmt.Errorf("Combined stream WebSocket connection failed: %v", err)
}
c.mu.Lock()
c.conn = conn
c.mu.Unlock()
log.Println("Combined stream WebSocket connected successfully")
go c.readMessages()
return nil
}
// BatchSubscribeKlines subscribes to K-lines in batches
func (c *CombinedStreamsClient) BatchSubscribeKlines(symbols []string, interval string) error {
// Split symbols into batches
batches := c.splitIntoBatches(symbols, c.batchSize)
for i, batch := range batches {
log.Printf("Subscribing batch %d, count: %d", i+1, len(batch))
streams := make([]string, len(batch))
for j, symbol := range batch {
streams[j] = fmt.Sprintf("%s@kline_%s", strings.ToLower(symbol), interval)
}
if err := c.subscribeStreams(streams); err != nil {
return fmt.Errorf("Batch %d subscription failed: %v", i+1, err)
}
// Delay between batches to avoid rate limiting
if i < len(batches)-1 {
time.Sleep(100 * time.Millisecond)
}
}
return nil
}
// splitIntoBatches splits a slice into batches of specified size
func (c *CombinedStreamsClient) splitIntoBatches(symbols []string, batchSize int) [][]string {
var batches [][]string
for i := 0; i < len(symbols); i += batchSize {
end := i + batchSize
if end > len(symbols) {
end = len(symbols)
}
batches = append(batches, symbols[i:end])
}
return batches
}
// subscribeStreams subscribes to multiple streams
func (c *CombinedStreamsClient) subscribeStreams(streams []string) error {
subscribeMsg := map[string]interface{}{
"method": "SUBSCRIBE",
"params": streams,
"id": time.Now().UnixNano(),
}
c.mu.RLock()
defer c.mu.RUnlock()
if c.conn == nil {
return fmt.Errorf("WebSocket not connected")
}
log.Printf("Subscribing to streams: %v", streams)
return c.conn.WriteJSON(subscribeMsg)
}
func (c *CombinedStreamsClient) readMessages() {
for {
select {
case <-c.done:
return
default:
c.mu.RLock()
conn := c.conn
c.mu.RUnlock()
if conn == nil {
time.Sleep(1 * time.Second)
continue
}
_, message, err := conn.ReadMessage()
if err != nil {
log.Printf("Failed to read combined stream message: %v", err)
c.handleReconnect()
return
}
c.handleCombinedMessage(message)
}
}
}
func (c *CombinedStreamsClient) handleCombinedMessage(message []byte) {
var combinedMsg struct {
Stream string `json:"stream"`
Data json.RawMessage `json:"data"`
}
if err := json.Unmarshal(message, &combinedMsg); err != nil {
log.Printf("Failed to parse combined message: %v", err)
return
}
c.mu.RLock()
ch, exists := c.subscribers[combinedMsg.Stream]
c.mu.RUnlock()
if exists {
select {
case ch <- combinedMsg.Data:
default:
log.Printf("Subscriber channel is full: %s", combinedMsg.Stream)
}
}
}
func (c *CombinedStreamsClient) AddSubscriber(stream string, bufferSize int) <-chan []byte {
ch := make(chan []byte, bufferSize)
c.mu.Lock()
c.subscribers[stream] = ch
c.mu.Unlock()
return ch
}
func (c *CombinedStreamsClient) handleReconnect() {
if !c.reconnect {
return
}
log.Println("Combined stream attempting to reconnect...")
time.Sleep(3 * time.Second)
if err := c.Connect(); err != nil {
log.Printf("Combined stream reconnection failed: %v", err)
go c.handleReconnect()
}
}
func (c *CombinedStreamsClient) Close() {
c.reconnect = false
close(c.done)
c.mu.Lock()
defer c.mu.Unlock()
if c.conn != nil {
c.conn.Close()
c.conn = nil
}
for stream, ch := range c.subscribers {
close(ch)
delete(c.subscribers, stream)
}
}

View File

@@ -1,10 +1,13 @@
package market
import (
"context"
"encoding/json"
"fmt"
"io"
"nofx/logger"
"nofx/provider/coinank"
"nofx/provider/coinank/coinank_enum"
"math"
"strconv"
"strings"
@@ -22,18 +25,86 @@ type FundingRateCache struct {
var (
fundingRateMap sync.Map // map[string]*FundingRateCache
frCacheTTL = 1 * time.Hour
coinankClient *coinank.CoinankClient // Global CoinAnk client for kline data
)
// Initialize CoinAnk client
func init() {
coinankClient = coinank.NewCoinankClient(coinank_enum.MainUrl, "0cccbd7992754b67b1848c6746c0fce0")
}
// getKlinesFromCoinAnk fetches kline data from CoinAnk API (replacement for WSMonitorCli)
func getKlinesFromCoinAnk(symbol, interval string, limit int) ([]Kline, error) {
// Map interval string to coinank enum
var coinankInterval coinank_enum.Interval
switch interval {
case "1m":
coinankInterval = coinank_enum.Minute1
case "3m":
coinankInterval = coinank_enum.Minute3
case "5m":
coinankInterval = coinank_enum.Minute5
case "15m":
coinankInterval = coinank_enum.Minute15
case "30m":
coinankInterval = coinank_enum.Minute30
case "1h":
coinankInterval = coinank_enum.Hour1
case "2h":
coinankInterval = coinank_enum.Hour2
case "4h":
coinankInterval = coinank_enum.Hour4
case "6h":
coinankInterval = coinank_enum.Hour6
case "8h":
coinankInterval = coinank_enum.Hour8
case "12h":
coinankInterval = coinank_enum.Hour12
case "1d":
coinankInterval = coinank_enum.Day1
case "3d":
coinankInterval = coinank_enum.Day3
case "1w":
coinankInterval = coinank_enum.Week1
default:
return nil, fmt.Errorf("unsupported interval: %s", interval)
}
// Call CoinAnk API (default to Binance exchange for compatibility)
ctx := context.Background()
endTime := time.Now().UnixMilli()
coinankKlines, err := coinankClient.Kline(ctx, symbol, coinank_enum.Binance, 0, endTime, limit, coinankInterval)
if err != nil {
return nil, fmt.Errorf("CoinAnk API error: %w", err)
}
// Convert coinank kline format to market.Kline format
klines := make([]Kline, len(coinankKlines))
for i, ck := range coinankKlines {
klines[i] = Kline{
OpenTime: ck.StartTime,
Open: ck.Open,
High: ck.High,
Low: ck.Low,
Close: ck.Close,
Volume: ck.Volume,
CloseTime: ck.EndTime,
}
}
return klines, nil
}
// Get retrieves market data for the specified token
func Get(symbol string) (*Data, error) {
var klines3m, klines4h []Kline
var err error
// Normalize symbol
symbol = Normalize(symbol)
// Get 3-minute K-line data (latest 10)
klines3m, err = WSMonitorCli.GetCurrentKlines(symbol, "3m") // Get more for calculation
// Get 3-minute K-line data from CoinAnk (get 100 for calculation)
klines3m, err = getKlinesFromCoinAnk(symbol, "3m", 100)
if err != nil {
return nil, fmt.Errorf("Failed to get 3-minute K-line: %v", err)
return nil, fmt.Errorf("Failed to get 3-minute K-line from CoinAnk: %v", err)
}
// Data staleness detection: Prevent DOGEUSDT-style price freeze issues
@@ -42,10 +113,10 @@ func Get(symbol string) (*Data, error) {
return nil, fmt.Errorf("%s data is stale, possible cache failure", symbol)
}
// Get 4-hour K-line data (latest 10)
klines4h, err = WSMonitorCli.GetCurrentKlines(symbol, "4h") // Get more for indicator calculation
// Get 4-hour K-line data from CoinAnk (get 100 for indicator calculation)
klines4h, err = getKlinesFromCoinAnk(symbol, "4h", 100)
if err != nil {
return nil, fmt.Errorf("Failed to get 4-hour K-line: %v", err)
return nil, fmt.Errorf("Failed to get 4-hour K-line from CoinAnk: %v", err)
}
// Check if data is empty
@@ -144,11 +215,11 @@ func GetWithTimeframes(symbol string, timeframes []string, primaryTimeframe stri
timeframeData := make(map[string]*TimeframeSeriesData)
var primaryKlines []Kline
// Get K-line data for each timeframe
// Get K-line data for each timeframe from CoinAnk
for _, tf := range timeframes {
klines, err := WSMonitorCli.GetCurrentKlines(symbol, tf)
klines, err := getKlinesFromCoinAnk(symbol, tf, 200) // Get enough data for indicators
if err != nil {
logger.Infof("⚠️ Failed to get %s %s K-line: %v", symbol, tf, err)
logger.Infof("⚠️ Failed to get %s %s K-line from CoinAnk: %v", symbol, tf, err)
continue
}

View File

@@ -1,273 +0,0 @@
package market
import (
"encoding/json"
"fmt"
"log"
"strings"
"sync"
"time"
)
type WSMonitor struct {
wsClient *WSClient
combinedClient *CombinedStreamsClient
symbols []string
featuresMap sync.Map
alertsChan chan Alert
klineDataMap3m sync.Map // Store K-line historical data for each trading pair
klineDataMap4h sync.Map // Store K-line historical data for each trading pair
tickerDataMap sync.Map // Store ticker data for each trading pair
batchSize int
filterSymbols sync.Map // Use sync.Map to store monitored coins and their status
symbolStats sync.Map // Store symbol statistics
FilterSymbol []string // Filtered symbols
}
type SymbolStats struct {
LastActiveTime time.Time
AlertCount int
VolumeSpikeCount int
LastAlertTime time.Time
Score float64 // Composite score
}
var WSMonitorCli *WSMonitor
var subKlineTime = []string{"3m", "4h"} // Manage K-line periods for subscription streams
func NewWSMonitor(batchSize int) *WSMonitor {
WSMonitorCli = &WSMonitor{
wsClient: NewWSClient(),
combinedClient: NewCombinedStreamsClient(batchSize),
alertsChan: make(chan Alert, 1000),
batchSize: batchSize,
}
return WSMonitorCli
}
func (m *WSMonitor) Initialize(coins []string) error {
log.Println("Initializing WebSocket monitor...")
// Get trading pair information
apiClient := NewAPIClient()
// If trading pairs are not specified, use all trading pairs from the market
if len(coins) == 0 {
exchangeInfo, err := apiClient.GetExchangeInfo()
if err != nil {
return err
}
// Filter perpetual contract trading pairs -- only use for testing
//exchangeInfo.Symbols = exchangeInfo.Symbols[0:2]
for _, symbol := range exchangeInfo.Symbols {
if symbol.Status == "TRADING" && symbol.ContractType == "PERPETUAL" && strings.ToUpper(symbol.Symbol[len(symbol.Symbol)-4:]) == "USDT" {
m.symbols = append(m.symbols, symbol.Symbol)
m.filterSymbols.Store(symbol.Symbol, true)
}
}
} else {
m.symbols = coins
}
log.Printf("Found %d trading pairs", len(m.symbols))
// Initialize historical data
if err := m.initializeHistoricalData(); err != nil {
log.Printf("Failed to initialize historical data: %v", err)
}
return nil
}
func (m *WSMonitor) initializeHistoricalData() error {
apiClient := NewAPIClient()
var wg sync.WaitGroup
semaphore := make(chan struct{}, 5) // Limit concurrency
for _, symbol := range m.symbols {
wg.Add(1)
semaphore <- struct{}{}
go func(s string) {
defer wg.Done()
defer func() { <-semaphore }()
// Get historical K-line data
klines, err := apiClient.GetKlines(s, "3m", 100)
if err != nil {
log.Printf("Failed to get %s historical data: %v", s, err)
return
}
if len(klines) > 0 {
m.klineDataMap3m.Store(s, klines)
log.Printf("Loaded %s historical K-line data-3m: %d entries", s, len(klines))
}
// Get historical K-line data
klines4h, err := apiClient.GetKlines(s, "4h", 100)
if err != nil {
log.Printf("Failed to get %s historical data: %v", s, err)
return
}
if len(klines4h) > 0 {
m.klineDataMap4h.Store(s, klines4h)
log.Printf("Loaded %s historical K-line data-4h: %d entries", s, len(klines4h))
}
}(symbol)
}
wg.Wait()
return nil
}
func (m *WSMonitor) Start(coins []string) {
log.Printf("Starting WebSocket real-time monitoring...")
// Initialize trading pairs
err := m.Initialize(coins)
if err != nil {
log.Printf("❌ Failed to initialize coins: %v", err)
return
}
err = m.combinedClient.Connect()
if err != nil {
log.Printf("❌ Failed to batch subscribe to streams: %v", err)
return
}
// Subscribe to all trading pairs
err = m.subscribeAll()
if err != nil {
log.Printf("❌ Failed to subscribe to coin trading pairs: %v", err)
return
}
}
// subscribeSymbol registers listener
func (m *WSMonitor) subscribeSymbol(symbol, st string) []string {
var streams []string
stream := fmt.Sprintf("%s@kline_%s", strings.ToLower(symbol), st)
ch := m.combinedClient.AddSubscriber(stream, 100)
streams = append(streams, stream)
go m.handleKlineData(symbol, ch, st)
return streams
}
func (m *WSMonitor) subscribeAll() error {
// Execute batch subscription
log.Println("Starting to subscribe to all trading pairs...")
for _, symbol := range m.symbols {
for _, st := range subKlineTime {
m.subscribeSymbol(symbol, st)
}
}
for _, st := range subKlineTime {
err := m.combinedClient.BatchSubscribeKlines(m.symbols, st)
if err != nil {
log.Printf("❌ Failed to subscribe to %s K-line: %v", st, err)
return err
}
}
log.Println("All trading pair subscriptions completed")
return nil
}
func (m *WSMonitor) handleKlineData(symbol string, ch <-chan []byte, _time string) {
for data := range ch {
var klineData KlineWSData
if err := json.Unmarshal(data, &klineData); err != nil {
log.Printf("Failed to parse Kline data: %v", err)
continue
}
m.processKlineUpdate(symbol, klineData, _time)
}
}
func (m *WSMonitor) getKlineDataMap(_time string) *sync.Map {
var klineDataMap *sync.Map
if _time == "3m" {
klineDataMap = &m.klineDataMap3m
} else if _time == "4h" {
klineDataMap = &m.klineDataMap4h
} else {
klineDataMap = &sync.Map{}
}
return klineDataMap
}
func (m *WSMonitor) processKlineUpdate(symbol string, wsData KlineWSData, _time string) {
// Convert WebSocket data to Kline structure
kline := Kline{
OpenTime: wsData.Kline.StartTime,
CloseTime: wsData.Kline.CloseTime,
Trades: wsData.Kline.NumberOfTrades,
}
kline.Open, _ = parseFloat(wsData.Kline.OpenPrice)
kline.High, _ = parseFloat(wsData.Kline.HighPrice)
kline.Low, _ = parseFloat(wsData.Kline.LowPrice)
kline.Close, _ = parseFloat(wsData.Kline.ClosePrice)
kline.Volume, _ = parseFloat(wsData.Kline.Volume)
kline.High, _ = parseFloat(wsData.Kline.HighPrice)
kline.QuoteVolume, _ = parseFloat(wsData.Kline.QuoteVolume)
kline.TakerBuyBaseVolume, _ = parseFloat(wsData.Kline.TakerBuyBaseVolume)
kline.TakerBuyQuoteVolume, _ = parseFloat(wsData.Kline.TakerBuyQuoteVolume)
// Update K-line data
var klineDataMap = m.getKlineDataMap(_time)
value, exists := klineDataMap.Load(symbol)
var klines []Kline
if exists {
klines = value.([]Kline)
// Check if it's a new K-line
if len(klines) > 0 && klines[len(klines)-1].OpenTime == kline.OpenTime {
// Update current K-line
klines[len(klines)-1] = kline
} else {
// Add new K-line
klines = append(klines, kline)
// Maintain data length
if len(klines) > 100 {
klines = klines[1:]
}
}
} else {
klines = []Kline{kline}
}
klineDataMap.Store(symbol, klines)
}
func (m *WSMonitor) GetCurrentKlines(symbol string, duration string) ([]Kline, error) {
// Check if each incoming symbol exists internally, if not subscribe to it
value, exists := m.getKlineDataMap(duration).Load(symbol)
if !exists {
// If WS data is not initialized, use API separately - compatibility code (prevents trader from running when not initialized)
apiClient := NewAPIClient()
klines, err := apiClient.GetKlines(symbol, duration, 100)
if err != nil {
return nil, fmt.Errorf("Failed to get %v-minute K-line: %v", duration, err)
}
// Dynamically cache into cache
m.getKlineDataMap(duration).Store(strings.ToUpper(symbol), klines)
// Subscribe to WebSocket stream
subStr := m.subscribeSymbol(symbol, duration)
subErr := m.combinedClient.subscribeStreams(subStr)
log.Printf("Dynamic subscription to stream: %v", subStr)
if subErr != nil {
log.Printf("Warning: Failed to dynamically subscribe to %v-minute K-line: %v (using API data)", duration, subErr)
}
// ✅ FIX: Return deep copy instead of reference
result := make([]Kline, len(klines))
copy(result, klines)
return result, nil
}
// ✅ FIX: Return deep copy instead of reference, avoid concurrent race conditions
klines := value.([]Kline)
result := make([]Kline, len(klines))
copy(result, klines)
return result, nil
}
func (m *WSMonitor) Close() {
m.wsClient.Close()
close(m.alertsChan)
}

View File

@@ -1,231 +0,0 @@
package market
import (
"encoding/json"
"fmt"
"log"
"sync"
"time"
"github.com/gorilla/websocket"
)
type WSClient struct {
conn *websocket.Conn
mu sync.RWMutex
subscribers map[string]chan []byte
reconnect bool
done chan struct{}
}
type WSMessage struct {
Stream string `json:"stream"`
Data json.RawMessage `json:"data"`
}
type KlineWSData struct {
EventType string `json:"e"`
EventTime int64 `json:"E"`
Symbol string `json:"s"`
Kline struct {
StartTime int64 `json:"t"`
CloseTime int64 `json:"T"`
Symbol string `json:"s"`
Interval string `json:"i"`
FirstTradeID int64 `json:"f"`
LastTradeID int64 `json:"L"`
OpenPrice string `json:"o"`
ClosePrice string `json:"c"`
HighPrice string `json:"h"`
LowPrice string `json:"l"`
Volume string `json:"v"`
NumberOfTrades int `json:"n"`
IsFinal bool `json:"x"`
QuoteVolume string `json:"q"`
TakerBuyBaseVolume string `json:"V"`
TakerBuyQuoteVolume string `json:"Q"`
} `json:"k"`
}
type TickerWSData struct {
EventType string `json:"e"`
EventTime int64 `json:"E"`
Symbol string `json:"s"`
PriceChange string `json:"p"`
PriceChangePercent string `json:"P"`
WeightedAvgPrice string `json:"w"`
LastPrice string `json:"c"`
LastQty string `json:"Q"`
OpenPrice string `json:"o"`
HighPrice string `json:"h"`
LowPrice string `json:"l"`
Volume string `json:"v"`
QuoteVolume string `json:"q"`
OpenTime int64 `json:"O"`
CloseTime int64 `json:"C"`
FirstID int64 `json:"F"`
LastID int64 `json:"L"`
Count int `json:"n"`
}
func NewWSClient() *WSClient {
return &WSClient{
subscribers: make(map[string]chan []byte),
reconnect: true,
done: make(chan struct{}),
}
}
func (w *WSClient) Connect() error {
dialer := websocket.Dialer{
HandshakeTimeout: 10 * time.Second,
}
conn, _, err := dialer.Dial("wss://ws-fapi.binance.com/ws-fapi/v1", nil)
if err != nil {
return fmt.Errorf("WebSocket connection failed: %v", err)
}
w.mu.Lock()
w.conn = conn
w.mu.Unlock()
log.Println("WebSocket connected successfully")
// Start message reading loop
go w.readMessages()
return nil
}
func (w *WSClient) SubscribeKline(symbol, interval string) error {
stream := fmt.Sprintf("%s@kline_%s", symbol, interval)
return w.subscribe(stream)
}
func (w *WSClient) SubscribeTicker(symbol string) error {
stream := fmt.Sprintf("%s@ticker", symbol)
return w.subscribe(stream)
}
func (w *WSClient) SubscribeMiniTicker(symbol string) error {
stream := fmt.Sprintf("%s@miniTicker", symbol)
return w.subscribe(stream)
}
func (w *WSClient) subscribe(stream string) error {
subscribeMsg := map[string]interface{}{
"method": "SUBSCRIBE",
"params": []string{stream},
"id": time.Now().Unix(),
}
w.mu.RLock()
defer w.mu.RUnlock()
if w.conn == nil {
return fmt.Errorf("WebSocket not connected")
}
err := w.conn.WriteJSON(subscribeMsg)
if err != nil {
return err
}
log.Printf("Subscribing to stream: %s", stream)
return nil
}
func (w *WSClient) readMessages() {
for {
select {
case <-w.done:
return
default:
w.mu.RLock()
conn := w.conn
w.mu.RUnlock()
if conn == nil {
time.Sleep(1 * time.Second)
continue
}
_, message, err := conn.ReadMessage()
if err != nil {
log.Printf("Failed to read WebSocket message: %v", err)
w.handleReconnect()
return
}
w.handleMessage(message)
}
}
}
func (w *WSClient) handleMessage(message []byte) {
var wsMsg WSMessage
if err := json.Unmarshal(message, &wsMsg); err != nil {
// Might be a different message format
return
}
w.mu.RLock()
ch, exists := w.subscribers[wsMsg.Stream]
w.mu.RUnlock()
if exists {
select {
case ch <- wsMsg.Data:
default:
log.Printf("Subscriber channel is full: %s", wsMsg.Stream)
}
}
}
func (w *WSClient) handleReconnect() {
if !w.reconnect {
return
}
log.Println("Attempting to reconnect...")
time.Sleep(3 * time.Second)
if err := w.Connect(); err != nil {
log.Printf("Reconnection failed: %v", err)
go w.handleReconnect()
}
}
func (w *WSClient) AddSubscriber(stream string, bufferSize int) <-chan []byte {
ch := make(chan []byte, bufferSize)
w.mu.Lock()
w.subscribers[stream] = ch
w.mu.Unlock()
return ch
}
func (w *WSClient) RemoveSubscriber(stream string) {
w.mu.Lock()
delete(w.subscribers, stream)
w.mu.Unlock()
}
func (w *WSClient) Close() {
w.reconnect = false
close(w.done)
w.mu.Lock()
defer w.mu.Unlock()
if w.conn != nil {
w.conn.Close()
w.conn = nil
}
// Close all subscriber channels
for stream, ch := range w.subscribers {
close(ch)
delete(w.subscribers, stream)
}
}