Files
nofx/trader/lighter/order_sync.go
tinkle-community 953240565f fix(trader): stop order-sync goroutine leak and rate-limit hammering
Every StartOrderSync spawned a ticker goroutine that ran forever — it
survived trader stop AND deletion, so each quick-created trader left a
permanent 30s Hyperliquid poll behind. Stacked leaks turned into an
~8s effective hammer that tripped Hyperliquid's 429 rate limit, which
then broke the symbol board, trader creation, and order sync itself.

- new trader/syncloop package: shared stoppable sync loop with
  exponential failure backoff (30s base, 5min cap)
- all 9 exchanges' StartOrderSync now take the trader's stop channel
  and stop when the trader stops (close broadcast from AutoTrader.Stop)
- provider/hyperliquid: GetPerpDexCoins now serves a 5min TTL cache and
  falls back to the stale board when the upstream returns 429, so the
  symbol panel keeps working through rate limiting
2026-06-11 21:45:31 +08:00

160 lines
5.3 KiB
Go

package lighter
import (
"fmt"
"nofx/logger"
"nofx/market"
"nofx/store"
"nofx/trader/syncloop"
"sort"
"strings"
"time"
)
// SyncOrdersFromLighter syncs Lighter exchange trade history to the local database.
// For every trade that is not stored yet it writes an order record and a fill
// record, then feeds the trade to a PositionBuilder so that orders, fills and
// positions stay consistent with each other.
//
// Parameters:
//   - traderID: ID of the trader the records are attributed to
//   - exchangeID: exchange account UUID (from exchanges.id)
//   - exchangeType: exchange type ("lighter")
//   - st: store to write to; must not be nil
//
// It returns an error when st is nil or when fetching the trades fails.
// A failure on an individual trade is logged and does not abort the sync.
func (t *LighterTraderV2) SyncOrdersFromLighter(traderID string, exchangeID string, exchangeType string, st *store.Store) error {
	if st == nil {
		return fmt.Errorf("store is nil")
	}

	// Get recent trades (last 24 hours)
	startTime := time.Now().Add(-24 * time.Hour)
	logger.Infof("🔄 Syncing Lighter trades from: %s", startTime.Format(time.RFC3339))

	// Use GetTrades method to fetch trade records (same as other exchanges).
	// NOTE(review): the second argument (100) looks like a result limit — if so,
	// accounts with more than 100 trades in the window are only partially synced; confirm.
	trades, err := t.GetTrades(startTime, 100)
	if err != nil {
		return fmt.Errorf("failed to get trades: %w", err)
	}
	logger.Infof("📥 Received %d trades from Lighter", len(trades))

	// Sort trades by time ASC (oldest first) for proper position building.
	// The sort is stable so that trades sharing the same millisecond keep the
	// order GetTrades returned them in; an unstable sort could feed them to the
	// position builder in a different order on every run.
	sort.SliceStable(trades, func(i, j int) bool {
		return trades[i].Time.UnixMilli() < trades[j].Time.UnixMilli()
	})

	// Process trades one by one (no transaction to avoid deadlock)
	orderStore := st.Order()
	positionStore := st.Position()
	posBuilder := store.NewPositionBuilder(positionStore)

	syncedCount := 0
	for _, trade := range trades {
		// Check if trade already exists (use exchangeID which is UUID, not exchange type).
		// A lookup error is not treated as "exists": the trade is processed anyway.
		existing, err := orderStore.GetOrderByExchangeID(exchangeID, trade.TradeID)
		if err == nil && existing != nil {
			continue // Trade already exists, skip
		}

		// Normalize symbol through the shared market helper
		symbol := market.Normalize(trade.Symbol)

		// Use OrderAction from TradeRecord (determined by position change in GetTrades)
		// This is more accurate than guessing based on database state
		positionSide := trade.PositionSide
		orderAction := trade.OrderAction
		side := trade.Side
		upperSide := strings.ToUpper(side)

		// Fallback if OrderAction is empty (shouldn't happen with updated GetTrades):
		// a BUY is assumed to open a long, anything else to open a short.
		if orderAction == "" {
			if upperSide == "BUY" {
				positionSide = "LONG"
				orderAction = "open_long"
			} else {
				positionSide = "SHORT"
				orderAction = "open_short"
			}
		}

		// Create order record - use Unix milliseconds UTC
		tradeTimeMs := trade.Time.UTC().UnixMilli()
		orderRecord := &store.TraderOrder{
			TraderID:        traderID,
			ExchangeID:      exchangeID,   // UUID
			ExchangeType:    exchangeType, // Exchange type
			ExchangeOrderID: trade.TradeID,
			Symbol:          symbol,
			Side:            upperSide,
			PositionSide:    positionSide,
			Type:            "MARKET",
			OrderAction:     orderAction,
			Quantity:        trade.Quantity,
			Price:           trade.Price,
			Status:          "FILLED",
			FilledQuantity:  trade.Quantity,
			AvgFillPrice:    trade.Price,
			Commission:      trade.Fee,
			FilledAt:        tradeTimeMs,
			CreatedAt:       tradeTimeMs,
			UpdatedAt:       tradeTimeMs,
		}

		// Insert order record; without it there is nothing to attach the fill to,
		// so the trade is skipped entirely.
		if err := orderStore.CreateOrder(orderRecord); err != nil {
			logger.Infof(" ⚠️ Failed to sync trade %s: %v", trade.TradeID, err)
			continue
		}

		// Create fill record - use Unix milliseconds UTC.
		// The trade ID is used both as the exchange order ID and the exchange trade ID.
		fillRecord := &store.TraderFill{
			TraderID:        traderID,
			ExchangeID:      exchangeID,   // UUID
			ExchangeType:    exchangeType, // Exchange type
			OrderID:         orderRecord.ID,
			ExchangeOrderID: trade.TradeID,
			ExchangeTradeID: trade.TradeID,
			Symbol:          symbol,
			Side:            upperSide,
			Price:           trade.Price,
			Quantity:        trade.Quantity,
			QuoteQuantity:   trade.Price * trade.Quantity,
			Commission:      trade.Fee,
			CommissionAsset: "USDT",
			RealizedPnL:     trade.RealizedPnL,
			IsMaker:         false,
			CreatedAt:       tradeTimeMs,
		}

		// A failed fill insert is logged only; the position update below still runs.
		if err := orderStore.CreateFill(fillRecord); err != nil {
			logger.Infof(" ⚠️ Failed to sync fill for trade %s: %v", trade.TradeID, err)
		}

		// Create/update position record using PositionBuilder
		if err := posBuilder.ProcessTrade(
			traderID, exchangeID, exchangeType,
			symbol, positionSide, orderAction,
			trade.Quantity, trade.Price, trade.Fee, trade.RealizedPnL,
			tradeTimeMs, trade.TradeID,
		); err != nil {
			logger.Infof(" ⚠️ Failed to sync position for trade %s: %v", trade.TradeID, err)
		} else {
			logger.Infof(" 📍 Position updated for trade: %s (action: %s, qty: %.6f)", trade.TradeID, orderAction, trade.Quantity)
		}

		// Counted as synced once the order record exists, even if the fill or
		// position step reported a problem above.
		syncedCount++
		logger.Infof(" ✅ Synced trade: %s %s %s qty=%.6f price=%.6f pnl=%.2f fee=%.6f action=%s",
			trade.TradeID, symbol, side, trade.Quantity, trade.Price, trade.RealizedPnL, trade.Fee, orderAction)
	}

	logger.Infof("✅ Order sync completed: %d new trades synced", syncedCount)
	return nil
}
// StartOrderSync runs the periodic Lighter order sync for one trader.
// It passes syncloop.Run the stop channel, the interval, the label "Lighter"
// and a callback that performs a single SyncOrdersFromLighter pass.
func (t *LighterTraderV2) StartOrderSync(traderID string, exchangeID string, exchangeType string, st *store.Store, interval time.Duration, stop <-chan struct{}) {
	// syncOnce is one sync pass as seen by the loop.
	syncOnce := func() error {
		syncErr := t.SyncOrdersFromLighter(traderID, exchangeID, exchangeType, st)
		if syncErr == nil {
			return nil
		}
		// A 404 just means the account has no fills yet, so report it as a
		// successful empty pass: no log spam and no pointless backoff.
		if strings.Contains(syncErr.Error(), "status 404") {
			return nil
		}
		return syncErr
	}

	syncloop.Run(stop, interval, "Lighter", syncOnce)
}