mirror of
https://github.com/NoFxAiOS/nofx.git
synced 2026-07-03 19:11:02 +08:00
Every StartOrderSync spawned a ticker goroutine that ran forever — it survived trader stop AND deletion, so each quick-created trader left a permanent 30s Hyperliquid poll behind. Stacked leaks turned into an ~8s effective hammer that tripped Hyperliquid's 429 rate limit, which then broke the symbol board, trader creation, and order sync itself. - new trader/syncloop package: shared stoppable sync loop with exponential failure backoff (30s base, 5min cap) - all 9 exchanges' StartOrderSync now take the trader's stop channel and stop when the trader stops (close broadcast from AutoTrader.Stop) - provider/hyperliquid: GetPerpDexCoins now serves a 5min TTL cache and falls back to the stale board when the upstream returns 429, so the symbol panel keeps working through rate limiting
307 lines
9.6 KiB
Go
307 lines
9.6 KiB
Go
package bybit
|
|
|
|
import (
|
|
"crypto/hmac"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"nofx/logger"
|
|
"nofx/market"
|
|
"nofx/store"
|
|
"nofx/trader/syncloop"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// BybitTrade represents a trade record from Bybit execution list
|
|
type BybitTrade struct {
|
|
Symbol string
|
|
OrderID string
|
|
ExecID string
|
|
Side string // Buy or Sell
|
|
ExecPrice float64
|
|
ExecQty float64
|
|
ExecFee float64
|
|
ExecTime time.Time
|
|
IsMaker bool
|
|
OrderType string
|
|
ClosedSize float64 // For close orders
|
|
ClosedPnL float64
|
|
OrderAction string // open_long, open_short, close_long, close_short
|
|
}
|
|
|
|
// GetTrades retrieves trade/execution records from Bybit
|
|
func (t *BybitTrader) GetTrades(startTime time.Time, limit int) ([]BybitTrade, error) {
|
|
return t.getTradesViaHTTP(startTime, limit)
|
|
}
|
|
|
|
// getTradesViaHTTP makes direct HTTP call to Bybit API for execution list
|
|
func (t *BybitTrader) getTradesViaHTTP(startTime time.Time, limit int) ([]BybitTrade, error) {
|
|
// Build query string
|
|
queryParams := fmt.Sprintf("category=linear&startTime=%d&limit=%d", startTime.UnixMilli(), limit)
|
|
url := "https://api.bybit.com/v5/execution/list?" + queryParams
|
|
|
|
// Generate timestamp
|
|
timestamp := fmt.Sprintf("%d", time.Now().UnixMilli())
|
|
recvWindow := "5000"
|
|
|
|
// Build signature payload: timestamp + api_key + recv_window + queryString
|
|
signPayload := timestamp + t.apiKey + recvWindow + queryParams
|
|
|
|
// Generate HMAC-SHA256 signature
|
|
h := hmac.New(sha256.New, []byte(t.secretKey))
|
|
h.Write([]byte(signPayload))
|
|
signature := hex.EncodeToString(h.Sum(nil))
|
|
|
|
// Create request
|
|
req, err := http.NewRequest("GET", url, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create request: %w", err)
|
|
}
|
|
|
|
// Add Bybit V5 API headers
|
|
req.Header.Set("X-BAPI-API-KEY", t.apiKey)
|
|
req.Header.Set("X-BAPI-SIGN", signature)
|
|
req.Header.Set("X-BAPI-SIGN-TYPE", "2")
|
|
req.Header.Set("X-BAPI-TIMESTAMP", timestamp)
|
|
req.Header.Set("X-BAPI-RECV-WINDOW", recvWindow)
|
|
req.Header.Set("Content-Type", "application/json")
|
|
|
|
// Use http.DefaultClient for the request
|
|
resp, err := http.DefaultClient.Do(req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to call Bybit API: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
body, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read response: %w", err)
|
|
}
|
|
|
|
var result struct {
|
|
RetCode int `json:"retCode"`
|
|
RetMsg string `json:"retMsg"`
|
|
Result struct {
|
|
List []map[string]interface{} `json:"list"`
|
|
} `json:"result"`
|
|
}
|
|
|
|
if err := json.Unmarshal(body, &result); err != nil {
|
|
return nil, fmt.Errorf("failed to parse response: %w", err)
|
|
}
|
|
|
|
if result.RetCode != 0 {
|
|
return nil, fmt.Errorf("Bybit API error: %s", result.RetMsg)
|
|
}
|
|
|
|
return t.parseTradesResult(result.Result.List)
|
|
}
|
|
|
|
// parseTradesResult parses the execution list result from Bybit API
|
|
func (t *BybitTrader) parseTradesResult(list []map[string]interface{}) ([]BybitTrade, error) {
|
|
var trades []BybitTrade
|
|
|
|
for _, item := range list {
|
|
symbol, _ := item["symbol"].(string)
|
|
orderID, _ := item["orderId"].(string)
|
|
execID, _ := item["execId"].(string)
|
|
side, _ := item["side"].(string)
|
|
orderType, _ := item["orderType"].(string)
|
|
isMaker, _ := item["isMaker"].(bool)
|
|
|
|
execPriceStr, _ := item["execPrice"].(string)
|
|
execQtyStr, _ := item["execQty"].(string)
|
|
execFeeStr, _ := item["execFee"].(string)
|
|
closedSizeStr, _ := item["closedSize"].(string)
|
|
closedPnlStr, _ := item["closedPnl"].(string)
|
|
execTimeStr, _ := item["execTime"].(string)
|
|
|
|
execPrice, _ := strconv.ParseFloat(execPriceStr, 64)
|
|
execQty, _ := strconv.ParseFloat(execQtyStr, 64)
|
|
execFee, _ := strconv.ParseFloat(execFeeStr, 64)
|
|
closedSize, _ := strconv.ParseFloat(closedSizeStr, 64)
|
|
closedPnl, _ := strconv.ParseFloat(closedPnlStr, 64)
|
|
execTimeMs, _ := strconv.ParseInt(execTimeStr, 10, 64)
|
|
execTime := time.UnixMilli(execTimeMs).UTC()
|
|
|
|
// Determine order action based on side and closedSize
|
|
// If closedSize > 0, it's a close trade
|
|
// Side: Buy = long direction, Sell = short direction
|
|
orderAction := "open_long"
|
|
if closedSize > 0 {
|
|
// This is a close trade
|
|
if strings.ToLower(side) == "sell" {
|
|
orderAction = "close_long" // Selling to close a long
|
|
} else {
|
|
orderAction = "close_short" // Buying to close a short
|
|
}
|
|
} else {
|
|
// This is an open trade
|
|
if strings.ToLower(side) == "buy" {
|
|
orderAction = "open_long"
|
|
} else {
|
|
orderAction = "open_short"
|
|
}
|
|
}
|
|
|
|
trade := BybitTrade{
|
|
Symbol: symbol,
|
|
OrderID: orderID,
|
|
ExecID: execID,
|
|
Side: side,
|
|
ExecPrice: execPrice,
|
|
ExecQty: execQty,
|
|
ExecFee: execFee,
|
|
ExecTime: execTime,
|
|
IsMaker: isMaker,
|
|
OrderType: orderType,
|
|
ClosedSize: closedSize,
|
|
ClosedPnL: closedPnl,
|
|
OrderAction: orderAction,
|
|
}
|
|
|
|
trades = append(trades, trade)
|
|
}
|
|
|
|
return trades, nil
|
|
}
|
|
|
|
// SyncOrdersFromBybit syncs Bybit exchange order history to local database
|
|
// Also creates/updates position records to ensure orders/fills/positions data consistency
|
|
// exchangeID: Exchange account UUID (from exchanges.id)
|
|
// exchangeType: Exchange type ("bybit")
|
|
func (t *BybitTrader) SyncOrdersFromBybit(traderID string, exchangeID string, exchangeType string, st *store.Store) error {
|
|
if st == nil {
|
|
return fmt.Errorf("store is nil")
|
|
}
|
|
|
|
// Get recent trades (last 24 hours)
|
|
startTime := time.Now().Add(-24 * time.Hour)
|
|
|
|
logger.Infof("🔄 Syncing Bybit trades from: %s", startTime.Format(time.RFC3339))
|
|
|
|
// Use GetTrades method to fetch trade records
|
|
trades, err := t.GetTrades(startTime, 1000)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get trades: %w", err)
|
|
}
|
|
|
|
logger.Infof("📥 Received %d trades from Bybit", len(trades))
|
|
|
|
// Sort trades by time ASC (oldest first) for proper position building
|
|
sort.Slice(trades, func(i, j int) bool {
|
|
return trades[i].ExecTime.UnixMilli() < trades[j].ExecTime.UnixMilli()
|
|
})
|
|
|
|
// Process trades one by one (no transaction to avoid deadlock)
|
|
orderStore := st.Order()
|
|
positionStore := st.Position()
|
|
posBuilder := store.NewPositionBuilder(positionStore)
|
|
syncedCount := 0
|
|
|
|
for _, trade := range trades {
|
|
// Check if trade already exists (use exchangeID which is UUID, not exchange type)
|
|
existing, err := orderStore.GetOrderByExchangeID(exchangeID, trade.ExecID)
|
|
if err == nil && existing != nil {
|
|
continue // Order already exists, skip
|
|
}
|
|
|
|
// Normalize symbol
|
|
symbol := market.Normalize(trade.Symbol)
|
|
|
|
// Determine position side from order action
|
|
positionSide := "LONG"
|
|
if strings.Contains(trade.OrderAction, "short") {
|
|
positionSide = "SHORT"
|
|
}
|
|
|
|
// Normalize side for storage
|
|
side := strings.ToUpper(trade.Side)
|
|
|
|
// Create order record - use UTC time in milliseconds to avoid timezone issues
|
|
execTimeMs := trade.ExecTime.UTC().UnixMilli()
|
|
orderRecord := &store.TraderOrder{
|
|
TraderID: traderID,
|
|
ExchangeID: exchangeID, // UUID
|
|
ExchangeType: exchangeType, // Exchange type
|
|
ExchangeOrderID: trade.ExecID, // Use ExecID as unique identifier
|
|
Symbol: symbol,
|
|
Side: side,
|
|
PositionSide: "BOTH", // Bybit uses one-way position mode
|
|
Type: trade.OrderType,
|
|
OrderAction: trade.OrderAction,
|
|
Quantity: trade.ExecQty,
|
|
Price: trade.ExecPrice,
|
|
Status: "FILLED",
|
|
FilledQuantity: trade.ExecQty,
|
|
AvgFillPrice: trade.ExecPrice,
|
|
Commission: trade.ExecFee,
|
|
FilledAt: execTimeMs,
|
|
CreatedAt: execTimeMs,
|
|
UpdatedAt: execTimeMs,
|
|
}
|
|
|
|
// Insert order record
|
|
if err := orderStore.CreateOrder(orderRecord); err != nil {
|
|
logger.Infof(" ⚠️ Failed to sync trade %s: %v", trade.ExecID, err)
|
|
continue
|
|
}
|
|
|
|
// Create fill record - use UTC time
|
|
fillRecord := &store.TraderFill{
|
|
TraderID: traderID,
|
|
ExchangeID: exchangeID, // UUID
|
|
ExchangeType: exchangeType, // Exchange type
|
|
OrderID: orderRecord.ID,
|
|
ExchangeOrderID: trade.OrderID,
|
|
ExchangeTradeID: trade.ExecID,
|
|
Symbol: symbol,
|
|
Side: side,
|
|
Price: trade.ExecPrice,
|
|
Quantity: trade.ExecQty,
|
|
QuoteQuantity: trade.ExecPrice * trade.ExecQty,
|
|
Commission: trade.ExecFee,
|
|
CommissionAsset: "USDT",
|
|
RealizedPnL: trade.ClosedPnL,
|
|
IsMaker: trade.IsMaker,
|
|
CreatedAt: execTimeMs,
|
|
}
|
|
|
|
if err := orderStore.CreateFill(fillRecord); err != nil {
|
|
logger.Infof(" ⚠️ Failed to sync fill for trade %s: %v", trade.ExecID, err)
|
|
}
|
|
|
|
// Create/update position record using PositionBuilder
|
|
if err := posBuilder.ProcessTrade(
|
|
traderID, exchangeID, exchangeType,
|
|
symbol, positionSide, trade.OrderAction,
|
|
trade.ExecQty, trade.ExecPrice, trade.ExecFee, trade.ClosedPnL,
|
|
execTimeMs, trade.ExecID,
|
|
); err != nil {
|
|
logger.Infof(" ⚠️ Failed to sync position for trade %s: %v", trade.ExecID, err)
|
|
} else {
|
|
logger.Infof(" 📍 Position updated for trade: %s (action: %s, qty: %.6f)", trade.ExecID, trade.OrderAction, trade.ExecQty)
|
|
}
|
|
|
|
syncedCount++
|
|
logger.Infof(" ✅ Synced trade: %s %s %s qty=%.6f price=%.6f pnl=%.2f fee=%.6f action=%s",
|
|
trade.ExecID, symbol, side, trade.ExecQty, trade.ExecPrice, trade.ClosedPnL, trade.ExecFee, trade.OrderAction)
|
|
}
|
|
|
|
logger.Infof("✅ Bybit order sync completed: %d new trades synced", syncedCount)
|
|
return nil
|
|
}
|
|
|
|
// StartOrderSync starts background order sync task for Bybit
|
|
func (t *BybitTrader) StartOrderSync(traderID string, exchangeID string, exchangeType string, st *store.Store, interval time.Duration, stop <-chan struct{}) {
|
|
syncloop.Run(stop, interval, "Bybit", func() error {
|
|
return t.SyncOrdersFromBybit(traderID, exchangeID, exchangeType, st)
|
|
})
|
|
}
|