From c5abcf1f2cfe95cfe71500f63e0cf893067d7915 Mon Sep 17 00:00:00 2001 From: 0xYYBB | ZYY | Bobo <128128010+the-dev-z@users.noreply.github.com> Date: Wed, 12 Nov 2025 10:41:26 +0800 Subject: [PATCH] feat(market): add data staleness detection (Part 2/3) (#800) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(market): add data staleness detection ## 問題背景 解決 PR #703 Part 2: 數據陳舊性檢測 - 修復 DOGEUSDT 式問題:連續價格不變表示數據源異常 - 防止系統處理僵化/過期的市場數據 ## 技術方案 ### 數據陳舊性檢測 (market/data.go) - **函數**: `isStaleData(klines []Kline, symbol string) bool` - **檢測邏輯**: - 連續 5 個 3 分鐘週期價格完全不變(15 分鐘無波動) - 價格波動容忍度:0.01%(避免誤報) - 成交量檢查:價格凍結 + 成交量為 0 → 確認陳舊 - **處理策略**: - 數據陳舊確認:跳過該幣種,返回錯誤 - 極低波動市場:記錄警告但允許通過(價格穩定但有成交量) ### 調用時機 - 在 `Get()` 函數中,獲取 3m K線後立即檢測 - 早期返回:避免後續無意義的計算和 API 調用 ## 實現細節 - **檢測閾值**: 5 個連續週期 - **容忍度**: 0.01% 價格波動 - **日誌**: 英文國際化版本 - **並發安全**: 函數無狀態,安全 ## 影響範圍 - ✅ 修改 market/data.go: 新增 isStaleData() + 調用邏輯 - ✅ 新增 log 包導入 - ✅ 50 行新增代碼 ## 測試建議 1. 模擬 DOGEUSDT 場景:連續價格不變 + 成交量為 0 2. 驗證日誌輸出:`stale data confirmed: price freeze + zero volume` 3. 正常市場:極低波動但有成交量,應允許通過並記錄警告 ## 相關 Issue/PR - 拆分自 **PR #703** (Part 2/3) - 基於最新 upstream/dev (3112250) - 依賴: 無 - 前置: Part 1 (OI 時間序列) - 已提交 PR #798 - 後續: Part 3 (手續費率傳遞) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: tinkle-community * test(market): add comprehensive unit tests for isStaleData function - Test normal fluctuating data (expects non-stale) - Test price freeze with zero volume (expects stale) - Test price freeze with volume (low volatility market) - Test insufficient data edge case (<5 klines) - Test boundary conditions (exactly 5 klines) - Test tolerance threshold (0.01% price change) - Test mixed scenario (normal → freeze transition) - Test empty klines edge case All 8 test cases passed. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: tinkle-community --------- Co-authored-by: ZhouYongyou <128128010+zhouyongyou@users.noreply.github.com> Co-authored-by: tinkle-community Co-authored-by: Shui <88711385+hzb1115@users.noreply.github.com> --- market/data.go | 51 +++++++++++++ market/data_test.go | 177 +++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 216 insertions(+), 12 deletions(-) diff --git a/market/data.go b/market/data.go index f3f1c586..3ea1a248 100644 --- a/market/data.go +++ b/market/data.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "log" "math" "strconv" "strings" @@ -35,6 +36,12 @@ func Get(symbol string) (*Data, error) { return nil, fmt.Errorf("获取3分钟K线失败: %v", err) } + // Data staleness detection: Prevent DOGEUSDT-style price freeze issues + if isStaleData(klines3m, symbol) { + log.Printf("⚠️ WARNING: %s detected stale data (consecutive price freeze), skipping symbol", symbol) + return nil, fmt.Errorf("%s data is stale, possible cache failure", symbol) + } + // 获取4小时K线数据 (最近10个) klines4h, err = WSMonitorCli.GetCurrentKlines(symbol, "4h") // 多获取用于计算指标 if err != nil { @@ -541,3 +548,47 @@ func parseFloat(v interface{}) (float64, error) { return 0, fmt.Errorf("unsupported type: %T", v) } } + +// isStaleData detects stale data (consecutive price freeze) +// Fix DOGEUSDT-style issue: consecutive N periods with completely unchanged prices indicate data source anomaly +func isStaleData(klines []Kline, symbol string) bool { + if len(klines) < 5 { + return false // Insufficient data to determine + } + + // Detection threshold: 5 consecutive 3-minute periods with unchanged price (15 minutes without fluctuation) + const stalePriceThreshold = 5 + const priceTolerancePct = 0.0001 // 0.01% fluctuation tolerance (avoid false positives) + + // Take the last stalePriceThreshold K-lines + recentKlines := klines[len(klines)-stalePriceThreshold:] + firstPrice := recentKlines[0].Close + + // Check if all prices are within tolerance + for i := 1; i < len(recentKlines); i++ { + priceDiff := math.Abs(recentKlines[i].Close-firstPrice) / firstPrice + if priceDiff > priceTolerancePct { + return false // Price fluctuation exists, data is normal + } + } + + // Additional check: MACD and volume + // If price is unchanged but MACD/volume shows normal fluctuation, it might be a real market situation (extremely low volatility) + // Check if volume is also 0 (data completely frozen) + allVolumeZero := true + for _, k := range recentKlines { + if k.Volume > 0 { + allVolumeZero = false + break + } + } + + if allVolumeZero { + log.Printf("⚠️ %s stale data confirmed: price freeze + zero volume", symbol) + return true + } + + // Price frozen but has volume: might be extremely low volatility market, allow but log warning + log.Printf("⚠️ %s detected extreme price stability (no fluctuation for %d consecutive periods), but volume is normal", symbol, stalePriceThreshold) + return false +} diff --git a/market/data_test.go b/market/data_test.go index b0b34f0f..984e727d 100644 --- a/market/data_test.go +++ b/market/data_test.go @@ -131,19 +131,19 @@ func TestCalculateIntradaySeries_VolumeValues(t *testing.T) { // TestCalculateIntradaySeries_ATR14 测试 ATR14 计算 func TestCalculateIntradaySeries_ATR14(t *testing.T) { tests := []struct { - name string - klineCount int - expectZero bool + name string + klineCount int + expectZero bool expectNonZero bool }{ { - name: "足够数据 - 20个K线", - klineCount: 20, + name: "足够数据 - 20个K线", + klineCount: 20, expectNonZero: true, }, { - name: "刚好15个K线(ATR14需要至少15个)", - klineCount: 15, + name: "刚好15个K线(ATR14需要至少15个)", + klineCount: 15, expectNonZero: true, }, { @@ -253,11 +253,11 @@ func TestCalculateATR(t *testing.T) { func TestCalculateATR_TrueRange(t *testing.T) { // 创建一个简单的测试用例,手动计算期望的 ATR klines := []Kline{ - {High: 50.0, Low: 48.0, Close: 49.0}, // TR = 2.0 - {High: 51.0, Low: 49.0, Close: 50.0}, // TR = max(2.0, 2.0, 1.0) = 2.0 - {High: 52.0, Low: 50.0, Close: 51.0}, // TR = max(2.0, 2.0, 1.0) = 2.0 - {High: 53.0, Low: 51.0, Close: 52.0}, // TR = 2.0 - {High: 54.0, Low: 52.0, Close: 53.0}, // TR = 2.0 + {High: 50.0, Low: 48.0, Close: 49.0}, // TR = 2.0 + {High: 51.0, Low: 49.0, Close: 50.0}, // TR = max(2.0, 2.0, 1.0) = 2.0 + {High: 52.0, Low: 50.0, Close: 51.0}, // TR = max(2.0, 2.0, 1.0) = 2.0 + {High: 53.0, Low: 51.0, Close: 52.0}, // TR = 2.0 + {High: 54.0, Low: 52.0, Close: 53.0}, // TR = 2.0 } atr := calculateATR(klines, 3) @@ -347,3 +347,156 @@ func TestCalculateIntradaySeries_VolumePrecision(t *testing.T) { } } } + +// TestIsStaleData_NormalData tests that normal fluctuating data returns false +func TestIsStaleData_NormalData(t *testing.T) { + klines := []Kline{ + {Close: 100.0, Volume: 1000}, + {Close: 100.5, Volume: 1200}, + {Close: 99.8, Volume: 900}, + {Close: 100.2, Volume: 1100}, + {Close: 100.1, Volume: 950}, + } + + result := isStaleData(klines, "BTCUSDT") + + if result { + t.Error("Expected false for normal fluctuating data, got true") + } +} + +// TestIsStaleData_PriceFreezeWithZeroVolume tests that frozen price + zero volume returns true +func TestIsStaleData_PriceFreezeWithZeroVolume(t *testing.T) { + klines := []Kline{ + {Close: 100.0, Volume: 0}, + {Close: 100.0, Volume: 0}, + {Close: 100.0, Volume: 0}, + {Close: 100.0, Volume: 0}, + {Close: 100.0, Volume: 0}, + } + + result := isStaleData(klines, "DOGEUSDT") + + if !result { + t.Error("Expected true for frozen price + zero volume, got false") + } +} + +// TestIsStaleData_PriceFreezeWithVolume tests that frozen price but normal volume returns false +func TestIsStaleData_PriceFreezeWithVolume(t *testing.T) { + klines := []Kline{ + {Close: 100.0, Volume: 1000}, + {Close: 100.0, Volume: 1200}, + {Close: 100.0, Volume: 900}, + {Close: 100.0, Volume: 1100}, + {Close: 100.0, Volume: 950}, + } + + result := isStaleData(klines, "STABLECOIN") + + if result { + t.Error("Expected false for frozen price but normal volume (low volatility market), got true") + } +} + +// TestIsStaleData_InsufficientData tests that insufficient data (<5 klines) returns false +func TestIsStaleData_InsufficientData(t *testing.T) { + klines := []Kline{ + {Close: 100.0, Volume: 0}, + {Close: 100.0, Volume: 0}, + {Close: 100.0, Volume: 0}, + } + + result := isStaleData(klines, "BTCUSDT") + + if result { + t.Error("Expected false for insufficient data (<5 klines), got true") + } +} + +// TestIsStaleData_ExactlyFiveKlines tests edge case with exactly 5 klines +func TestIsStaleData_ExactlyFiveKlines(t *testing.T) { + // Stale case: exactly 5 frozen klines with zero volume + staleKlines := []Kline{ + {Close: 100.0, Volume: 0}, + {Close: 100.0, Volume: 0}, + {Close: 100.0, Volume: 0}, + {Close: 100.0, Volume: 0}, + {Close: 100.0, Volume: 0}, + } + + result := isStaleData(staleKlines, "TESTUSDT") + if !result { + t.Error("Expected true for exactly 5 frozen klines with zero volume, got false") + } + + // Normal case: exactly 5 klines with fluctuation + normalKlines := []Kline{ + {Close: 100.0, Volume: 1000}, + {Close: 100.1, Volume: 1100}, + {Close: 99.9, Volume: 900}, + {Close: 100.0, Volume: 1000}, + {Close: 100.05, Volume: 950}, + } + + result = isStaleData(normalKlines, "TESTUSDT") + if result { + t.Error("Expected false for exactly 5 normal klines, got true") + } +} + +// TestIsStaleData_WithinTolerance tests price changes within tolerance (0.01%) +func TestIsStaleData_WithinTolerance(t *testing.T) { + // Price changes within 0.01% tolerance should be treated as frozen + basePrice := 10000.0 + tolerance := 0.0001 // 0.01% + smallChange := basePrice * tolerance * 0.5 // Half of tolerance + + klines := []Kline{ + {Close: basePrice, Volume: 1000}, + {Close: basePrice + smallChange, Volume: 1000}, + {Close: basePrice - smallChange, Volume: 1000}, + {Close: basePrice, Volume: 1000}, + {Close: basePrice + smallChange, Volume: 1000}, + } + + result := isStaleData(klines, "BTCUSDT") + + // Should return false because there's normal volume despite tiny price changes + if result { + t.Error("Expected false for price within tolerance but with volume, got true") + } +} + +// TestIsStaleData_MixedScenario tests realistic scenario with some history before freeze +func TestIsStaleData_MixedScenario(t *testing.T) { + // Simulate: normal trading → suddenly freezes + klines := []Kline{ + {Close: 100.0, Volume: 1000}, // Normal + {Close: 100.5, Volume: 1200}, // Normal + {Close: 100.2, Volume: 1100}, // Normal + {Close: 50.0, Volume: 0}, // Freeze starts + {Close: 50.0, Volume: 0}, // Frozen + {Close: 50.0, Volume: 0}, // Frozen + {Close: 50.0, Volume: 0}, // Frozen + {Close: 50.0, Volume: 0}, // Frozen (last 5 are all frozen) + } + + result := isStaleData(klines, "DOGEUSDT") + + // Should detect stale data based on last 5 klines + if !result { + t.Error("Expected true for frozen last 5 klines with zero volume, got false") + } +} + +// TestIsStaleData_EmptyKlines tests edge case with empty slice +func TestIsStaleData_EmptyKlines(t *testing.T) { + klines := []Kline{} + + result := isStaleData(klines, "BTCUSDT") + + if result { + t.Error("Expected false for empty klines, got true") + } +}