diff --git a/go.mod b/go.mod index fdac655a..5ead4589 100644 --- a/go.mod +++ b/go.mod @@ -58,6 +58,7 @@ require ( github.com/mailru/easyjson v0.9.1 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/mattn/go-sqlite3 v1.14.32 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/ncruces/go-strftime v0.1.9 // indirect diff --git a/go.sum b/go.sum index e9842c4a..15ee1f95 100644 --- a/go.sum +++ b/go.sum @@ -138,6 +138,8 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mattn/go-sqlite3 v1.14.32 h1:JD12Ag3oLy1zQA+BNn74xRgaBbdhbNIDYvQUEuuErjs= +github.com/mattn/go-sqlite3 v1.14.32/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g= github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM= github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxdASFVQag= diff --git a/main.go b/main.go index 913c1296..56879d33 100644 --- a/main.go +++ b/main.go @@ -11,7 +11,6 @@ import ( "nofx/manager" "nofx/mcp" "nofx/store" - "nofx/trader" "os" "os/signal" "path/filepath" @@ -115,11 +114,6 @@ func main() { logger.Warnf("⚠️ Failed to restore backtest history: %v", err) } - // Start position sync manager (detects manual closures, TP/SL triggers) - positionSyncManager := trader.NewPositionSyncManager(st, 0) // 0 = use default 10s interval - positionSyncManager.Start() - defer positionSyncManager.Stop() - // Load all traders from database to memory (may auto-start traders with IsRunning=true) if err := traderManager.LoadTradersFromStore(st); err != nil { logger.Fatalf("❌ Failed to load traders: %v", err) diff --git a/store/order.go b/store/order.go index b82b65fb..b2b067e4 100644 --- a/store/order.go +++ b/store/order.go @@ -10,12 +10,13 @@ import ( type TraderOrder struct { ID int64 `json:"id"` TraderID string `json:"trader_id"` - ExchangeID string `json:"exchange_id"` // 交易所账户UUID - ExchangeOrderID string `json:"exchange_order_id"` // 交易所订单ID - ClientOrderID string `json:"client_order_id"` // 客户端订单ID - Symbol string `json:"symbol"` // 交易对 + ExchangeID string `json:"exchange_id"` // Exchange account UUID + ExchangeType string `json:"exchange_type"` // Exchange type (hyperliquid/lighter/binance/etc) + ExchangeOrderID string `json:"exchange_order_id"` // Exchange order ID + ClientOrderID string `json:"client_order_id"` // Client order ID + Symbol string `json:"symbol"` // Trading pair Side string `json:"side"` // BUY/SELL - PositionSide string `json:"position_side"` // LONG/SHORT (双向持仓模式) + PositionSide string `json:"position_side"` // LONG/SHORT (hedge mode) Type string `json:"type"` // MARKET/LIMIT/STOP/STOP_MARKET/TAKE_PROFIT/TAKE_PROFIT_MARKET TimeInForce string `json:"time_in_force"` // GTC/IOC/FOK Quantity float64 `json:"quantity"` // 订单数量 @@ -38,14 +39,15 @@ type TraderOrder struct { FilledAt time.Time `json:"filled_at"` // 完全成交时间 } -// TraderFill 成交记录(一个订单可能有多次成交) +// TraderFill trade record (one order may have multiple fills) type TraderFill struct { ID int64 `json:"id"` TraderID string `json:"trader_id"` - ExchangeID string `json:"exchange_id"` - OrderID int64 `json:"order_id"` // 关联的订单ID - ExchangeOrderID string `json:"exchange_order_id"` // 交易所订单ID - ExchangeTradeID string `json:"exchange_trade_id"` // 交易所成交ID + ExchangeID string `json:"exchange_id"` // Exchange account UUID + ExchangeType string `json:"exchange_type"` // Exchange type (hyperliquid/lighter/binance/etc) + OrderID int64 `json:"order_id"` // Related order ID + ExchangeOrderID string `json:"exchange_order_id"` // Exchange order ID + ExchangeTradeID string `json:"exchange_trade_id"` // Exchange trade ID Symbol string `json:"symbol"` Side string `json:"side"` // BUY/SELL Price float64 `json:"price"` // 成交价格 @@ -76,6 +78,7 @@ func (s *OrderStore) InitTables() error { id INTEGER PRIMARY KEY AUTOINCREMENT, trader_id TEXT NOT NULL, exchange_id TEXT NOT NULL DEFAULT '', + exchange_type TEXT NOT NULL DEFAULT '', exchange_order_id TEXT NOT NULL, client_order_id TEXT DEFAULT '', symbol TEXT NOT NULL, @@ -114,6 +117,7 @@ func (s *OrderStore) InitTables() error { id INTEGER PRIMARY KEY AUTOINCREMENT, trader_id TEXT NOT NULL, exchange_id TEXT NOT NULL DEFAULT '', + exchange_type TEXT NOT NULL DEFAULT '', order_id INTEGER NOT NULL, exchange_order_id TEXT NOT NULL, exchange_trade_id TEXT NOT NULL, @@ -168,16 +172,16 @@ func (s *OrderStore) CreateOrder(order *TraderOrder) error { result, err := s.db.Exec(` INSERT INTO trader_orders ( - trader_id, exchange_id, exchange_order_id, client_order_id, + trader_id, exchange_id, exchange_type, exchange_order_id, client_order_id, symbol, side, position_side, type, time_in_force, quantity, price, stop_price, status, filled_quantity, avg_fill_price, commission, commission_asset, leverage, reduce_only, close_position, working_type, price_protect, order_action, related_position_id, created_at, updated_at, filled_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `, - order.TraderID, order.ExchangeID, order.ExchangeOrderID, order.ClientOrderID, + order.TraderID, order.ExchangeID, order.ExchangeType, order.ExchangeOrderID, order.ClientOrderID, order.Symbol, order.Side, order.PositionSide, order.Type, order.TimeInForce, order.Quantity, order.Price, order.StopPrice, order.Status, order.FilledQuantity, order.AvgFillPrice, order.Commission, order.CommissionAsset, @@ -244,13 +248,13 @@ func (s *OrderStore) CreateFill(fill *TraderFill) error { result, err := s.db.Exec(` INSERT INTO trader_fills ( - trader_id, exchange_id, order_id, exchange_order_id, exchange_trade_id, + trader_id, exchange_id, exchange_type, order_id, exchange_order_id, exchange_trade_id, symbol, side, price, quantity, quote_quantity, commission, commission_asset, realized_pnl, is_maker, created_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `, - fill.TraderID, fill.ExchangeID, fill.OrderID, fill.ExchangeOrderID, fill.ExchangeTradeID, + fill.TraderID, fill.ExchangeID, fill.ExchangeType, fill.OrderID, fill.ExchangeOrderID, fill.ExchangeTradeID, fill.Symbol, fill.Side, fill.Price, fill.Quantity, fill.QuoteQuantity, fill.Commission, fill.CommissionAsset, fill.RealizedPnL, fill.IsMaker, now.Format(time.RFC3339), @@ -267,7 +271,7 @@ func (s *OrderStore) CreateFill(fill *TraderFill) error { // GetFillByExchangeTradeID 根据交易所成交ID获取成交记录 func (s *OrderStore) GetFillByExchangeTradeID(exchangeID, exchangeTradeID string) (*TraderFill, error) { row := s.db.QueryRow(` - SELECT id, trader_id, exchange_id, order_id, exchange_order_id, exchange_trade_id, + SELECT id, trader_id, exchange_id, exchange_type, order_id, exchange_order_id, exchange_trade_id, symbol, side, price, quantity, quote_quantity, commission, commission_asset, realized_pnl, is_maker, created_at @@ -278,7 +282,7 @@ func (s *OrderStore) GetFillByExchangeTradeID(exchangeID, exchangeTradeID string var fill TraderFill var createdAt sql.NullString err := row.Scan( - &fill.ID, &fill.TraderID, &fill.ExchangeID, &fill.OrderID, &fill.ExchangeOrderID, &fill.ExchangeTradeID, + &fill.ID, &fill.TraderID, &fill.ExchangeID, &fill.ExchangeType, &fill.OrderID, &fill.ExchangeOrderID, &fill.ExchangeTradeID, &fill.Symbol, &fill.Side, &fill.Price, &fill.Quantity, &fill.QuoteQuantity, &fill.Commission, &fill.CommissionAsset, &fill.RealizedPnL, &fill.IsMaker, &createdAt, @@ -303,7 +307,7 @@ func (s *OrderStore) GetFillByExchangeTradeID(exchangeID, exchangeTradeID string // GetOrderByExchangeID 根据交易所订单ID获取订单 func (s *OrderStore) GetOrderByExchangeID(exchangeID, exchangeOrderID string) (*TraderOrder, error) { row := s.db.QueryRow(` - SELECT id, trader_id, exchange_id, exchange_order_id, client_order_id, + SELECT id, trader_id, exchange_id, exchange_type, exchange_order_id, client_order_id, symbol, side, position_side, type, time_in_force, quantity, price, stop_price, status, filled_quantity, avg_fill_price, commission, commission_asset, @@ -317,7 +321,7 @@ func (s *OrderStore) GetOrderByExchangeID(exchangeID, exchangeOrderID string) (* var order TraderOrder var createdAt, updatedAt, filledAt sql.NullString err := row.Scan( - &order.ID, &order.TraderID, &order.ExchangeID, &order.ExchangeOrderID, &order.ClientOrderID, + &order.ID, &order.TraderID, &order.ExchangeID, &order.ExchangeType, &order.ExchangeOrderID, &order.ClientOrderID, &order.Symbol, &order.Side, &order.PositionSide, &order.Type, &order.TimeInForce, &order.Quantity, &order.Price, &order.StopPrice, &order.Status, &order.FilledQuantity, &order.AvgFillPrice, &order.Commission, &order.CommissionAsset, @@ -355,7 +359,7 @@ func (s *OrderStore) GetOrderByExchangeID(exchangeID, exchangeOrderID string) (* // GetTraderOrders 获取trader的订单列表 func (s *OrderStore) GetTraderOrders(traderID string, limit int) ([]*TraderOrder, error) { rows, err := s.db.Query(` - SELECT id, trader_id, exchange_id, exchange_order_id, client_order_id, + SELECT id, trader_id, exchange_id, exchange_type, exchange_order_id, client_order_id, symbol, side, position_side, type, time_in_force, quantity, price, stop_price, status, filled_quantity, avg_fill_price, commission, commission_asset, @@ -377,7 +381,7 @@ func (s *OrderStore) GetTraderOrders(traderID string, limit int) ([]*TraderOrder var order TraderOrder var createdAt, updatedAt, filledAt sql.NullString err := rows.Scan( - &order.ID, &order.TraderID, &order.ExchangeID, &order.ExchangeOrderID, &order.ClientOrderID, + &order.ID, &order.TraderID, &order.ExchangeID, &order.ExchangeType, &order.ExchangeOrderID, &order.ClientOrderID, &order.Symbol, &order.Side, &order.PositionSide, &order.Type, &order.TimeInForce, &order.Quantity, &order.Price, &order.StopPrice, &order.Status, &order.FilledQuantity, &order.AvgFillPrice, &order.Commission, &order.CommissionAsset, @@ -415,7 +419,7 @@ func (s *OrderStore) GetTraderOrders(traderID string, limit int) ([]*TraderOrder // GetOrderFills 获取订单的成交记录 func (s *OrderStore) GetOrderFills(orderID int64) ([]*TraderFill, error) { rows, err := s.db.Query(` - SELECT id, trader_id, exchange_id, order_id, exchange_order_id, exchange_trade_id, + SELECT id, trader_id, exchange_id, exchange_type, order_id, exchange_order_id, exchange_trade_id, symbol, side, price, quantity, quote_quantity, commission, commission_asset, realized_pnl, is_maker, created_at @@ -433,7 +437,7 @@ func (s *OrderStore) GetOrderFills(orderID int64) ([]*TraderFill, error) { var fill TraderFill var createdAt sql.NullString err := rows.Scan( - &fill.ID, &fill.TraderID, &fill.ExchangeID, &fill.OrderID, &fill.ExchangeOrderID, &fill.ExchangeTradeID, + &fill.ID, &fill.TraderID, &fill.ExchangeID, &fill.ExchangeType, &fill.OrderID, &fill.ExchangeOrderID, &fill.ExchangeTradeID, &fill.Symbol, &fill.Side, &fill.Price, &fill.Quantity, &fill.QuoteQuantity, &fill.Commission, &fill.CommissionAsset, &fill.RealizedPnL, &fill.IsMaker, &createdAt, diff --git a/trader/aster_order_sync.go b/trader/aster_order_sync.go new file mode 100644 index 00000000..cff9b7cd --- /dev/null +++ b/trader/aster_order_sync.go @@ -0,0 +1,188 @@ +package trader + +import ( + "fmt" + "nofx/logger" + "nofx/store" + "sort" + "strings" + "time" +) + +// SyncOrdersFromAster syncs Aster exchange order history to local database +// Also creates/updates position records to ensure orders/fills/positions data consistency +// exchangeID: Exchange account UUID (from exchanges.id) +// exchangeType: Exchange type ("aster") +func (t *AsterTrader) SyncOrdersFromAster(traderID string, exchangeID string, exchangeType string, st *store.Store) error { + if st == nil { + return fmt.Errorf("store is nil") + } + + // Get recent trades (last 24 hours) + startTime := time.Now().Add(-24 * time.Hour) + + logger.Infof("🔄 Syncing Aster trades from: %s", startTime.Format(time.RFC3339)) + + // Use GetTrades method to fetch trade records + trades, err := t.GetTrades(startTime, 500) + if err != nil { + return fmt.Errorf("failed to get trades: %w", err) + } + + logger.Infof("📥 Received %d trades from Aster", len(trades)) + + // Sort trades by time ASC (oldest first) for proper position building + sort.Slice(trades, func(i, j int) bool { + return trades[i].Time.Before(trades[j].Time) + }) + + // Process trades one by one (no transaction to avoid deadlock) + orderStore := st.Order() + positionStore := st.Position() + posBuilder := store.NewPositionBuilder(positionStore) + syncedCount := 0 + + for _, trade := range trades { + // Check if trade already exists (use exchangeID which is UUID, not exchange type) + existing, err := orderStore.GetOrderByExchangeID(exchangeID, trade.TradeID) + if err == nil && existing != nil { + continue // Order already exists, skip + } + + // Determine order action based on side, positionSide, and realizedPnL + // Aster uses one-way position mode (BOTH), so we need to infer from PnL + // - RealizedPnL != 0 means it's a close trade + // - RealizedPnL == 0 means it's an open trade + orderAction := deriveAsterOrderAction(trade.Side, trade.PositionSide, trade.RealizedPnL) + + // Determine position side from order action + positionSide := "LONG" + if strings.Contains(orderAction, "short") { + positionSide = "SHORT" + } + + // Normalize side for storage + side := strings.ToUpper(trade.Side) + + // Create order record + orderRecord := &store.TraderOrder{ + TraderID: traderID, + ExchangeID: exchangeID, // UUID + ExchangeType: exchangeType, // Exchange type + ExchangeOrderID: trade.TradeID, + Symbol: trade.Symbol, + Side: side, + PositionSide: "BOTH", // Aster uses one-way position mode + Type: "LIMIT", + OrderAction: orderAction, + Quantity: trade.Quantity, + Price: trade.Price, + Status: "FILLED", + FilledQuantity: trade.Quantity, + AvgFillPrice: trade.Price, + Commission: trade.Fee, + FilledAt: trade.Time, + CreatedAt: trade.Time, + UpdatedAt: trade.Time, + } + + // Insert order record + if err := orderStore.CreateOrder(orderRecord); err != nil { + logger.Infof(" ⚠️ Failed to sync trade %s: %v", trade.TradeID, err) + continue + } + + // Create fill record + fillRecord := &store.TraderFill{ + TraderID: traderID, + ExchangeID: exchangeID, // UUID + ExchangeType: exchangeType, // Exchange type + OrderID: orderRecord.ID, + ExchangeOrderID: trade.TradeID, + ExchangeTradeID: trade.TradeID, + Symbol: trade.Symbol, + Side: side, + Price: trade.Price, + Quantity: trade.Quantity, + QuoteQuantity: trade.Price * trade.Quantity, + Commission: trade.Fee, + CommissionAsset: "USDT", + RealizedPnL: trade.RealizedPnL, + IsMaker: false, + CreatedAt: trade.Time, + } + + if err := orderStore.CreateFill(fillRecord); err != nil { + logger.Infof(" ⚠️ Failed to sync fill for trade %s: %v", trade.TradeID, err) + } + + // Create/update position record using PositionBuilder + if err := posBuilder.ProcessTrade( + traderID, exchangeID, exchangeType, + trade.Symbol, positionSide, orderAction, + trade.Quantity, trade.Price, trade.Fee, trade.RealizedPnL, + trade.Time, trade.TradeID, + ); err != nil { + logger.Infof(" ⚠️ Failed to sync position for trade %s: %v", trade.TradeID, err) + } else { + logger.Infof(" 📍 Position updated for trade: %s (action: %s, qty: %.6f)", trade.TradeID, orderAction, trade.Quantity) + } + + syncedCount++ + logger.Infof(" ✅ Synced trade: %s %s %s qty=%.6f price=%.6f pnl=%.2f fee=%.6f action=%s", + trade.TradeID, trade.Symbol, side, trade.Quantity, trade.Price, trade.RealizedPnL, trade.Fee, orderAction) + } + + logger.Infof("✅ Aster order sync completed: %d new trades synced", syncedCount) + return nil +} + +// deriveAsterOrderAction determines order action from trade details +// Aster uses one-way position mode (BOTH), so we infer from: +// - Side: BUY or SELL +// - RealizedPnL: non-zero means closing trade +func deriveAsterOrderAction(side, positionSide string, realizedPnL float64) string { + side = strings.ToUpper(side) + positionSide = strings.ToUpper(positionSide) + + // Check if this is a closing trade (has realized PnL) + isClose := realizedPnL != 0 + + if positionSide == "LONG" { + if isClose { + return "close_long" + } + return "open_long" + } else if positionSide == "SHORT" { + if isClose { + return "close_short" + } + return "open_short" + } else { + // BOTH mode - infer from side and PnL + if side == "BUY" { + if isClose { + return "close_short" // Buying to close short + } + return "open_long" // Buying to open long + } else { + if isClose { + return "close_long" // Selling to close long + } + return "open_short" // Selling to open short + } + } +} + +// StartOrderSync starts background order sync task for Aster +func (t *AsterTrader) StartOrderSync(traderID string, exchangeID string, exchangeType string, st *store.Store, interval time.Duration) { + ticker := time.NewTicker(interval) + go func() { + for range ticker.C { + if err := t.SyncOrdersFromAster(traderID, exchangeID, exchangeType, st); err != nil { + logger.Infof("⚠️ Aster order sync failed: %v", err) + } + } + }() + logger.Infof("🔄 Aster order sync started (interval: %v)", interval) +} diff --git a/trader/auto_trader.go b/trader/auto_trader.go index d1375016..38b2e65a 100644 --- a/trader/auto_trader.go +++ b/trader/auto_trader.go @@ -363,8 +363,48 @@ func (at *AutoTrader) Run() error { // Start Lighter order sync if using Lighter exchange if at.exchange == "lighter" { if lighterTrader, ok := at.trader.(*LighterTraderV2); ok && at.store != nil { - lighterTrader.StartOrderSync(at.id, at.store.Order(), 30*time.Second) - logger.Infof("🔄 [%s] Lighter order sync enabled (every 30s)", at.name) + lighterTrader.StartOrderSync(at.id, at.exchangeID, at.exchange, at.store, 30*time.Second) + logger.Infof("🔄 [%s] Lighter order+position sync enabled (every 30s)", at.name) + } + } + + // Start Hyperliquid order sync if using Hyperliquid exchange + if at.exchange == "hyperliquid" { + if hyperliquidTrader, ok := at.trader.(*HyperliquidTrader); ok && at.store != nil { + hyperliquidTrader.StartOrderSync(at.id, at.exchangeID, at.exchange, at.store, 30*time.Second) + logger.Infof("🔄 [%s] Hyperliquid order+position sync enabled (every 30s)", at.name) + } + } + + // Start Bybit order sync if using Bybit exchange + if at.exchange == "bybit" { + if bybitTrader, ok := at.trader.(*BybitTrader); ok && at.store != nil { + bybitTrader.StartOrderSync(at.id, at.exchangeID, at.exchange, at.store, 30*time.Second) + logger.Infof("🔄 [%s] Bybit order+position sync enabled (every 30s)", at.name) + } + } + + // Start OKX order sync if using OKX exchange + if at.exchange == "okx" { + if okxTrader, ok := at.trader.(*OKXTrader); ok && at.store != nil { + okxTrader.StartOrderSync(at.id, at.exchangeID, at.exchange, at.store, 30*time.Second) + logger.Infof("🔄 [%s] OKX order+position sync enabled (every 30s)", at.name) + } + } + + // Start Bitget order sync if using Bitget exchange + if at.exchange == "bitget" { + if bitgetTrader, ok := at.trader.(*BitgetTrader); ok && at.store != nil { + bitgetTrader.StartOrderSync(at.id, at.exchangeID, at.exchange, at.store, 30*time.Second) + logger.Infof("🔄 [%s] Bitget order+position sync enabled (every 30s)", at.name) + } + } + + // Start Aster order sync if using Aster exchange + if at.exchange == "aster" { + if asterTrader, ok := at.trader.(*AsterTrader); ok && at.store != nil { + asterTrader.StartOrderSync(at.id, at.exchangeID, at.exchange, at.store, 30*time.Second) + logger.Infof("🔄 [%s] Aster order+position sync enabled (every 30s)", at.name) } } @@ -1888,29 +1928,11 @@ func (at *AutoTrader) recordPositionChange(orderID, symbol, side, action string, return } - // Calculate P&L - var realizedPnL float64 - if side == "LONG" { - realizedPnL = (price - openPos.EntryPrice) * openPos.Quantity - } else { - realizedPnL = (openPos.EntryPrice - price) * openPos.Quantity - } - - // Update position record - err = at.store.Position().ClosePosition( - openPos.ID, - price, // exitPrice - orderID, // exitOrderID - realizedPnL, - fee, // fee from exchange API - "ai_decision", - ) - if err != nil { - logger.Infof(" ⚠️ Failed to update position: %v", err) - } else { - logger.Infof(" 📊 Position closed [%s] %s %s @ %.4f → %.4f, P&L: %.2f, Fee: %.4f", - at.id[:8], symbol, side, openPos.EntryPrice, price, realizedPnL, fee) - } + // NOTE: Position update removed - Order Sync will handle it automatically + // Order Sync will pick up the fill and update the position through PositionBuilder + // This ensures accurate fee accumulation and PnL calculation + logger.Infof(" ✅ Order placed [%s] %s %s @ %.4f, will be synced by Order Sync", + at.id[:8], symbol, side, price) } } diff --git a/trader/bitget_order_sync.go b/trader/bitget_order_sync.go new file mode 100644 index 00000000..66e41fb6 --- /dev/null +++ b/trader/bitget_order_sync.go @@ -0,0 +1,257 @@ +package trader + +import ( + "encoding/json" + "fmt" + "nofx/logger" + "nofx/store" + "sort" + "strconv" + "strings" + "time" +) + +// BitgetTrade represents a trade record from Bitget fill history +type BitgetTrade struct { + Symbol string + TradeID string + OrderID string + Side string // buy or sell + FillPrice float64 + FillQty float64 + Fee float64 + FeeAsset string + ExecTime time.Time + ProfitLoss float64 + OrderType string + OrderAction string // open_long, open_short, close_long, close_short +} + +// GetTrades retrieves trade/fill records from Bitget +func (t *BitgetTrader) GetTrades(startTime time.Time, limit int) ([]BitgetTrade, error) { + if limit <= 0 { + limit = 100 + } + if limit > 100 { + limit = 100 // Bitget max limit is 100 + } + + params := map[string]interface{}{ + "productType": "USDT-FUTURES", + "startTime": fmt.Sprintf("%d", startTime.UnixMilli()), + "limit": fmt.Sprintf("%d", limit), + } + + data, err := t.doRequest("GET", "/api/v2/mix/order/fill-history", params) + if err != nil { + return nil, fmt.Errorf("failed to get fill history: %w", err) + } + + var resp struct { + FillList []struct { + TradeID string `json:"tradeId"` + Symbol string `json:"symbol"` + OrderID string `json:"orderId"` + Side string `json:"side"` // buy, sell + Price string `json:"price"` // Fill price + BaseVolume string `json:"baseVolume"` // Fill size in base currency + Fee string `json:"fee"` // Fee (negative for cost) + FeeCcy string `json:"feeCcy"` // Fee currency + Profit string `json:"profit"` // Realized PnL + CTime string `json:"cTime"` // Fill time (ms) + TradeSide string `json:"tradeSide"` // open, close + } `json:"fillList"` + } + + if err := json.Unmarshal(data, &resp); err != nil { + return nil, fmt.Errorf("failed to parse fills: %w", err) + } + + trades := make([]BitgetTrade, 0, len(resp.FillList)) + + for _, fill := range resp.FillList { + fillPrice, _ := strconv.ParseFloat(fill.Price, 64) + fillQty, _ := strconv.ParseFloat(fill.BaseVolume, 64) + fee, _ := strconv.ParseFloat(fill.Fee, 64) + profit, _ := strconv.ParseFloat(fill.Profit, 64) + cTime, _ := strconv.ParseInt(fill.CTime, 10, 64) + + // Determine order action based on side and tradeSide + // Bitget one-way mode: + // - buy + open = open long + // - sell + open = open short + // - sell + close = close long + // - buy + close = close short + orderAction := "open_long" + side := strings.ToLower(fill.Side) + tradeSide := strings.ToLower(fill.TradeSide) + + if tradeSide == "open" { + if side == "buy" { + orderAction = "open_long" + } else { + orderAction = "open_short" + } + } else if tradeSide == "close" { + if side == "sell" { + orderAction = "close_long" + } else { + orderAction = "close_short" + } + } + + trade := BitgetTrade{ + Symbol: fill.Symbol, + TradeID: fill.TradeID, + OrderID: fill.OrderID, + Side: fill.Side, + FillPrice: fillPrice, + FillQty: fillQty, + Fee: -fee, // Bitget returns negative fee + FeeAsset: fill.FeeCcy, + ExecTime: time.UnixMilli(cTime), + ProfitLoss: profit, + OrderType: "MARKET", + OrderAction: orderAction, + } + + trades = append(trades, trade) + } + + return trades, nil +} + +// SyncOrdersFromBitget syncs Bitget exchange order history to local database +// Also creates/updates position records to ensure orders/fills/positions data consistency +// exchangeID: Exchange account UUID (from exchanges.id) +// exchangeType: Exchange type ("bitget") +func (t *BitgetTrader) SyncOrdersFromBitget(traderID string, exchangeID string, exchangeType string, st *store.Store) error { + if st == nil { + return fmt.Errorf("store is nil") + } + + // Get recent trades (last 24 hours) + startTime := time.Now().Add(-24 * time.Hour) + + logger.Infof("🔄 Syncing Bitget trades from: %s", startTime.Format(time.RFC3339)) + + // Use GetTrades method to fetch trade records + trades, err := t.GetTrades(startTime, 100) + if err != nil { + return fmt.Errorf("failed to get trades: %w", err) + } + + logger.Infof("📥 Received %d trades from Bitget", len(trades)) + + // Sort trades by time ASC (oldest first) for proper position building + sort.Slice(trades, func(i, j int) bool { + return trades[i].ExecTime.Before(trades[j].ExecTime) + }) + + // Process trades one by one (no transaction to avoid deadlock) + orderStore := st.Order() + positionStore := st.Position() + posBuilder := store.NewPositionBuilder(positionStore) + syncedCount := 0 + + for _, trade := range trades { + // Check if trade already exists (use exchangeID which is UUID, not exchange type) + existing, err := orderStore.GetOrderByExchangeID(exchangeID, trade.TradeID) + if err == nil && existing != nil { + continue // Order already exists, skip + } + + // Determine position side from order action + positionSide := "LONG" + if strings.Contains(trade.OrderAction, "short") { + positionSide = "SHORT" + } + + // Normalize side for storage + side := strings.ToUpper(trade.Side) + + // Create order record + orderRecord := &store.TraderOrder{ + TraderID: traderID, + ExchangeID: exchangeID, // UUID + ExchangeType: exchangeType, // Exchange type + ExchangeOrderID: trade.TradeID, + Symbol: trade.Symbol, + Side: side, + PositionSide: "BOTH", // Bitget uses one-way position mode + Type: trade.OrderType, + OrderAction: trade.OrderAction, + Quantity: trade.FillQty, + Price: trade.FillPrice, + Status: "FILLED", + FilledQuantity: trade.FillQty, + AvgFillPrice: trade.FillPrice, + Commission: trade.Fee, + FilledAt: trade.ExecTime, + CreatedAt: trade.ExecTime, + UpdatedAt: trade.ExecTime, + } + + // Insert order record + if err := orderStore.CreateOrder(orderRecord); err != nil { + logger.Infof(" ⚠️ Failed to sync trade %s: %v", trade.TradeID, err) + continue + } + + // Create fill record + fillRecord := &store.TraderFill{ + TraderID: traderID, + ExchangeID: exchangeID, // UUID + ExchangeType: exchangeType, // Exchange type + OrderID: orderRecord.ID, + ExchangeOrderID: trade.OrderID, + ExchangeTradeID: trade.TradeID, + Symbol: trade.Symbol, + Side: side, + Price: trade.FillPrice, + Quantity: trade.FillQty, + QuoteQuantity: trade.FillPrice * trade.FillQty, + Commission: trade.Fee, + CommissionAsset: trade.FeeAsset, + RealizedPnL: trade.ProfitLoss, + IsMaker: false, + CreatedAt: trade.ExecTime, + } + + if err := orderStore.CreateFill(fillRecord); err != nil { + logger.Infof(" ⚠️ Failed to sync fill for trade %s: %v", trade.TradeID, err) + } + + // Create/update position record using PositionBuilder + if err := posBuilder.ProcessTrade( + traderID, exchangeID, exchangeType, + trade.Symbol, positionSide, trade.OrderAction, + trade.FillQty, trade.FillPrice, trade.Fee, trade.ProfitLoss, + trade.ExecTime, trade.TradeID, + ); err != nil { + logger.Infof(" ⚠️ Failed to sync position for trade %s: %v", trade.TradeID, err) + } else { + logger.Infof(" 📍 Position updated for trade: %s (action: %s, qty: %.6f)", trade.TradeID, trade.OrderAction, trade.FillQty) + } + + syncedCount++ + logger.Infof(" ✅ Synced trade: %s %s %s qty=%.6f price=%.6f pnl=%.2f fee=%.6f action=%s", + trade.TradeID, trade.Symbol, side, trade.FillQty, trade.FillPrice, trade.ProfitLoss, trade.Fee, trade.OrderAction) + } + + logger.Infof("✅ Bitget order sync completed: %d new trades synced", syncedCount) + return nil +} + +// StartOrderSync starts background order sync task for Bitget +func (t *BitgetTrader) StartOrderSync(traderID string, exchangeID string, exchangeType string, st *store.Store, interval time.Duration) { + ticker := time.NewTicker(interval) + go func() { + for range ticker.C { + if err := t.SyncOrdersFromBitget(traderID, exchangeID, exchangeType, st); err != nil { + logger.Infof("⚠️ Bitget order sync failed: %v", err) + } + } + }() + logger.Infof("🔄 Bitget order sync started (interval: %v)", interval) +} diff --git a/trader/bybit_order_sync.go b/trader/bybit_order_sync.go new file mode 100644 index 00000000..d19ce2d4 --- /dev/null +++ b/trader/bybit_order_sync.go @@ -0,0 +1,309 @@ +package trader + +import ( + "crypto/hmac" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net/http" + "nofx/logger" + "nofx/store" + "sort" + "strconv" + "strings" + "time" +) + +// BybitTrade represents a trade record from Bybit execution list +type BybitTrade struct { + Symbol string + OrderID string + ExecID string + Side string // Buy or Sell + ExecPrice float64 + ExecQty float64 + ExecFee float64 + ExecTime time.Time + IsMaker bool + OrderType string + ClosedSize float64 // For close orders + ClosedPnL float64 + OrderAction string // open_long, open_short, close_long, close_short +} + +// GetTrades retrieves trade/execution records from Bybit +func (t *BybitTrader) GetTrades(startTime time.Time, limit int) ([]BybitTrade, error) { + return t.getTradesViaHTTP(startTime, limit) +} + +// getTradesViaHTTP makes direct HTTP call to Bybit API for execution list +func (t *BybitTrader) getTradesViaHTTP(startTime time.Time, limit int) ([]BybitTrade, error) { + // Build query string + queryParams := fmt.Sprintf("category=linear&startTime=%d&limit=%d", startTime.UnixMilli(), limit) + url := "https://api.bybit.com/v5/execution/list?" + queryParams + + // Generate timestamp + timestamp := fmt.Sprintf("%d", time.Now().UnixMilli()) + recvWindow := "5000" + + // Build signature payload: timestamp + api_key + recv_window + queryString + signPayload := timestamp + t.apiKey + recvWindow + queryParams + + // Generate HMAC-SHA256 signature + h := hmac.New(sha256.New, []byte(t.secretKey)) + h.Write([]byte(signPayload)) + signature := hex.EncodeToString(h.Sum(nil)) + + // Create request + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + // Add Bybit V5 API headers + req.Header.Set("X-BAPI-API-KEY", t.apiKey) + req.Header.Set("X-BAPI-SIGN", signature) + req.Header.Set("X-BAPI-SIGN-TYPE", "2") + req.Header.Set("X-BAPI-TIMESTAMP", timestamp) + req.Header.Set("X-BAPI-RECV-WINDOW", recvWindow) + req.Header.Set("Content-Type", "application/json") + + // Use http.DefaultClient for the request + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to call Bybit API: %w", err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response: %w", err) + } + + var result struct { + RetCode int `json:"retCode"` + RetMsg string `json:"retMsg"` + Result struct { + List []map[string]interface{} `json:"list"` + } `json:"result"` + } + + if err := json.Unmarshal(body, &result); err != nil { + return nil, fmt.Errorf("failed to parse response: %w", err) + } + + if result.RetCode != 0 { + return nil, fmt.Errorf("Bybit API error: %s", result.RetMsg) + } + + return t.parseTradesResult(result.Result.List) +} + +// parseTradesResult parses the execution list result from Bybit API +func (t *BybitTrader) parseTradesResult(list []map[string]interface{}) ([]BybitTrade, error) { + var trades []BybitTrade + + for _, item := range list { + symbol, _ := item["symbol"].(string) + orderID, _ := item["orderId"].(string) + execID, _ := item["execId"].(string) + side, _ := item["side"].(string) + orderType, _ := item["orderType"].(string) + isMaker, _ := item["isMaker"].(bool) + + execPriceStr, _ := item["execPrice"].(string) + execQtyStr, _ := item["execQty"].(string) + execFeeStr, _ := item["execFee"].(string) + closedSizeStr, _ := item["closedSize"].(string) + closedPnlStr, _ := item["closedPnl"].(string) + execTimeStr, _ := item["execTime"].(string) + + execPrice, _ := strconv.ParseFloat(execPriceStr, 64) + execQty, _ := strconv.ParseFloat(execQtyStr, 64) + execFee, _ := strconv.ParseFloat(execFeeStr, 64) + closedSize, _ := strconv.ParseFloat(closedSizeStr, 64) + closedPnl, _ := strconv.ParseFloat(closedPnlStr, 64) + execTimeMs, _ := strconv.ParseInt(execTimeStr, 10, 64) + execTime := time.UnixMilli(execTimeMs) + + // Determine order action based on side and closedSize + // If closedSize > 0, it's a close trade + // Side: Buy = long direction, Sell = short direction + orderAction := "open_long" + if closedSize > 0 { + // This is a close trade + if strings.ToLower(side) == "sell" { + orderAction = "close_long" // Selling to close a long + } else { + orderAction = "close_short" // Buying to close a short + } + } else { + // This is an open trade + if strings.ToLower(side) == "buy" { + orderAction = "open_long" + } else { + orderAction = "open_short" + } + } + + trade := BybitTrade{ + Symbol: symbol, + OrderID: orderID, + ExecID: execID, + Side: side, + ExecPrice: execPrice, + ExecQty: execQty, + ExecFee: execFee, + ExecTime: execTime, + IsMaker: isMaker, + OrderType: orderType, + ClosedSize: closedSize, + ClosedPnL: closedPnl, + OrderAction: orderAction, + } + + trades = append(trades, trade) + } + + return trades, nil +} + +// SyncOrdersFromBybit syncs Bybit exchange order history to local database +// Also creates/updates position records to ensure orders/fills/positions data consistency +// exchangeID: Exchange account UUID (from exchanges.id) +// exchangeType: Exchange type ("bybit") +func (t *BybitTrader) SyncOrdersFromBybit(traderID string, exchangeID string, exchangeType string, st *store.Store) error { + if st == nil { + return fmt.Errorf("store is nil") + } + + // Get recent trades (last 24 hours) + startTime := time.Now().Add(-24 * time.Hour) + + logger.Infof("🔄 Syncing Bybit trades from: %s", startTime.Format(time.RFC3339)) + + // Use GetTrades method to fetch trade records + trades, err := t.GetTrades(startTime, 1000) + if err != nil { + return fmt.Errorf("failed to get trades: %w", err) + } + + logger.Infof("📥 Received %d trades from Bybit", len(trades)) + + // Sort trades by time ASC (oldest first) for proper position building + sort.Slice(trades, func(i, j int) bool { + return trades[i].ExecTime.Before(trades[j].ExecTime) + }) + + // Process trades one by one (no transaction to avoid deadlock) + orderStore := st.Order() + positionStore := st.Position() + posBuilder := store.NewPositionBuilder(positionStore) + syncedCount := 0 + + for _, trade := range trades { + // Check if trade already exists (use exchangeID which is UUID, not exchange type) + existing, err := orderStore.GetOrderByExchangeID(exchangeID, trade.ExecID) + if err == nil && existing != nil { + continue // Order already exists, skip + } + + // Normalize symbol (should already have USDT suffix from Bybit) + symbol := trade.Symbol + + // Determine position side from order action + positionSide := "LONG" + if strings.Contains(trade.OrderAction, "short") { + positionSide = "SHORT" + } + + // Normalize side for storage + side := strings.ToUpper(trade.Side) + + // Create order record + orderRecord := &store.TraderOrder{ + TraderID: traderID, + ExchangeID: exchangeID, // UUID + ExchangeType: exchangeType, // Exchange type + ExchangeOrderID: trade.ExecID, // Use ExecID as unique identifier + Symbol: symbol, + Side: side, + PositionSide: "BOTH", // Bybit uses one-way position mode + Type: trade.OrderType, + OrderAction: trade.OrderAction, + Quantity: trade.ExecQty, + Price: trade.ExecPrice, + Status: "FILLED", + FilledQuantity: trade.ExecQty, + AvgFillPrice: trade.ExecPrice, + Commission: trade.ExecFee, + FilledAt: trade.ExecTime, + CreatedAt: trade.ExecTime, + UpdatedAt: trade.ExecTime, + } + + // Insert order record + if err := orderStore.CreateOrder(orderRecord); err != nil { + logger.Infof(" ⚠️ Failed to sync trade %s: %v", trade.ExecID, err) + continue + } + + // Create fill record + fillRecord := &store.TraderFill{ + TraderID: traderID, + ExchangeID: exchangeID, // UUID + ExchangeType: exchangeType, // Exchange type + OrderID: orderRecord.ID, + ExchangeOrderID: trade.OrderID, + ExchangeTradeID: trade.ExecID, + Symbol: symbol, + Side: side, + Price: trade.ExecPrice, + Quantity: trade.ExecQty, + QuoteQuantity: trade.ExecPrice * trade.ExecQty, + Commission: trade.ExecFee, + CommissionAsset: "USDT", + RealizedPnL: trade.ClosedPnL, + IsMaker: trade.IsMaker, + CreatedAt: trade.ExecTime, + } + + if err := orderStore.CreateFill(fillRecord); err != nil { + logger.Infof(" ⚠️ Failed to sync fill for trade %s: %v", trade.ExecID, err) + } + + // Create/update position record using PositionBuilder + if err := posBuilder.ProcessTrade( + traderID, exchangeID, exchangeType, + symbol, positionSide, trade.OrderAction, + trade.ExecQty, trade.ExecPrice, trade.ExecFee, trade.ClosedPnL, + trade.ExecTime, trade.ExecID, + ); err != nil { + logger.Infof(" ⚠️ Failed to sync position for trade %s: %v", trade.ExecID, err) + } else { + logger.Infof(" 📍 Position updated for trade: %s (action: %s, qty: %.6f)", trade.ExecID, trade.OrderAction, trade.ExecQty) + } + + syncedCount++ + logger.Infof(" ✅ Synced trade: %s %s %s qty=%.6f price=%.6f pnl=%.2f fee=%.6f action=%s", + trade.ExecID, symbol, side, trade.ExecQty, trade.ExecPrice, trade.ClosedPnL, trade.ExecFee, trade.OrderAction) + } + + logger.Infof("✅ Bybit order sync completed: %d new trades synced", syncedCount) + return nil +} + +// StartOrderSync starts background order sync task for Bybit +func (t *BybitTrader) StartOrderSync(traderID string, exchangeID string, exchangeType string, st *store.Store, interval time.Duration) { + ticker := time.NewTicker(interval) + go func() { + for range ticker.C { + if err := t.SyncOrdersFromBybit(traderID, exchangeID, exchangeType, st); err != nil { + logger.Infof("⚠️ Bybit order sync failed: %v", err) + } + } + }() + logger.Infof("🔄 Bybit order sync started (interval: %v)", interval) +} diff --git a/trader/exchange_sync_test.go b/trader/exchange_sync_test.go new file mode 100644 index 00000000..3a996f50 --- /dev/null +++ b/trader/exchange_sync_test.go @@ -0,0 +1,337 @@ +package trader + +import ( + "database/sql" + "nofx/store" + "testing" + "time" + + _ "github.com/mattn/go-sqlite3" +) + +// TestScenario represents a trading scenario to test +type TestScenario struct { + Name string + Trades []TestTrade + ExpectedPos []ExpectedPosition +} + +// TestTrade represents a single trade in a test scenario +type TestTrade struct { + Action string // open_long, close_short, etc. + Side string // LONG or SHORT + Symbol string + Quantity float64 + Price float64 + Fee float64 + RealizedPnL float64 +} + +// ExpectedPosition represents expected position state +type ExpectedPosition struct { + Symbol string + Side string + Quantity float64 + Status string // OPEN or CLOSED +} + +// Standard test scenarios that all exchanges should pass +func getStandardTestScenarios() []TestScenario { + return []TestScenario{ + { + Name: "Simple Open and Close Long", + Trades: []TestTrade{ + {Action: "open_long", Side: "LONG", Symbol: "ETHUSDT", Quantity: 0.1, Price: 3500, Fee: 0.5, RealizedPnL: 0}, + {Action: "close_long", Side: "LONG", Symbol: "ETHUSDT", Quantity: 0.1, Price: 3600, Fee: 0.5, RealizedPnL: 10}, + }, + ExpectedPos: []ExpectedPosition{}, // Should be fully closed + }, + { + Name: "Simple Open and Close Short", + Trades: []TestTrade{ + {Action: "open_short", Side: "SHORT", Symbol: "ETHUSDT", Quantity: 0.1, Price: 3500, Fee: 0.5, RealizedPnL: 0}, + {Action: "close_short", Side: "SHORT", Symbol: "ETHUSDT", Quantity: 0.1, Price: 3400, Fee: 0.5, RealizedPnL: 10}, + }, + ExpectedPos: []ExpectedPosition{}, + }, + { + Name: "Position Averaging", + Trades: []TestTrade{ + {Action: "open_long", Side: "LONG", Symbol: "BTCUSDT", Quantity: 0.01, Price: 50000, Fee: 1.0, RealizedPnL: 0}, + {Action: "open_long", Side: "LONG", Symbol: "BTCUSDT", Quantity: 0.01, Price: 51000, Fee: 1.0, RealizedPnL: 0}, + {Action: "close_long", Side: "LONG", Symbol: "BTCUSDT", Quantity: 0.02, Price: 52000, Fee: 2.0, RealizedPnL: 30}, + }, + ExpectedPos: []ExpectedPosition{}, + }, + { + Name: "Partial Close", + Trades: []TestTrade{ + {Action: "open_long", Side: "LONG", Symbol: "SOLUSDT", Quantity: 10, Price: 100, Fee: 2.0, RealizedPnL: 0}, + {Action: "close_long", Side: "LONG", Symbol: "SOLUSDT", Quantity: 3, Price: 105, Fee: 0.6, RealizedPnL: 15}, + }, + ExpectedPos: []ExpectedPosition{ + {Symbol: "SOLUSDT", Side: "LONG", Quantity: 7, Status: "OPEN"}, + }, + }, + { + Name: "Multiple Symbols", + Trades: []TestTrade{ + {Action: "open_long", Side: "LONG", Symbol: "ETHUSDT", Quantity: 0.1, Price: 3500, Fee: 0.5, RealizedPnL: 0}, + {Action: "open_short", Side: "SHORT", Symbol: "BTCUSDT", Quantity: 0.01, Price: 50000, Fee: 1.0, RealizedPnL: 0}, + {Action: "close_long", Side: "LONG", Symbol: "ETHUSDT", Quantity: 0.1, Price: 3600, Fee: 0.5, RealizedPnL: 10}, + }, + ExpectedPos: []ExpectedPosition{ + {Symbol: "BTCUSDT", Side: "SHORT", Quantity: 0.01, Status: "OPEN"}, + }, + }, + { + Name: "Bug Scenario - Short then BUY to Close", + Trades: []TestTrade{ + // This tests the exact bug we fixed + {Action: "open_short", Side: "SHORT", Symbol: "ETHUSDT", Quantity: 0.0472, Price: 3500, Fee: 0.2, RealizedPnL: 0}, + {Action: "close_short", Side: "SHORT", Symbol: "ETHUSDT", Quantity: 0.0472, Price: 3400, Fee: 0.2, RealizedPnL: 4.72}, + }, + ExpectedPos: []ExpectedPosition{}, // Must be fully closed! + }, + { + Name: "Multiple Opens and Closes", + Trades: []TestTrade{ + {Action: "open_long", Side: "LONG", Symbol: "ETHUSDT", Quantity: 0.1, Price: 3500, Fee: 0.5, RealizedPnL: 0}, + {Action: "close_long", Side: "LONG", Symbol: "ETHUSDT", Quantity: 0.1, Price: 3600, Fee: 0.5, RealizedPnL: 10}, + {Action: "open_short", Side: "SHORT", Symbol: "ETHUSDT", Quantity: 0.05, Price: 3600, Fee: 0.3, RealizedPnL: 0}, + {Action: "close_short", Side: "SHORT", Symbol: "ETHUSDT", Quantity: 0.05, Price: 3500, Fee: 0.3, RealizedPnL: 5}, + {Action: "open_long", Side: "LONG", Symbol: "ETHUSDT", Quantity: 0.2, Price: 3550, Fee: 1.0, RealizedPnL: 0}, + }, + ExpectedPos: []ExpectedPosition{ + {Symbol: "ETHUSDT", Side: "LONG", Quantity: 0.2, Status: "OPEN"}, + }, + }, + } +} + +// runStandardTests runs all standard test scenarios +func runStandardTests(t *testing.T, exchangeName string) { + scenarios := getStandardTestScenarios() + + for _, scenario := range scenarios { + t.Run(scenario.Name, func(t *testing.T) { + // Setup database + db, err := sql.Open("sqlite3", ":memory:") + if err != nil { + t.Fatalf("Failed to create test database: %v", err) + } + defer db.Close() + + positionStore := store.NewPositionStore(db) + if err := positionStore.InitTables(); err != nil { + t.Fatalf("Failed to initialize position tables: %v", err) + } + + posBuilder := store.NewPositionBuilder(positionStore) + + traderID := "test-trader" + exchangeID := "test-exchange-" + exchangeName + exchangeType := exchangeName + + // Process all trades + for i, trade := range scenario.Trades { + err := posBuilder.ProcessTrade( + traderID, exchangeID, exchangeType, + trade.Symbol, trade.Side, trade.Action, + trade.Quantity, trade.Price, trade.Fee, trade.RealizedPnL, + time.Now().Add(time.Duration(i)*time.Second), + "", + ) + if err != nil { + t.Fatalf("Failed to process trade %d (%s): %v", i, trade.Action, err) + } + } + + // Verify expected positions + positions, err := positionStore.GetOpenPositions(traderID) + if err != nil { + t.Fatalf("Failed to get positions: %v", err) + } + + if len(positions) != len(scenario.ExpectedPos) { + t.Errorf("Expected %d open positions, got %d", len(scenario.ExpectedPos), len(positions)) + for _, p := range positions { + t.Errorf(" Got: %s %s qty=%.4f status=%s", p.Symbol, p.Side, p.Quantity, p.Status) + } + return + } + + // Verify each expected position + for _, expected := range scenario.ExpectedPos { + found := false + for _, actual := range positions { + if actual.Symbol == expected.Symbol && actual.Side == expected.Side { + found = true + if actual.Quantity != expected.Quantity { + t.Errorf("Position %s %s: expected qty %.4f, got %.4f", + expected.Symbol, expected.Side, expected.Quantity, actual.Quantity) + } + if actual.Status != expected.Status { + t.Errorf("Position %s %s: expected status %s, got %s", + expected.Symbol, expected.Side, expected.Status, actual.Status) + } + break + } + } + if !found { + t.Errorf("Expected position not found: %s %s", expected.Symbol, expected.Side) + } + } + }) + } +} + +// TestAllExchangesStandardScenarios runs standard scenarios for all exchanges +func TestAllExchangesStandardScenarios(t *testing.T) { + exchanges := []string{"hyperliquid", "binance", "bybit", "okx", "bitget", "aster", "lighter"} + + for _, exchange := range exchanges { + t.Run(exchange, func(t *testing.T) { + runStandardTests(t, exchange) + }) + } +} + +// TestPositionAccumulationBug tests that positions don't accumulate incorrectly +func TestPositionAccumulationBug(t *testing.T) { + db, err := sql.Open("sqlite3", ":memory:") + if err != nil { + t.Fatalf("Failed to create test database: %v", err) + } + defer db.Close() + + positionStore := store.NewPositionStore(db) + if err := positionStore.InitTables(); err != nil { + t.Fatalf("Failed to initialize position tables: %v", err) + } + + posBuilder := store.NewPositionBuilder(positionStore) + + traderID := "test-trader" + exchangeID := "test-exchange" + exchangeType := "hyperliquid" + + // Simulate many trades that should cancel out + // This tests that we don't accumulate positions incorrectly + for i := 0; i < 10; i++ { + // Open Long + err := posBuilder.ProcessTrade( + traderID, exchangeID, exchangeType, + "ETHUSDT", "LONG", "open_long", + 0.1, 3500+float64(i*10), 0.5, 0, + time.Now().Add(time.Duration(i*2)*time.Second), + "", + ) + if err != nil { + t.Fatalf("Failed to open long %d: %v", i, err) + } + + // Close Long + err = posBuilder.ProcessTrade( + traderID, exchangeID, exchangeType, + "ETHUSDT", "LONG", "close_long", + 0.1, 3600+float64(i*10), 0.5, 10, + time.Now().Add(time.Duration(i*2+1)*time.Second), + "", + ) + if err != nil { + t.Fatalf("Failed to close long %d: %v", i, err) + } + } + + // Should have 0 open positions + positions, err := positionStore.GetOpenPositions(traderID) + if err != nil { + t.Fatalf("Failed to get positions: %v", err) + } + + if len(positions) != 0 { + t.Errorf("Expected 0 positions after 10 open/close cycles, got %d", len(positions)) + for _, p := range positions { + t.Errorf(" Unexpected: %s %s qty=%.4f", p.Symbol, p.Side, p.Quantity) + } + } + + // Should have 10 closed positions with positive PnL + allPositions, err := positionStore.GetClosedPositions(traderID, 100) + if err != nil { + t.Fatalf("Failed to get closed positions: %v", err) + } + + closedCount := 0 + totalPnL := 0.0 + for _, p := range allPositions { + if p.Status == "CLOSED" { + closedCount++ + totalPnL += p.RealizedPnL + } + } + + if closedCount != 10 { + t.Errorf("Expected 10 closed positions, got %d", closedCount) + } + + if totalPnL <= 0 { + t.Errorf("Expected positive total PnL, got %.2f", totalPnL) + } +} + +// TestQuantityPrecision tests handling of quantity precision issues +func TestQuantityPrecision(t *testing.T) { + db, err := sql.Open("sqlite3", ":memory:") + if err != nil { + t.Fatalf("Failed to create test database: %v", err) + } + defer db.Close() + + positionStore := store.NewPositionStore(db) + if err := positionStore.InitTables(); err != nil { + t.Fatalf("Failed to initialize position tables: %v", err) + } + + posBuilder := store.NewPositionBuilder(positionStore) + + traderID := "test-trader" + exchangeID := "test-exchange" + exchangeType := "test" + + // Open position + err = posBuilder.ProcessTrade( + traderID, exchangeID, exchangeType, + "BTCUSDT", "LONG", "open_long", + 0.01, 50000, 1.0, 0, + time.Now(), + "", + ) + if err != nil { + t.Fatalf("Failed to open: %v", err) + } + + // Close with slightly different quantity due to precision (0.00999999 vs 0.01) + // Should still close fully within tolerance + err = posBuilder.ProcessTrade( + traderID, exchangeID, exchangeType, + "BTCUSDT", "LONG", "close_long", + 0.00999999, 51000, 1.0, 10, + time.Now().Add(time.Second), + "", + ) + if err != nil { + t.Fatalf("Failed to close: %v", err) + } + + // Should have 0 open positions (within tolerance) + positions, err := positionStore.GetOpenPositions(traderID) + if err != nil { + t.Fatalf("Failed to get positions: %v", err) + } + + if len(positions) != 0 { + t.Errorf("Expected 0 positions (precision tolerance), got %d", len(positions)) + } +} diff --git a/trader/hyperliquid_order_sync.go b/trader/hyperliquid_order_sync.go new file mode 100644 index 00000000..b072e1c5 --- /dev/null +++ b/trader/hyperliquid_order_sync.go @@ -0,0 +1,150 @@ +package trader + +import ( + "fmt" + "nofx/logger" + "nofx/store" + "sort" + "strings" + "time" +) + +// SyncOrdersFromHyperliquid syncs Hyperliquid exchange order history to local database +// Also creates/updates position records to ensure orders/fills/positions data consistency +// exchangeID: Exchange account UUID (from exchanges.id) +// exchangeType: Exchange type ("hyperliquid") +func (t *HyperliquidTrader) SyncOrdersFromHyperliquid(traderID string, exchangeID string, exchangeType string, st *store.Store) error { + if st == nil { + return fmt.Errorf("store is nil") + } + + // Get recent trades (last 24 hours) + startTime := time.Now().Add(-24 * time.Hour) + + logger.Infof("🔄 Syncing Hyperliquid trades from: %s", startTime.Format(time.RFC3339)) + + // Use GetTrades method to fetch trade records + trades, err := t.GetTrades(startTime, 1000) + if err != nil { + return fmt.Errorf("failed to get trades: %w", err) + } + + logger.Infof("📥 Received %d trades from Hyperliquid", len(trades)) + + // Sort trades by time ASC (oldest first) for proper position building + sort.Slice(trades, func(i, j int) bool { + return trades[i].Time.Before(trades[j].Time) + }) + + // Process trades one by one (no transaction to avoid deadlock) + orderStore := st.Order() + positionStore := st.Position() + posBuilder := store.NewPositionBuilder(positionStore) + syncedCount := 0 + + for _, trade := range trades { + // Check if trade already exists (use exchangeID which is UUID, not exchange type) + existing, err := orderStore.GetOrderByExchangeID(exchangeID, trade.TradeID) + if err == nil && existing != nil { + continue // Order already exists, skip + } + + // Normalize symbol (add USDT suffix) + symbol := trade.Symbol + if symbol != "" && !strings.Contains(symbol, "USDT") && !strings.Contains(symbol, "USD") { + symbol = symbol + "USDT" + } + + // Use order action from trade (parsed from Hyperliquid Dir field) + // Dir field values: "Open Long", "Open Short", "Close Long", "Close Short" + orderAction := trade.OrderAction + positionSide := "LONG" + if strings.Contains(orderAction, "short") { + positionSide = "SHORT" + } + + // Create order record + orderRecord := &store.TraderOrder{ + TraderID: traderID, + ExchangeID: exchangeID, // UUID + ExchangeType: exchangeType, // Exchange type + ExchangeOrderID: trade.TradeID, + Symbol: symbol, + Side: trade.Side, + PositionSide: "BOTH", // Hyperliquid uses one-way position mode + Type: "MARKET", + OrderAction: orderAction, + Quantity: trade.Quantity, + Price: trade.Price, + Status: "FILLED", + FilledQuantity: trade.Quantity, + AvgFillPrice: trade.Price, + Commission: trade.Fee, + FilledAt: trade.Time, + CreatedAt: trade.Time, + UpdatedAt: trade.Time, + } + + // Insert order record + if err := orderStore.CreateOrder(orderRecord); err != nil { + logger.Infof(" ⚠️ Failed to sync trade %s: %v", trade.TradeID, err) + continue + } + + // Create fill record + fillRecord := &store.TraderFill{ + TraderID: traderID, + ExchangeID: exchangeID, // UUID + ExchangeType: exchangeType, // Exchange type + OrderID: orderRecord.ID, + ExchangeOrderID: trade.TradeID, + ExchangeTradeID: trade.TradeID, + Symbol: symbol, + Side: trade.Side, + Price: trade.Price, + Quantity: trade.Quantity, + QuoteQuantity: trade.Price * trade.Quantity, + Commission: trade.Fee, + CommissionAsset: "USDT", + RealizedPnL: trade.RealizedPnL, + IsMaker: false, // Hyperliquid GetTrades doesn't provide maker/taker info + CreatedAt: trade.Time, + } + + if err := orderStore.CreateFill(fillRecord); err != nil { + logger.Infof(" ⚠️ Failed to sync fill for trade %s: %v", trade.TradeID, err) + } + + // Create/update position record using PositionBuilder + if err := posBuilder.ProcessTrade( + traderID, exchangeID, exchangeType, + symbol, positionSide, orderAction, + trade.Quantity, trade.Price, trade.Fee, trade.RealizedPnL, + trade.Time, trade.TradeID, + ); err != nil { + logger.Infof(" ⚠️ Failed to sync position for trade %s: %v", trade.TradeID, err) + } else { + logger.Infof(" 📍 Position updated for trade: %s (action: %s, qty: %.6f)", trade.TradeID, orderAction, trade.Quantity) + } + + syncedCount++ + logger.Infof(" ✅ Synced trade: %s %s %s qty=%.6f price=%.6f pnl=%.2f fee=%.6f action=%s", + trade.TradeID, symbol, trade.Side, trade.Quantity, trade.Price, trade.RealizedPnL, trade.Fee, orderAction) + } + + logger.Infof("✅ Order sync completed: %d new trades synced", syncedCount) + return nil +} + +// StartOrderSync starts background order sync task +func (t *HyperliquidTrader) StartOrderSync(traderID string, exchangeID string, exchangeType string, st *store.Store, interval time.Duration) { + ticker := time.NewTicker(interval) + go func() { + for range ticker.C { + if err := t.SyncOrdersFromHyperliquid(traderID, exchangeID, exchangeType, st); err != nil { + logger.Infof("⚠️ Hyperliquid order sync failed: %v", err) + } + } + }() + logger.Infof("🔄 Hyperliquid order sync started (interval: %v)", interval) +} diff --git a/trader/hyperliquid_sync_test.go b/trader/hyperliquid_sync_test.go new file mode 100644 index 00000000..065e37b0 --- /dev/null +++ b/trader/hyperliquid_sync_test.go @@ -0,0 +1,388 @@ +package trader + +import ( + "database/sql" + "nofx/store" + "testing" + "time" + + _ "github.com/mattn/go-sqlite3" +) + +// TestHyperliquidOrderDirectionParsing tests Dir field parsing +func TestHyperliquidOrderDirectionParsing(t *testing.T) { + tests := []struct { + name string + dirField string + side string + expectedAction string + expectedPosSide string + }{ + { + name: "Open Long", + dirField: "Open Long", + side: "BUY", + expectedAction: "open_long", + expectedPosSide: "LONG", + }, + { + name: "Open Short", + dirField: "Open Short", + side: "SELL", + expectedAction: "open_short", + expectedPosSide: "SHORT", + }, + { + name: "Close Long", + dirField: "Close Long", + side: "SELL", + expectedAction: "close_long", + expectedPosSide: "LONG", + }, + { + name: "Close Short", + dirField: "Close Short", + side: "BUY", + expectedAction: "close_short", + expectedPosSide: "SHORT", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Mock fill data structure from Hyperliquid SDK + // We'll test the parsing logic directly + var orderAction string + switch tt.dirField { + case "Open Long": + orderAction = "open_long" + case "Open Short": + orderAction = "open_short" + case "Close Long": + orderAction = "close_long" + case "Close Short": + orderAction = "close_short" + } + + if orderAction != tt.expectedAction { + t.Errorf("Expected action %s, got %s", tt.expectedAction, orderAction) + } + }) + } +} + +// TestHyperliquidPositionBuilding tests the complete flow of position building +func TestHyperliquidPositionBuilding(t *testing.T) { + // Setup in-memory database + db, err := sql.Open("sqlite3", ":memory:") + if err != nil { + t.Fatalf("Failed to create test database: %v", err) + } + defer db.Close() + + // Initialize stores + positionStore := store.NewPositionStore(db) + if err := positionStore.InitTables(); err != nil { + t.Fatalf("Failed to initialize position tables: %v", err) + } + + posBuilder := store.NewPositionBuilder(positionStore) + + traderID := "test-trader" + exchangeID := "test-exchange" + exchangeType := "hyperliquid" + symbol := "ETHUSDT" + + // Test Case 1: Open Long → Close Long (should result in 0 position) + t.Run("Open and Close Long", func(t *testing.T) { + // Open Long: BUY 0.1 ETH @ 3500 + err := posBuilder.ProcessTrade( + traderID, exchangeID, exchangeType, + symbol, "LONG", "open_long", + 0.1, 3500, 0.5, 0, + time.Now(), "order-1", + ) + if err != nil { + t.Fatalf("Failed to process open long: %v", err) + } + + // Verify position created + positions, err := positionStore.GetOpenPositions(traderID) + if err != nil { + t.Fatalf("Failed to get positions: %v", err) + } + if len(positions) != 1 { + t.Fatalf("Expected 1 open position, got %d", len(positions)) + } + if positions[0].Quantity != 0.1 { + t.Errorf("Expected quantity 0.1, got %f", positions[0].Quantity) + } + + // Close Long: SELL 0.1 ETH @ 3600 + err = posBuilder.ProcessTrade( + traderID, exchangeID, exchangeType, + symbol, "LONG", "close_long", + 0.1, 3600, 0.5, 10.0, // PnL = (3600-3500)*0.1 = 10 + time.Now(), "order-2", + ) + if err != nil { + t.Fatalf("Failed to process close long: %v", err) + } + + // Verify position closed + positions, err = positionStore.GetOpenPositions(traderID) + if err != nil { + t.Fatalf("Failed to get positions: %v", err) + } + if len(positions) != 0 { + t.Errorf("Expected 0 open positions, got %d", len(positions)) + } + }) + + // Clear positions for next test + db.Exec("DELETE FROM trader_positions") + + // Test Case 2: Open Short → Close Short with BUY (the bug scenario!) + t.Run("Open Short then Close with BUY", func(t *testing.T) { + // Open Short: SELL 0.05 ETH @ 3500 + err := posBuilder.ProcessTrade( + traderID, exchangeID, exchangeType, + symbol, "SHORT", "open_short", + 0.05, 3500, 0.25, 0, + time.Now(), "order-3", + ) + if err != nil { + t.Fatalf("Failed to process open short: %v", err) + } + + // Verify SHORT position created + positions, err := positionStore.GetOpenPositions(traderID) + if err != nil { + t.Fatalf("Failed to get positions: %v", err) + } + if len(positions) != 1 { + t.Fatalf("Expected 1 open position, got %d", len(positions)) + } + if positions[0].Side != "SHORT" { + t.Errorf("Expected SHORT position, got %s", positions[0].Side) + } + + // Close Short: BUY 0.05 ETH @ 3400 + // ⚠️ This is the critical test - BUY should close SHORT, not open LONG! + err = posBuilder.ProcessTrade( + traderID, exchangeID, exchangeType, + symbol, "SHORT", "close_short", + 0.05, 3400, 0.25, 5.0, // PnL = (3500-3400)*0.05 = 5 + time.Now(), "order-4", + ) + if err != nil { + t.Fatalf("Failed to process close short: %v", err) + } + + // Verify position CLOSED (not opened a new LONG!) + positions, err = positionStore.GetOpenPositions(traderID) + if err != nil { + t.Fatalf("Failed to get positions: %v", err) + } + if len(positions) != 0 { + t.Errorf("Expected 0 open positions after close, got %d", len(positions)) + if len(positions) > 0 { + t.Errorf("Wrong position side: %s (should be closed!)", positions[0].Side) + } + } + }) + + // Clear positions + db.Exec("DELETE FROM trader_positions") + + // Test Case 3: Position Averaging (Open → Add → Close) + t.Run("Position Averaging", func(t *testing.T) { + // Open Long: BUY 0.1 ETH @ 3500 + err := posBuilder.ProcessTrade( + traderID, exchangeID, exchangeType, + symbol, "LONG", "open_long", + 0.1, 3500, 0.5, 0, + time.Now(), "order-5", + ) + if err != nil { + t.Fatalf("Failed to process first open: %v", err) + } + + // Add to Long: BUY 0.1 ETH @ 3600 + err = posBuilder.ProcessTrade( + traderID, exchangeID, exchangeType, + symbol, "LONG", "open_long", + 0.1, 3600, 0.5, 0, + time.Now(), "order-6", + ) + if err != nil { + t.Fatalf("Failed to process add position: %v", err) + } + + // Verify averaged position + positions, err := positionStore.GetOpenPositions(traderID) + if err != nil { + t.Fatalf("Failed to get positions: %v", err) + } + if len(positions) != 1 { + t.Fatalf("Expected 1 position (averaged), got %d", len(positions)) + } + if positions[0].Quantity != 0.2 { + t.Errorf("Expected quantity 0.2, got %f", positions[0].Quantity) + } + expectedAvgPrice := (3500*0.1 + 3600*0.1) / 0.2 // = 3550 + if positions[0].EntryPrice != expectedAvgPrice { + t.Errorf("Expected avg price %f, got %f", expectedAvgPrice, positions[0].EntryPrice) + } + + // Close all: SELL 0.2 ETH @ 3700 + err = posBuilder.ProcessTrade( + traderID, exchangeID, exchangeType, + symbol, "LONG", "close_long", + 0.2, 3700, 1.0, 30.0, + time.Now(), "order-7", + ) + if err != nil { + t.Fatalf("Failed to process close: %v", err) + } + + // Verify fully closed + positions, err = positionStore.GetOpenPositions(traderID) + if err != nil { + t.Fatalf("Failed to get positions: %v", err) + } + if len(positions) != 0 { + t.Errorf("Expected 0 positions, got %d", len(positions)) + } + }) + + // Clear positions + db.Exec("DELETE FROM trader_positions") + + // Test Case 4: Partial Close + t.Run("Partial Close", func(t *testing.T) { + // Open Long: BUY 1.0 ETH @ 3500 + err := posBuilder.ProcessTrade( + traderID, exchangeID, exchangeType, + symbol, "LONG", "open_long", + 1.0, 3500, 2.0, 0, + time.Now(), "order-8", + ) + if err != nil { + t.Fatalf("Failed to process open: %v", err) + } + + // Partial Close: SELL 0.3 ETH @ 3600 + err = posBuilder.ProcessTrade( + traderID, exchangeID, exchangeType, + symbol, "LONG", "close_long", + 0.3, 3600, 0.6, 30.0, + time.Now(), "order-9", + ) + if err != nil { + t.Fatalf("Failed to process partial close: %v", err) + } + + // Verify remaining position + positions, err := positionStore.GetOpenPositions(traderID) + if err != nil { + t.Fatalf("Failed to get positions: %v", err) + } + if len(positions) != 1 { + t.Fatalf("Expected 1 position, got %d", len(positions)) + } + if positions[0].Quantity != 0.7 { + t.Errorf("Expected remaining quantity 0.7, got %f", positions[0].Quantity) + } + if positions[0].Status != "OPEN" { + t.Errorf("Expected status OPEN, got %s", positions[0].Status) + } + }) +} + +// TestHyperliquidBugScenario tests the exact bug we fixed +func TestHyperliquidBugScenario(t *testing.T) { + // Setup database + db, err := sql.Open("sqlite3", ":memory:") + if err != nil { + t.Fatalf("Failed to create test database: %v", err) + } + defer db.Close() + + positionStore := store.NewPositionStore(db) + if err := positionStore.InitTables(); err != nil { + t.Fatalf("Failed to initialize position tables: %v", err) + } + + posBuilder := store.NewPositionBuilder(positionStore) + + traderID := "test-trader" + exchangeID := "test-exchange" + exchangeType := "hyperliquid" + + // Simulate the exact scenario from the bug report + // Account has 30 USDT, should not be able to hold 1.7 ETH + + trades := []struct { + action string + side string + symbol string + qty float64 + price float64 + fee float64 + pnl float64 + }{ + // Order 853: Open Short + {"open_short", "SHORT", "ETHUSDT", 0.0472, 3500, 0.2, 0}, + // Order 854: Close Short (was incorrectly classified as open_long) + {"close_short", "SHORT", "ETHUSDT", 0.0472, 3400, 0.2, 4.72}, + // Order 855: Open Long + {"open_long", "LONG", "ETHUSDT", 0.05, 3450, 0.2, 0}, + // Order 856: Close Long + {"close_long", "LONG", "ETHUSDT", 0.05, 3550, 0.2, 5.0}, + } + + for i, trade := range trades { + err := posBuilder.ProcessTrade( + traderID, exchangeID, exchangeType, + trade.symbol, trade.side, trade.action, + trade.qty, trade.price, trade.fee, trade.pnl, + time.Now().Add(time.Duration(i)*time.Second), + "", + ) + if err != nil { + t.Fatalf("Failed to process trade %d: %v", i, err) + } + } + + // Verify: Should have 0 open positions + positions, err := positionStore.GetOpenPositions(traderID) + if err != nil { + t.Fatalf("Failed to get positions: %v", err) + } + + if len(positions) != 0 { + t.Errorf("Expected 0 open positions, got %d", len(positions)) + for _, p := range positions { + t.Errorf(" Unexpected position: %s %s qty=%.4f", p.Symbol, p.Side, p.Quantity) + } + } + + // Verify closed positions have correct PnL + allPositions, err := positionStore.GetClosedPositions(traderID, 100) + if err != nil { + t.Fatalf("Failed to get closed positions: %v", err) + } + + totalPnL := 0.0 + for _, p := range allPositions { + if p.Status == "CLOSED" { + totalPnL += p.RealizedPnL + } + } + + expectedTotalPnL := 4.72 + 5.0 // Sum of both close trades + if totalPnL != expectedTotalPnL { + t.Errorf("Expected total PnL %.2f, got %.2f", expectedTotalPnL, totalPnL) + } +} diff --git a/trader/hyperliquid_trader.go b/trader/hyperliquid_trader.go index 0aea5000..4ce86dec 100644 --- a/trader/hyperliquid_trader.go +++ b/trader/hyperliquid_trader.go @@ -1028,12 +1028,42 @@ func (t *HyperliquidTrader) GetTrades(startTime time.Time, limit int) ([]TradeRe side = "SELL" } + // Parse Dir field to get order action + // Hyperliquid Dir values: "Open Long", "Open Short", "Close Long", "Close Short" + var orderAction string + switch strings.ToLower(fill.Dir) { + case "open long": + orderAction = "open_long" + case "open short": + orderAction = "open_short" + case "close long": + orderAction = "close_long" + case "close short": + orderAction = "close_short" + default: + // Fallback: use RealizedPnL if Dir is missing/unknown + if pnl != 0 { + if side == "BUY" { + orderAction = "close_short" + } else { + orderAction = "close_long" + } + } else { + if side == "BUY" { + orderAction = "open_long" + } else { + orderAction = "open_short" + } + } + } + // Hyperliquid uses one-way mode, so PositionSide is "BOTH" trade := TradeRecord{ TradeID: strconv.FormatInt(fill.Tid, 10), Symbol: fill.Coin, Side: side, PositionSide: "BOTH", // Hyperliquid doesn't have hedge mode + OrderAction: orderAction, Price: price, Quantity: qty, RealizedPnL: pnl, diff --git a/trader/interface.go b/trader/interface.go index d9074184..a18e176e 100644 --- a/trader/interface.go +++ b/trader/interface.go @@ -26,6 +26,7 @@ type TradeRecord struct { Symbol string // Trading pair (e.g., "BTCUSDT") Side string // "BUY" or "SELL" PositionSide string // "LONG", "SHORT", or "BOTH" (for one-way mode) + OrderAction string // "open_long", "open_short", "close_long", "close_short" (from exchange Dir field) Price float64 // Execution price Quantity float64 // Executed quantity RealizedPnL float64 // Realized PnL (non-zero for closing trades) diff --git a/trader/lighter_order_sync.go b/trader/lighter_order_sync.go index 05fd5e73..caa468db 100644 --- a/trader/lighter_order_sync.go +++ b/trader/lighter_order_sync.go @@ -7,11 +7,12 @@ import ( "nofx/logger" "nofx/store" "net/http" + "sort" "strings" "time" ) -// LighterOrderHistory 订单历史记录 +// LighterOrderHistory order history record type LighterOrderHistory struct { OrderID string `json:"order_id"` Symbol string `json:"symbol"` @@ -26,16 +27,23 @@ type LighterOrderHistory struct { FilledAt int64 `json:"filled_at"` } -// SyncOrdersFromLighter 同步 Lighter 交易所的订单历史到本地数据库 -func (t *LighterTraderV2) SyncOrdersFromLighter(traderID string, orderStore *store.OrderStore) error { - // 确保有 account index +// SyncOrdersFromLighter syncs Lighter exchange order history to local database +// Also creates/updates position records to ensure orders/fills/positions data consistency +// exchangeID: Exchange account UUID (from exchanges.id) +// exchangeType: Exchange type ("lighter") +func (t *LighterTraderV2) SyncOrdersFromLighter(traderID string, exchangeID string, exchangeType string, st *store.Store) error { + if st == nil { + return fmt.Errorf("store is nil") + } + + // Ensure we have account index if t.accountIndex == 0 { if err := t.initializeAccount(); err != nil { return fmt.Errorf("failed to get account index: %w", err) } } - // 获取最近的订单(过去24小时) + // Get recent orders (last 24 hours) startTime := time.Now().Add(-24 * time.Hour).Unix() endpoint := fmt.Sprintf("%s/api/v1/orders?account_index=%d&start_time=%d&limit=100", t.baseURL, t.accountIndex, startTime) @@ -47,7 +55,7 @@ func (t *LighterTraderV2) SyncOrdersFromLighter(traderID string, orderStore *sto return fmt.Errorf("failed to create request: %w", err) } - // 添加认证头 + // Add authentication header if err := t.ensureAuthToken(); err != nil { return fmt.Errorf("failed to get auth token: %w", err) } @@ -72,7 +80,7 @@ func (t *LighterTraderV2) SyncOrdersFromLighter(traderID string, orderStore *sto return fmt.Errorf("API returned status %d", resp.StatusCode) } - // 解析响应 + // Parse response var apiResp struct { Code int `json:"code"` Orders []LighterOrderHistory `json:"orders"` @@ -89,21 +97,33 @@ func (t *LighterTraderV2) SyncOrdersFromLighter(traderID string, orderStore *sto logger.Infof("📥 Received %d orders from Lighter", len(apiResp.Orders)) - // 同步每个订单 + // Sort orders by filled_at ASC (oldest first) for proper position building + sort.Slice(apiResp.Orders, func(i, j int) bool { + return apiResp.Orders[i].FilledAt < apiResp.Orders[j].FilledAt + }) + + // Process orders one by one (no transaction to avoid deadlock) + orderStore := st.Order() + positionStore := st.Position() + posBuilder := store.NewPositionBuilder(positionStore) + + // Get current open positions to help determine action for each order + openPositions, _ := positionStore.GetOpenPositions(traderID) + syncedCount := 0 for _, order := range apiResp.Orders { - // 只同步已成交的订单 + // Only sync filled orders if order.Status != "filled" { continue } - // 检查订单是否已存在 - existing, err := orderStore.GetOrderByExchangeID("lighter", order.OrderID) + // Check if order already exists (use exchangeID which is UUID, not exchange type) + existing, err := orderStore.GetOrderByExchangeID(exchangeID, order.OrderID) if err == nil && existing != nil { - continue // 订单已存在,跳过 + continue // Order already exists, skip } - // 解析价格和数量 + // Parse price and quantity price, _ := parseFloat(order.Price) size, _ := parseFloat(order.Size) filledSize, _ := parseFloat(order.FilledSize) @@ -112,24 +132,55 @@ func (t *LighterTraderV2) SyncOrdersFromLighter(traderID string, orderStore *sto filledSize = size } - // 确定订单方向和动作 + // Determine order action based on existing positions + // Lighter can have both LONG and SHORT positions simultaneously var positionSide, orderAction, side string + symbol := order.Symbol + if order.Side == "buy" { side = "BUY" - // 买入可能是开多或平空,这里假设是开多 - positionSide = "LONG" - orderAction = "open_long" + + // Check if we have an open SHORT position for this symbol + hasShort := false + for _, pos := range openPositions { + if pos.Symbol == symbol && pos.Side == "SHORT" && pos.Status == "OPEN" { + hasShort = true + break + } + } + + if hasShort { + positionSide = "SHORT" + orderAction = "close_short" + } else { + positionSide = "LONG" + orderAction = "open_long" + } } else { side = "SELL" - // 卖出可能是平多或开空,这里假设是平多 - positionSide = "LONG" - orderAction = "close_long" + + // Check if we have an open LONG position + hasLong := false + for _, pos := range openPositions { + if pos.Symbol == symbol && pos.Side == "LONG" && pos.Status == "OPEN" { + hasLong = true + break + } + } + + if hasLong { + positionSide = "LONG" + orderAction = "close_long" + } else { + positionSide = "SHORT" + orderAction = "open_short" + } } - // 估算手续费 + // Estimate fee fee := price * filledSize * 0.0004 - // 创建订单记录 + // Create order record filledAt := time.Unix(order.FilledAt, 0) if order.FilledAt == 0 { filledAt = time.Unix(order.UpdatedAt, 0) @@ -137,9 +188,10 @@ func (t *LighterTraderV2) SyncOrdersFromLighter(traderID string, orderStore *sto orderRecord := &store.TraderOrder{ TraderID: traderID, - ExchangeID: "lighter", + ExchangeID: exchangeID, // UUID + ExchangeType: exchangeType, // Exchange type ExchangeOrderID: order.OrderID, - Symbol: order.Symbol, + Symbol: symbol, Side: side, PositionSide: positionSide, Type: "MARKET", @@ -155,20 +207,21 @@ func (t *LighterTraderV2) SyncOrdersFromLighter(traderID string, orderStore *sto UpdatedAt: time.Unix(order.UpdatedAt, 0), } - // 插入订单记录 + // Insert order record if err := orderStore.CreateOrder(orderRecord); err != nil { logger.Infof(" ⚠️ Failed to sync order %s: %v", order.OrderID, err) continue } - // 创建成交记录 + // Create fill record fillRecord := &store.TraderFill{ TraderID: traderID, - ExchangeID: "lighter", + ExchangeID: exchangeID, // UUID + ExchangeType: exchangeType, // Exchange type OrderID: orderRecord.ID, ExchangeOrderID: order.OrderID, ExchangeTradeID: fmt.Sprintf("%s-%d", order.OrderID, time.Now().UnixNano()), - Symbol: order.Symbol, + Symbol: symbol, Side: side, Price: price, Quantity: filledSize, @@ -184,20 +237,63 @@ func (t *LighterTraderV2) SyncOrdersFromLighter(traderID string, orderStore *sto logger.Infof(" ⚠️ Failed to sync fill for order %s: %v", order.OrderID, err) } + // Calculate PnL for close orders + var realizedPnL float64 + if strings.HasPrefix(orderAction, "close_") { + // Get the open position to calculate PnL + openPos, _ := positionStore.GetOpenPositionBySymbol(traderID, symbol, positionSide) + if openPos != nil { + if positionSide == "LONG" { + realizedPnL = (price - openPos.EntryPrice) * filledSize + } else { + realizedPnL = (openPos.EntryPrice - price) * filledSize + } + realizedPnL -= fee + } + } + + // Create/update position record using PositionBuilder + if err := posBuilder.ProcessTrade( + traderID, exchangeID, exchangeType, + symbol, positionSide, orderAction, + filledSize, price, fee, realizedPnL, + filledAt, order.OrderID, + ); err != nil { + logger.Infof(" ⚠️ Failed to sync position for order %s: %v", order.OrderID, err) + } + + // Update openPositions list dynamically + if strings.HasPrefix(orderAction, "open_") { + // Add to openPositions (approximate) + openPositions = append(openPositions, &store.TraderPosition{ + Symbol: symbol, + Side: positionSide, + Status: "OPEN", + }) + } else if strings.HasPrefix(orderAction, "close_") { + // Remove from openPositions (approximate) + for i, pos := range openPositions { + if pos.Symbol == symbol && pos.Side == positionSide && pos.Status == "OPEN" { + openPositions = append(openPositions[:i], openPositions[i+1:]...) + break + } + } + } + syncedCount++ - logger.Infof(" ✅ Synced order: %s %s %s qty=%.6f price=%.6f", order.OrderID, order.Symbol, side, filledSize, price) + logger.Infof(" ✅ Synced order: %s %s %s qty=%.6f price=%.6f", order.OrderID, symbol, side, filledSize, price) } logger.Infof("✅ Order sync completed: %d new orders synced", syncedCount) return nil } -// StartOrderSync 启动订单同步后台任务 -func (t *LighterTraderV2) StartOrderSync(traderID string, orderStore *store.OrderStore, interval time.Duration) { +// StartOrderSync starts background order sync task +func (t *LighterTraderV2) StartOrderSync(traderID string, exchangeID string, exchangeType string, st *store.Store, interval time.Duration) { ticker := time.NewTicker(interval) go func() { for range ticker.C { - if err := t.SyncOrdersFromLighter(traderID, orderStore); err != nil { + if err := t.SyncOrdersFromLighter(traderID, exchangeID, exchangeType, st); err != nil { // Only log non-404 errors to reduce log spam if !strings.Contains(err.Error(), "status 404") { logger.Infof("⚠️ Order sync failed: %v", err) @@ -205,5 +301,5 @@ func (t *LighterTraderV2) StartOrderSync(traderID string, orderStore *store.Orde } } }() - logger.Infof("🔄 Lighter order sync started (interval: %v)", interval) + logger.Infof("🔄 Lighter order+position sync started (interval: %v)", interval) } diff --git a/trader/okx_order_sync.go b/trader/okx_order_sync.go new file mode 100644 index 00000000..9e45b685 --- /dev/null +++ b/trader/okx_order_sync.go @@ -0,0 +1,280 @@ +package trader + +import ( + "encoding/json" + "fmt" + "nofx/logger" + "nofx/store" + "sort" + "strconv" + "strings" + "time" +) + +// OKXTrade represents a trade record from OKX fills history +type OKXTrade struct { + InstID string + Symbol string + TradeID string + OrderID string + Side string // buy or sell + PosSide string // long or short + FillPrice float64 + FillQty float64 // In contracts + FillQtyBase float64 // In base asset (BTC, ETH, etc) + Fee float64 + FeeAsset string + ExecTime time.Time + IsMaker bool + OrderType string + OrderAction string // open_long, open_short, close_long, close_short +} + +// GetTrades retrieves trade/fill records from OKX +func (t *OKXTrader) GetTrades(startTime time.Time, limit int) ([]OKXTrade, error) { + if limit <= 0 { + limit = 100 + } + if limit > 100 { + limit = 100 // OKX max limit is 100 + } + + // Build query path + // OKX fills-history endpoint for historical fills + path := fmt.Sprintf("/api/v5/trade/fills-history?instType=SWAP&limit=%d", limit) + if !startTime.IsZero() { + path += fmt.Sprintf("&begin=%d", startTime.UnixMilli()) + } + + data, err := t.doRequest("GET", path, nil) + if err != nil { + return nil, fmt.Errorf("failed to get fills history: %w", err) + } + + var fills []struct { + InstID string `json:"instId"` // e.g., "BTC-USDT-SWAP" + TradeID string `json:"tradeId"` // Trade ID + OrdID string `json:"ordId"` // Order ID + BillID string `json:"billId"` // Bill ID + Side string `json:"side"` // buy or sell + PosSide string `json:"posSide"` // long, short, or net + FillPx string `json:"fillPx"` // Fill price + FillSz string `json:"fillSz"` // Fill size (contracts) + Fee string `json:"fee"` // Fee (negative for cost) + FeeCcy string `json:"feeCcy"` // Fee currency + Ts string `json:"ts"` // Trade timestamp (ms) + ExecType string `json:"execType"` // T: taker, M: maker + Tag string `json:"tag"` // Order tag + } + + if err := json.Unmarshal(data, &fills); err != nil { + return nil, fmt.Errorf("failed to parse fills: %w", err) + } + + trades := make([]OKXTrade, 0, len(fills)) + + for _, fill := range fills { + fillPrice, _ := strconv.ParseFloat(fill.FillPx, 64) + fillSz, _ := strconv.ParseFloat(fill.FillSz, 64) + fee, _ := strconv.ParseFloat(fill.Fee, 64) + ts, _ := strconv.ParseInt(fill.Ts, 10, 64) + + // Convert symbol: BTC-USDT-SWAP -> BTCUSDT + symbol := t.convertSymbolBack(fill.InstID) + + // Convert contract count to base asset quantity + fillQtyBase := fillSz + inst, err := t.getInstrument(symbol) + if err == nil && inst.CtVal > 0 { + fillQtyBase = fillSz * inst.CtVal + } + + // Determine order action based on side and posSide + // OKX uses dual position mode: + // - buy + long = open long + // - sell + long = close long + // - sell + short = open short + // - buy + short = close short + orderAction := "open_long" + posSide := strings.ToLower(fill.PosSide) + side := strings.ToLower(fill.Side) + + if posSide == "long" { + if side == "buy" { + orderAction = "open_long" + } else { + orderAction = "close_long" + } + } else if posSide == "short" { + if side == "sell" { + orderAction = "open_short" + } else { + orderAction = "close_short" + } + } else { + // One-way mode (net position) + if side == "buy" { + orderAction = "open_long" + } else { + orderAction = "open_short" + } + } + + trade := OKXTrade{ + InstID: fill.InstID, + Symbol: symbol, + TradeID: fill.TradeID, + OrderID: fill.OrdID, + Side: fill.Side, + PosSide: fill.PosSide, + FillPrice: fillPrice, + FillQty: fillSz, + FillQtyBase: fillQtyBase, + Fee: -fee, // OKX returns negative fee + FeeAsset: fill.FeeCcy, + ExecTime: time.UnixMilli(ts), + IsMaker: fill.ExecType == "M", + OrderType: "MARKET", + OrderAction: orderAction, + } + + trades = append(trades, trade) + } + + return trades, nil +} + +// SyncOrdersFromOKX syncs OKX exchange order history to local database +// Also creates/updates position records to ensure orders/fills/positions data consistency +// exchangeID: Exchange account UUID (from exchanges.id) +// exchangeType: Exchange type ("okx") +func (t *OKXTrader) SyncOrdersFromOKX(traderID string, exchangeID string, exchangeType string, st *store.Store) error { + if st == nil { + return fmt.Errorf("store is nil") + } + + // Get recent trades (last 24 hours) + startTime := time.Now().Add(-24 * time.Hour) + + logger.Infof("🔄 Syncing OKX trades from: %s", startTime.Format(time.RFC3339)) + + // Use GetTrades method to fetch trade records + trades, err := t.GetTrades(startTime, 100) + if err != nil { + return fmt.Errorf("failed to get trades: %w", err) + } + + logger.Infof("📥 Received %d trades from OKX", len(trades)) + + // Sort trades by time ASC (oldest first) for proper position building + sort.Slice(trades, func(i, j int) bool { + return trades[i].ExecTime.Before(trades[j].ExecTime) + }) + + // Process trades one by one (no transaction to avoid deadlock) + orderStore := st.Order() + positionStore := st.Position() + posBuilder := store.NewPositionBuilder(positionStore) + syncedCount := 0 + + for _, trade := range trades { + // Check if trade already exists (use exchangeID which is UUID, not exchange type) + existing, err := orderStore.GetOrderByExchangeID(exchangeID, trade.TradeID) + if err == nil && existing != nil { + continue // Order already exists, skip + } + + // Determine position side from order action + positionSide := "LONG" + if strings.Contains(trade.OrderAction, "short") { + positionSide = "SHORT" + } + + // Normalize side for storage + side := strings.ToUpper(trade.Side) + + // Create order record + orderRecord := &store.TraderOrder{ + TraderID: traderID, + ExchangeID: exchangeID, // UUID + ExchangeType: exchangeType, // Exchange type + ExchangeOrderID: trade.TradeID, + Symbol: trade.Symbol, + Side: side, + PositionSide: positionSide, + Type: trade.OrderType, + OrderAction: trade.OrderAction, + Quantity: trade.FillQtyBase, + Price: trade.FillPrice, + Status: "FILLED", + FilledQuantity: trade.FillQtyBase, + AvgFillPrice: trade.FillPrice, + Commission: trade.Fee, + FilledAt: trade.ExecTime, + CreatedAt: trade.ExecTime, + UpdatedAt: trade.ExecTime, + } + + // Insert order record + if err := orderStore.CreateOrder(orderRecord); err != nil { + logger.Infof(" ⚠️ Failed to sync trade %s: %v", trade.TradeID, err) + continue + } + + // Create fill record + fillRecord := &store.TraderFill{ + TraderID: traderID, + ExchangeID: exchangeID, // UUID + ExchangeType: exchangeType, // Exchange type + OrderID: orderRecord.ID, + ExchangeOrderID: trade.OrderID, + ExchangeTradeID: trade.TradeID, + Symbol: trade.Symbol, + Side: side, + Price: trade.FillPrice, + Quantity: trade.FillQtyBase, + QuoteQuantity: trade.FillPrice * trade.FillQtyBase, + Commission: trade.Fee, + CommissionAsset: trade.FeeAsset, + RealizedPnL: 0, // OKX fills don't include PnL per trade + IsMaker: trade.IsMaker, + CreatedAt: trade.ExecTime, + } + + if err := orderStore.CreateFill(fillRecord); err != nil { + logger.Infof(" ⚠️ Failed to sync fill for trade %s: %v", trade.TradeID, err) + } + + // Create/update position record using PositionBuilder + if err := posBuilder.ProcessTrade( + traderID, exchangeID, exchangeType, + trade.Symbol, positionSide, trade.OrderAction, + trade.FillQtyBase, trade.FillPrice, trade.Fee, 0, // No per-trade PnL from OKX + trade.ExecTime, trade.TradeID, + ); err != nil { + logger.Infof(" ⚠️ Failed to sync position for trade %s: %v", trade.TradeID, err) + } else { + logger.Infof(" 📍 Position updated for trade: %s (action: %s, qty: %.6f)", trade.TradeID, trade.OrderAction, trade.FillQtyBase) + } + + syncedCount++ + logger.Infof(" ✅ Synced trade: %s %s %s qty=%.6f price=%.6f fee=%.6f action=%s", + trade.TradeID, trade.Symbol, side, trade.FillQtyBase, trade.FillPrice, trade.Fee, trade.OrderAction) + } + + logger.Infof("✅ OKX order sync completed: %d new trades synced", syncedCount) + return nil +} + +// StartOrderSync starts background order sync task for OKX +func (t *OKXTrader) StartOrderSync(traderID string, exchangeID string, exchangeType string, st *store.Store, interval time.Duration) { + ticker := time.NewTicker(interval) + go func() { + for range ticker.C { + if err := t.SyncOrdersFromOKX(traderID, exchangeID, exchangeType, st); err != nil { + logger.Infof("⚠️ OKX order sync failed: %v", err) + } + } + }() + logger.Infof("🔄 OKX order sync started (interval: %v)", interval) +} diff --git a/trader/position_snapshot.go b/trader/position_snapshot.go new file mode 100644 index 00000000..5149b900 --- /dev/null +++ b/trader/position_snapshot.go @@ -0,0 +1,101 @@ +package trader + +import ( + "fmt" + "nofx/logger" + "nofx/store" + "time" +) + +// CreatePositionSnapshot gets current real positions from exchange and creates snapshot positions +// This function will: +// 1. Delete all OPEN old positions from database +// 2. Get current real positions from exchange +// 3. Create a "snapshot" record for each real position +func CreatePositionSnapshot(traderID, exchangeID, exchangeType string, trader Trader, st *store.Store) error { + logger.Infof("📸 Creating position snapshot for trader %s (%s)...", traderID, exchangeType) + + positionStore := st.Position() + + // Step 1: Delete all OPEN positions + logger.Infof("🗑️ Deleting all OPEN positions from database...") + if err := positionStore.DeleteAllOpenPositions(traderID); err != nil { + return fmt.Errorf("failed to delete open positions: %w", err) + } + logger.Infof("✅ Deleted all OPEN positions") + + // Step 2: Get current positions from exchange + logger.Infof("📡 Fetching current positions from exchange...") + positions, err := trader.GetPositions() + if err != nil { + return fmt.Errorf("failed to get positions from exchange: %w", err) + } + + if len(positions) == 0 { + logger.Infof("✅ No open positions on exchange, snapshot complete") + return nil + } + + logger.Infof("📥 Found %d positions on exchange", len(positions)) + + // Step 3: Create snapshot record for each position + now := time.Now() + createdCount := 0 + + for _, posMap := range positions { + // Parse position data + symbol, _ := posMap["symbol"].(string) + sideStr, _ := posMap["side"].(string) + positionAmt, _ := posMap["positionAmt"].(float64) + entryPrice, _ := posMap["entryPrice"].(float64) + markPrice, _ := posMap["markPrice"].(float64) + leverage, _ := posMap["leverage"].(float64) + + // Skip positions with 0 quantity + if positionAmt == 0 { + continue + } + + // Determine position side + side := "LONG" + if sideStr == "short" { + side = "SHORT" + } + + // Use current mark price as entry price (approximation) + // If entryPrice is 0, use markPrice + if entryPrice == 0 { + entryPrice = markPrice + } + + snapshotPosition := &store.TraderPosition{ + TraderID: traderID, + ExchangeID: exchangeID, + ExchangeType: exchangeType, + ExchangePositionID: fmt.Sprintf("snapshot_%s_%s_%d", symbol, side, now.UnixMilli()), + Symbol: symbol, + Side: side, + Quantity: positionAmt, + EntryPrice: entryPrice, + EntryOrderID: "snapshot", // Mark as snapshot + EntryTime: now, + Leverage: int(leverage), + Status: "OPEN", + Source: "snapshot", // Mark source as snapshot + CreatedAt: now, + UpdatedAt: now, + } + + if err := positionStore.CreateOpenPosition(snapshotPosition); err != nil { + logger.Infof(" ⚠️ Failed to create snapshot position for %s %s: %v", symbol, side, err) + continue + } + + logger.Infof(" ✅ Created snapshot: %s %s %.6f @ %.2f (leverage: %dx)", + symbol, side, positionAmt, entryPrice, int(leverage)) + createdCount++ + } + + logger.Infof("✅ Position snapshot complete: %d positions created", createdCount) + return nil +} diff --git a/trader/position_sync.go b/trader/position_sync.go deleted file mode 100644 index 6f8a2489..00000000 --- a/trader/position_sync.go +++ /dev/null @@ -1,809 +0,0 @@ -package trader - -import ( - "fmt" - "nofx/logger" - "nofx/store" - "strings" - "sync" - "time" -) - -// PositionSyncManager Position status synchronization manager -// Responsible for periodically synchronizing exchange positions, detecting manual closures and other changes -type PositionSyncManager struct { - store *store.Store - interval time.Duration - historySyncInterval time.Duration // Interval for full history sync - stopCh chan struct{} - wg sync.WaitGroup - traderCache map[string]Trader // trader_id -> Trader instance cache - configCache map[string]*store.TraderFullConfig // trader_id -> config cache - cacheMutex sync.RWMutex - lastHistorySync map[string]time.Time // trader_id -> last history sync time - lastHistorySyncMutex sync.RWMutex -} - -// NewPositionSyncManager Create position synchronization manager -func NewPositionSyncManager(st *store.Store, interval time.Duration) *PositionSyncManager { - if interval == 0 { - interval = 10 * time.Second - } - return &PositionSyncManager{ - store: st, - interval: interval, - historySyncInterval: 5 * time.Minute, // Sync closed positions every 5 minutes - stopCh: make(chan struct{}), - traderCache: make(map[string]Trader), - configCache: make(map[string]*store.TraderFullConfig), - lastHistorySync: make(map[string]time.Time), - } -} - -// Start Start position synchronization service -func (m *PositionSyncManager) Start() { - m.wg.Add(1) - go m.run() - logger.Info("📊 Position sync manager started") - - // Run startup sync in background - go m.startupSync() -} - -// Stop Stop position synchronization service -func (m *PositionSyncManager) Stop() { - close(m.stopCh) - m.wg.Wait() - - // Clear cache - m.cacheMutex.Lock() - m.traderCache = make(map[string]Trader) - m.configCache = make(map[string]*store.TraderFullConfig) - m.cacheMutex.Unlock() - - logger.Info("📊 Position sync manager stopped") -} - -// run Main loop -func (m *PositionSyncManager) run() { - defer m.wg.Done() - - // Execute immediately on startup - m.syncPositions() - - ticker := time.NewTicker(m.interval) - defer ticker.Stop() - - for { - select { - case <-m.stopCh: - return - case <-ticker.C: - m.syncPositions() - } - } -} - -// syncPositions Synchronize all position statuses -func (m *PositionSyncManager) syncPositions() { - // Get all OPEN status positions - localPositions, err := m.store.Position().GetAllOpenPositions() - if err != nil { - logger.Infof("⚠️ Failed to get local positions: %v", err) - return - } - - if len(localPositions) == 0 { - return - } - - // Group by trader_id - positionsByTrader := make(map[string][]*store.TraderPosition) - for _, pos := range localPositions { - positionsByTrader[pos.TraderID] = append(positionsByTrader[pos.TraderID], pos) - } - - // Process each trader - for traderID, traderPositions := range positionsByTrader { - m.syncTraderPositions(traderID, traderPositions) - } -} - -// syncTraderPositions Synchronize positions for a single trader -func (m *PositionSyncManager) syncTraderPositions(traderID string, localPositions []*store.TraderPosition) { - // Get or create trader instance - trader, err := m.getOrCreateTrader(traderID) - if err != nil { - logger.Infof("⚠️ Failed to get trader instance (ID: %s): %v", traderID, err) - return - } - - // Get exchange info for history sync - config, _ := m.getTraderConfig(traderID) - exchangeID := "" - exchangeType := "" - if config != nil { - exchangeID = config.Exchange.ID // UUID for database association - exchangeType = config.Exchange.ExchangeType // "binance", "bybit" etc for trader creation - } - - // Maybe run periodic history sync - if exchangeID != "" && exchangeType != "" { - m.maybeRunHistorySync(traderID, exchangeID, exchangeType, trader) - } - - // Get current exchange positions - exchangePositions, err := trader.GetPositions() - if err != nil { - logger.Infof("⚠️ Failed to get exchange positions (ID: %s): %v", traderID, err) - return - } - - // Build exchange position map: symbol_side -> position - // Note: Exchange returns side as "long"/"short" (lowercase), database stores "LONG"/"SHORT" (uppercase) - exchangeMap := make(map[string]map[string]interface{}) - for _, pos := range exchangePositions { - symbol, _ := pos["symbol"].(string) - side, _ := pos["side"].(string) // Note: use "side" not "positionSide" - if symbol == "" || side == "" { - continue - } - // Normalize side to uppercase for matching with database - normalizedSide := strings.ToUpper(side) - key := fmt.Sprintf("%s_%s", symbol, normalizedSide) - exchangeMap[key] = pos - } - - // Compare local and exchange positions - for _, localPos := range localPositions { - key := fmt.Sprintf("%s_%s", localPos.Symbol, localPos.Side) - exchangePos, exists := exchangeMap[key] - - if !exists { - // Exchange doesn't have this position → it has been closed - m.closeLocalPosition(localPos, trader, "manual") - continue - } - - // Check if quantity is 0 or very small - qty := getFloatFromMap(exchangePos, "positionAmt") - if qty < 0 { - qty = -qty // Short position quantity is negative - } - - if qty < 0.0000001 { - // Quantity is 0, position closed - m.closeLocalPosition(localPos, trader, "manual") - } - } -} - -// closeLocalPosition Mark local position as closed -func (m *PositionSyncManager) closeLocalPosition(pos *store.TraderPosition, trader Trader, reason string) { - // Try to get accurate closure data from exchange first - closedPnLRecord := m.findClosedPnLRecord(trader, pos) - - var exitPrice, realizedPnL, fee float64 - var closeReason, exitOrderID string - - if closedPnLRecord != nil { - // Use accurate data from exchange - exitPrice = closedPnLRecord.ExitPrice - realizedPnL = closedPnLRecord.RealizedPnL - fee = closedPnLRecord.Fee - closeReason = closedPnLRecord.CloseType - exitOrderID = closedPnLRecord.OrderID - logger.Infof("📊 Found accurate closure data from exchange for %s %s", pos.Symbol, pos.Side) - } else { - // Fallback: use market price and calculate PnL - exitPrice = pos.EntryPrice // Default to entry price - if price, err := trader.GetMarketPrice(pos.Symbol); err == nil && price > 0 { - exitPrice = price - } - - // Calculate PnL - if pos.Side == "LONG" { - realizedPnL = (exitPrice - pos.EntryPrice) * pos.Quantity - } else { - realizedPnL = (pos.EntryPrice - exitPrice) * pos.Quantity - } - closeReason = reason - fee = 0 - exitOrderID = "" - logger.Infof("⚠️ Using market price for closure (no exchange data): %s %s", pos.Symbol, pos.Side) - } - - // Update database - err := m.store.Position().ClosePosition( - pos.ID, - exitPrice, - exitOrderID, - realizedPnL, - fee, - closeReason, - ) - - if err != nil { - logger.Infof("⚠️ Failed to update position status: %v", err) - } else { - logger.Infof("📊 Position closed [%s] %s %s @ %.4f → %.4f, PnL: %.2f, Fee: %.4f (%s)", - pos.TraderID[:8], pos.Symbol, pos.Side, pos.EntryPrice, exitPrice, realizedPnL, fee, closeReason) - } -} - -// findClosedPnLRecord Try to find matching ClosedPnL record from exchange -// For Binance, directly query trades for the specific symbol (more reliable than Income API) -func (m *PositionSyncManager) findClosedPnLRecord(trader Trader, pos *store.TraderPosition) *ClosedPnLRecord { - // Try to get trades directly for this symbol (Binance-specific, more reliable) - if binanceTrader, ok := trader.(*FuturesTrader); ok { - return m.findClosedPnLFromBinanceTrades(binanceTrader, pos) - } - - // Fallback: use GetClosedPnL for other exchanges - startTime := time.Now().Add(-24 * time.Hour) - records, err := trader.GetClosedPnL(startTime, 100) - if err != nil { - logger.Infof("⚠️ Failed to get closed PnL records: %v", err) - return nil - } - - return m.aggregateClosedRecords(records, pos) -} - -// findClosedPnLFromBinanceTrades queries Binance directly for trades of a specific symbol -func (m *PositionSyncManager) findClosedPnLFromBinanceTrades(trader *FuturesTrader, pos *store.TraderPosition) *ClosedPnLRecord { - // Query trades for this specific symbol from the last hour - startTime := time.Now().Add(-1 * time.Hour) - trades, err := trader.GetTradesForSymbol(pos.Symbol, startTime, 100) - if err != nil { - logger.Infof("⚠️ Failed to get trades for %s: %v", pos.Symbol, err) - return nil - } - - if len(trades) == 0 { - logger.Infof("⚠️ No trades found for %s in the last hour", pos.Symbol) - return nil - } - - // Find all closing trades (realizedPnl != 0) that match this position - var totalQty, totalPnL, totalFee float64 - var weightedExitPrice float64 - var latestExitTime time.Time - var latestTradeID string - matchCount := 0 - - posSide := strings.ToLower(pos.Side) - - for _, trade := range trades { - // Skip opening trades - if trade.RealizedPnL == 0 { - continue - } - - // Determine if this trade closes our position - // For LONG position: SELL closes it - // For SHORT position: BUY closes it - isClosingTrade := false - tradeSide := strings.ToUpper(trade.Side) - positionSide := strings.ToUpper(trade.PositionSide) - - if positionSide == "LONG" && posSide == "long" { - isClosingTrade = true - } else if positionSide == "SHORT" && posSide == "short" { - isClosingTrade = true - } else if positionSide == "BOTH" || positionSide == "" { - // One-way mode - if tradeSide == "SELL" && posSide == "long" { - isClosingTrade = true - } else if tradeSide == "BUY" && posSide == "short" { - isClosingTrade = true - } - } - - if !isClosingTrade { - continue - } - - // Aggregate this trade - totalQty += trade.Quantity - totalPnL += trade.RealizedPnL - totalFee += trade.Fee - weightedExitPrice += trade.Price * trade.Quantity - matchCount++ - - if trade.Time.After(latestExitTime) { - latestExitTime = trade.Time - latestTradeID = trade.TradeID - } - } - - if matchCount == 0 { - logger.Infof("⚠️ No closing trades found for %s %s", pos.Symbol, pos.Side) - return nil - } - - avgExitPrice := weightedExitPrice / totalQty - - logger.Infof("📊 Found %d closing trades for %s %s: qty=%.4f, exitPrice=%.6f, pnl=%.4f, fee=%.4f", - matchCount, pos.Symbol, pos.Side, totalQty, avgExitPrice, totalPnL, totalFee) - - return &ClosedPnLRecord{ - Symbol: pos.Symbol, - Side: posSide, - EntryPrice: pos.EntryPrice, - ExitPrice: avgExitPrice, - Quantity: totalQty, - RealizedPnL: totalPnL, - Fee: totalFee, - ExitTime: latestExitTime, - EntryTime: pos.EntryTime, - OrderID: latestTradeID, - ExchangeID: latestTradeID, - CloseType: "unknown", - } -} - -// aggregateClosedRecords aggregates closed PnL records for a position -func (m *PositionSyncManager) aggregateClosedRecords(records []ClosedPnLRecord, pos *store.TraderPosition) *ClosedPnLRecord { - if len(records) == 0 { - return nil - } - - posSide := strings.ToLower(pos.Side) - var matchingRecords []ClosedPnLRecord - - for i := range records { - record := &records[i] - if record.Symbol != pos.Symbol { - continue - } - - recordSide := strings.ToLower(record.Side) - if recordSide != posSide { - continue - } - - matchingRecords = append(matchingRecords, *record) - } - - if len(matchingRecords) == 0 { - return nil - } - - var totalQty, totalPnL, totalFee float64 - var weightedExitPrice float64 - var latestExitTime time.Time - var latestOrderID, latestExchangeID string - - for _, rec := range matchingRecords { - totalQty += rec.Quantity - totalPnL += rec.RealizedPnL - totalFee += rec.Fee - weightedExitPrice += rec.ExitPrice * rec.Quantity - - if rec.ExitTime.After(latestExitTime) { - latestExitTime = rec.ExitTime - latestOrderID = rec.OrderID - latestExchangeID = rec.ExchangeID - } - } - - avgExitPrice := weightedExitPrice / totalQty - - logger.Infof("📊 Aggregated %d closing trades for %s %s: qty=%.4f, pnl=%.4f, fee=%.4f", - len(matchingRecords), pos.Symbol, pos.Side, totalQty, totalPnL, totalFee) - - return &ClosedPnLRecord{ - Symbol: pos.Symbol, - Side: posSide, - EntryPrice: pos.EntryPrice, - ExitPrice: avgExitPrice, - Quantity: totalQty, - RealizedPnL: totalPnL, - Fee: totalFee, - ExitTime: latestExitTime, - EntryTime: pos.EntryTime, - OrderID: latestOrderID, - ExchangeID: latestExchangeID, - CloseType: "unknown", - } -} - -// abs returns absolute value of float64 -func abs(x float64) float64 { - if x < 0 { - return -x - } - return x -} - -// getOrCreateTrader Get or create trader instance -func (m *PositionSyncManager) getOrCreateTrader(traderID string) (Trader, error) { - m.cacheMutex.RLock() - trader, exists := m.traderCache[traderID] - m.cacheMutex.RUnlock() - - if exists && trader != nil { - return trader, nil - } - - // Need to create new trader instance - config, err := m.getTraderConfig(traderID) - if err != nil { - return nil, fmt.Errorf("failed to get trader config: %w", err) - } - - trader, err = m.createTrader(config) - if err != nil { - return nil, fmt.Errorf("failed to create trader instance: %w", err) - } - - m.cacheMutex.Lock() - m.traderCache[traderID] = trader - m.cacheMutex.Unlock() - - return trader, nil -} - -// getTraderConfig Get trader configuration -func (m *PositionSyncManager) getTraderConfig(traderID string) (*store.TraderFullConfig, error) { - m.cacheMutex.RLock() - config, exists := m.configCache[traderID] - m.cacheMutex.RUnlock() - - if exists { - return config, nil - } - - // Get from database - traders, err := m.store.Trader().ListAll() - if err != nil { - return nil, fmt.Errorf("failed to get trader list: %w", err) - } - - var userID string - for _, t := range traders { - if t.ID == traderID { - userID = t.UserID - break - } - } - - if userID == "" { - return nil, fmt.Errorf("trader not found: %s", traderID) - } - - config, err = m.store.Trader().GetFullConfig(userID, traderID) - if err != nil { - return nil, err - } - - m.cacheMutex.Lock() - m.configCache[traderID] = config - m.cacheMutex.Unlock() - - return config, nil -} - -// createTrader Create trader instance based on configuration -func (m *PositionSyncManager) createTrader(config *store.TraderFullConfig) (Trader, error) { - exchange := config.Exchange - - // Use exchange.ExchangeType to determine specific exchange, not exchange.ID (UUID) or exchange.Type (cex/dex) - switch exchange.ExchangeType { - case "binance": - return NewFuturesTrader(exchange.APIKey, exchange.SecretKey, config.Trader.UserID), nil - - case "bybit": - return NewBybitTrader(exchange.APIKey, exchange.SecretKey), nil - - case "okx": - return NewOKXTrader(exchange.APIKey, exchange.SecretKey, exchange.Passphrase), nil - - case "bitget": - return NewBitgetTrader(exchange.APIKey, exchange.SecretKey, exchange.Passphrase), nil - - case "hyperliquid": - return NewHyperliquidTrader(exchange.SecretKey, exchange.HyperliquidWalletAddr, exchange.Testnet) - - case "aster": - return NewAsterTrader(exchange.AsterUser, exchange.AsterSigner, exchange.AsterPrivateKey) - - case "lighter": - if exchange.LighterWalletAddr == "" || exchange.LighterAPIKeyPrivateKey == "" { - return nil, fmt.Errorf("Lighter requires wallet address and API Key private key") - } - // Lighter only supports mainnet - return NewLighterTraderV2( - exchange.LighterWalletAddr, - exchange.LighterAPIKeyPrivateKey, - exchange.LighterAPIKeyIndex, - false, // Always use mainnet for Lighter - ) - - default: - return nil, fmt.Errorf("unsupported exchange type: %s", exchange.ExchangeType) - } -} - -// InvalidateCache Invalidate cache -func (m *PositionSyncManager) InvalidateCache(traderID string) { - m.cacheMutex.Lock() - defer m.cacheMutex.Unlock() - - delete(m.traderCache, traderID) - delete(m.configCache, traderID) -} - -// getFloatFromMap Get float64 value from map -func getFloatFromMap(m map[string]interface{}, key string) float64 { - if v, ok := m[key]; ok { - switch val := v.(type) { - case float64: - return val - case int64: - return float64(val) - case int: - return float64(val) - case string: - var f float64 - fmt.Sscanf(val, "%f", &f) - return f - } - } - return 0 -} - -// ============================================================================= -// Startup and History Sync Methods -// ============================================================================= - -// startupSync performs initial sync on startup -// 1. Sync existing positions from exchange (to detect external positions) -// 2. Sync closed positions history from exchange -func (m *PositionSyncManager) startupSync() { - logger.Info("📊 Starting startup sync...") - - // Get all traders - traders, err := m.store.Trader().ListAll() - if err != nil { - logger.Infof("⚠️ Failed to get traders for startup sync: %v", err) - return - } - - for _, traderInfo := range traders { - traderID := traderInfo.ID - - // Get trader instance - trader, err := m.getOrCreateTrader(traderID) - if err != nil { - logger.Infof("⚠️ Failed to get trader instance for startup sync (ID: %s): %v", traderID, err) - continue - } - - // Get exchange info - config, err := m.getTraderConfig(traderID) - if err != nil { - logger.Infof("⚠️ Failed to get trader config for startup sync (ID: %s): %v", traderID, err) - continue - } - exchangeID := config.Exchange.ID // UUID - exchangeType := config.Exchange.ExchangeType // "binance", "bybit" etc - - // 1. Sync current open positions from exchange - m.syncExternalPositions(traderID, exchangeID, exchangeType, trader) - - // 2. Sync closed positions history from exchange - m.syncClosedPositionsHistory(traderID, exchangeID, exchangeType, trader) - } - - logger.Info("📊 Startup sync completed") -} - -// syncExternalPositions syncs positions that exist on exchange but not locally -// These could be positions opened manually or from other systems -func (m *PositionSyncManager) syncExternalPositions(traderID, exchangeID, exchangeType string, trader Trader) { - // Get current positions from exchange - exchangePositions, err := trader.GetPositions() - if err != nil { - logger.Infof("⚠️ Failed to get exchange positions for external sync (ID: %s): %v", traderID, err) - return - } - - // Get local open positions - localPositions, err := m.store.Position().GetOpenPositions(traderID) - if err != nil { - logger.Infof("⚠️ Failed to get local positions for external sync (ID: %s): %v", traderID, err) - return - } - - // Build local position map: symbol_side -> position - localMap := make(map[string]*store.TraderPosition) - for _, pos := range localPositions { - key := fmt.Sprintf("%s_%s", pos.Symbol, pos.Side) - localMap[key] = pos - } - - // Find positions that exist on exchange but not locally - for _, pos := range exchangePositions { - symbol, _ := pos["symbol"].(string) - side, _ := pos["side"].(string) - if symbol == "" || side == "" { - continue - } - - // Normalize side - normalizedSide := side - if side == "Buy" || side == "LONG" || side == "long" { - normalizedSide = "LONG" - } else if side == "Sell" || side == "SHORT" || side == "short" { - normalizedSide = "SHORT" - } - - key := fmt.Sprintf("%s_%s", symbol, normalizedSide) - - // Check if we already have this position locally - if _, exists := localMap[key]; exists { - continue // Already tracking this position - } - - // This is an external position - create local record - qty := getFloatFromMap(pos, "positionAmt") - if qty < 0 { - qty = -qty - } - if qty < 0.0000001 { - continue // No actual position - } - - entryPrice := getFloatFromMap(pos, "entryPrice") - leverage := int(getFloatFromMap(pos, "leverage")) - if leverage == 0 { - leverage = 1 - } - - // Get entry time if available - createdTime := getFloatFromMap(pos, "createdTime") - var entryTime time.Time - if createdTime > 0 { - entryTime = time.UnixMilli(int64(createdTime)) - } else { - entryTime = time.Now() // Use current time as fallback - } - - // Generate unique exchange position ID - exchangePositionID := fmt.Sprintf("%s_%s_%d", symbol, normalizedSide, entryTime.UnixMilli()) - - newPos := &store.TraderPosition{ - TraderID: traderID, - ExchangeID: exchangeID, - ExchangeType: exchangeType, - ExchangePositionID: exchangePositionID, - Symbol: symbol, - Side: normalizedSide, - Quantity: qty, - EntryPrice: entryPrice, - EntryTime: entryTime, - Leverage: leverage, - Source: "sync", // Mark as synced from exchange - } - - if err := m.store.Position().CreateOpenPosition(newPos); err != nil { - logger.Infof("⚠️ Failed to create external position record: %v", err) - } else { - logger.Infof("📊 Synced external position: [%s] %s %s @ %.4f (qty: %.4f)", - traderID[:8], symbol, normalizedSide, entryPrice, qty) - } - } -} - -// syncClosedPositionsHistory syncs closed positions from exchange history -// IMPORTANT: Only exchanges with position-level history API should sync history: -// - Bybit: /v5/position/closed-pnl (accurate position records) -// - OKX: /api/v5/account/positions-history (accurate position records) -// Other exchanges (Binance, Hyperliquid, Lighter, Aster) only have trade-level data, -// which cannot accurately reconstruct positions. They should NOT sync historical positions. -func (m *PositionSyncManager) syncClosedPositionsHistory(traderID, exchangeID, exchangeType string, trader Trader) { - // Only sync history for exchanges with position-level API - // Binance/Hyperliquid/Lighter/Aster only have trade-level data, skip history sync - switch exchangeType { - case "bybit", "okx": - // These exchanges have position-level history API, proceed with sync - default: - // Other exchanges don't have accurate position history API - // Their GetClosedPnL only returns recent trades for closure detection, not for history sync - return - } - - // Get last sync time from database - lastSyncTime, err := m.store.Position().GetLastClosedPositionTime(traderID) - if err != nil { - logger.Infof("⚠️ Failed to get last closed position time (ID: %s): %v", traderID, err) - // First sync: go back 90 days to get more history - lastSyncTime = time.Now().Add(-90 * 24 * time.Hour) - } - - // Subtract a small buffer to avoid missing positions at the boundary - startTime := lastSyncTime.Add(-1 * time.Minute) - - // Pagination loop to get all records - const batchSize = 500 - totalCreated := 0 - totalSkipped := 0 - - for { - // Get closed positions from exchange - closedRecords, err := trader.GetClosedPnL(startTime, batchSize) - if err != nil { - logger.Infof("⚠️ Failed to get closed PnL records (ID: %s): %v", traderID, err) - break - } - - if len(closedRecords) == 0 { - break - } - - // Convert to store.ClosedPnLRecord and sync - storeRecords := make([]store.ClosedPnLRecord, len(closedRecords)) - var latestExitTime time.Time - for i, rec := range closedRecords { - storeRecords[i] = store.ClosedPnLRecord{ - Symbol: rec.Symbol, - Side: rec.Side, - EntryPrice: rec.EntryPrice, - ExitPrice: rec.ExitPrice, - Quantity: rec.Quantity, - RealizedPnL: rec.RealizedPnL, - Fee: rec.Fee, - Leverage: rec.Leverage, - EntryTime: rec.EntryTime, - ExitTime: rec.ExitTime, - OrderID: rec.OrderID, - CloseType: rec.CloseType, - ExchangeID: rec.ExchangeID, - } - // Track latest exit time for pagination - if rec.ExitTime.After(latestExitTime) { - latestExitTime = rec.ExitTime - } - } - - created, skipped, err := m.store.Position().SyncClosedPositions(traderID, exchangeID, exchangeType, storeRecords) - if err != nil { - logger.Infof("⚠️ Failed to sync closed positions (ID: %s): %v", traderID, err) - break - } - - totalCreated += created - totalSkipped += skipped - - // If we got fewer records than batch size, we've reached the end - if len(closedRecords) < batchSize { - break - } - - // Move start time forward for next batch (add 1ms to avoid duplicate) - startTime = latestExitTime.Add(time.Millisecond) - } - - if totalCreated > 0 { - logger.Infof("📊 Synced %d new closed positions for trader %s (skipped %d duplicates)", - totalCreated, traderID[:8], totalSkipped) - } - - // Update last history sync time - m.lastHistorySyncMutex.Lock() - m.lastHistorySync[traderID] = time.Now() - m.lastHistorySyncMutex.Unlock() -} - -// maybeRunHistorySync checks if it's time to run history sync for a trader -func (m *PositionSyncManager) maybeRunHistorySync(traderID, exchangeID, exchangeType string, trader Trader) { - m.lastHistorySyncMutex.RLock() - lastSync, exists := m.lastHistorySync[traderID] - m.lastHistorySyncMutex.RUnlock() - - if !exists || time.Since(lastSync) >= m.historySyncInterval { - m.syncClosedPositionsHistory(traderID, exchangeID, exchangeType, trader) - } -} diff --git a/web/src/components/AdvancedChart.tsx b/web/src/components/AdvancedChart.tsx index ac66ee25..f8ee8fcf 100644 --- a/web/src/components/AdvancedChart.tsx +++ b/web/src/components/AdvancedChart.tsx @@ -83,6 +83,8 @@ export function AdvancedChart({ const [showIndicatorPanel, setShowIndicatorPanel] = useState(false) const [showOrderMarkers, setShowOrderMarkers] = useState(true) // 订单标记显示开关,默认显示 const isInitialLoadRef = useRef(true) // 跟踪是否为初始加载 + const [tooltipData, setTooltipData] = useState(null) + const tooltipRef = useRef(null) // 指标配置 const [indicators, setIndicators] = useState([ @@ -355,6 +357,31 @@ export function AdvancedChart({ window.addEventListener('resize', handleResize) + // 监听鼠标移动,显示 OHLC 信息 + chart.subscribeCrosshairMove((param) => { + if (!param.time || !param.point || !candlestickSeriesRef.current) { + setTooltipData(null) + return + } + + const data = param.seriesData.get(candlestickSeriesRef.current as any) + if (!data) { + setTooltipData(null) + return + } + + const candleData = data as any + setTooltipData({ + time: param.time, + open: candleData.open, + high: candleData.high, + low: candleData.low, + close: candleData.close, + x: param.point.x, + y: param.point.y, + }) + }) + return () => { window.removeEventListener('resize', handleResize) chart.remove() @@ -741,6 +768,56 @@ export function AdvancedChart({
+ {/* OHLC Tooltip */} + {tooltipData && ( +
+
+ {new Date((tooltipData.time as number) * 1000).toLocaleString(language === 'zh' ? 'zh-CN' : 'en-US', { + month: 'short', + day: 'numeric', + hour: '2-digit', + minute: '2-digit', + })} +
+
+ O: + {tooltipData.open?.toFixed(2)} + + H: + {tooltipData.high?.toFixed(2)} + + L: + {tooltipData.low?.toFixed(2)} + + C: + = tooltipData.open ? '#0ECB81' : '#F6465D', + fontWeight: 'bold' + }}> + {tooltipData.close?.toFixed(2)} + +
+
+ )} + {/* NOFX 水印 */}
(null) // Markers primitive for v5 const [loading, setLoading] = useState(true) const [error, setError] = useState(null) + const [tooltipData, setTooltipData] = useState(null) + const tooltipRef = useRef(null) // 解析时间:支持 Unix 时间戳(数字)或字符串格式 const parseCustomTime = (time: any): number => { @@ -243,6 +245,31 @@ export function ChartWithOrders({ window.addEventListener('resize', handleResize) + // 监听鼠标移动,显示 OHLC 信息 + chart.subscribeCrosshairMove((param) => { + if (!param.time || !param.point || !candlestickSeriesRef.current) { + setTooltipData(null) + return + } + + const data = param.seriesData.get(candlestickSeriesRef.current as any) + if (!data) { + setTooltipData(null) + return + } + + const candleData = data as any + setTooltipData({ + time: param.time, + open: candleData.open, + high: candleData.high, + low: candleData.low, + close: candleData.close, + x: param.point.x, + y: param.point.y, + }) + }) + return () => { window.removeEventListener('resize', handleResize) chart.remove() @@ -370,7 +397,59 @@ export function ChartWithOrders({
{/* 图表容器 */} -
+
+
+ + {/* OHLC Tooltip */} + {tooltipData && ( +
+
+ {new Date((tooltipData.time as number) * 1000).toLocaleString(language === 'zh' ? 'zh-CN' : 'en-US', { + month: 'short', + day: 'numeric', + hour: '2-digit', + minute: '2-digit', + })} +
+
+ O: + {tooltipData.open?.toFixed(2)} + + H: + {tooltipData.high?.toFixed(2)} + + L: + {tooltipData.low?.toFixed(2)} + + C: + = tooltipData.open ? '#0ECB81' : '#F6465D', + fontWeight: 'bold' + }}> + {tooltipData.close?.toFixed(2)} + +
+
+ )} +
{/* 错误提示 */} {error && (