mirror of
https://github.com/NoFxAiOS/nofx.git
synced 2026-07-04 11:30:58 +08:00
feat(market): add data staleness detection (Part 2/3) (#800)
* feat(market): add data staleness detection
## 問題背景
解決 PR #703 Part 2: 數據陳舊性檢測
- 修復 DOGEUSDT 式問題:連續價格不變表示數據源異常
- 防止系統處理僵化/過期的市場數據
## 技術方案
### 數據陳舊性檢測 (market/data.go)
- **函數**: `isStaleData(klines []Kline, symbol string) bool`
- **檢測邏輯**:
- 連續 5 個 3 分鐘週期價格完全不變(15 分鐘無波動)
- 價格波動容忍度:0.01%(避免誤報)
- 成交量檢查:價格凍結 + 成交量為 0 → 確認陳舊
- **處理策略**:
- 數據陳舊確認:跳過該幣種,返回錯誤
- 極低波動市場:記錄警告但允許通過(價格穩定但有成交量)
### 調用時機
- 在 `Get()` 函數中,獲取 3m K線後立即檢測
- 早期返回:避免後續無意義的計算和 API 調用
## 實現細節
- **檢測閾值**: 5 個連續週期
- **容忍度**: 0.01% 價格波動
- **日誌**: 英文國際化版本
- **並發安全**: 函數無狀態,安全
## 影響範圍
- ✅ 修改 market/data.go: 新增 isStaleData() + 調用邏輯
- ✅ 新增 log 包導入
- ✅ 50 行新增代碼
## 測試建議
1. 模擬 DOGEUSDT 場景:連續價格不變 + 成交量為 0
2. 驗證日誌輸出:`stale data confirmed: price freeze + zero volume`
3. 正常市場:極低波動但有成交量,應允許通過並記錄警告
## 相關 Issue/PR
- 拆分自 **PR #703** (Part 2/3)
- 基於最新 upstream/dev (3112250)
- 依賴: 無
- 前置: Part 1 (OI 時間序列) - 已提交 PR #798
- 後續: Part 3 (手續費率傳遞)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: tinkle-community <tinklefund@gmail.com>
* test(market): add comprehensive unit tests for isStaleData function
- Test normal fluctuating data (expects non-stale)
- Test price freeze with zero volume (expects stale)
- Test price freeze with volume (low volatility market)
- Test insufficient data edge case (<5 klines)
- Test boundary conditions (exactly 5 klines)
- Test tolerance threshold (0.01% price change)
- Test mixed scenario (normal → freeze transition)
- Test empty klines edge case
All 8 test cases passed.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: tinkle-community <tinklefund@gmail.com>
---------
Co-authored-by: ZhouYongyou <128128010+zhouyongyou@users.noreply.github.com>
Co-authored-by: tinkle-community <tinklefund@gmail.com>
Co-authored-by: Shui <88711385+hzb1115@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
4934dd1168
commit
c5abcf1f2c
@@ -4,6 +4,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -35,6 +36,12 @@ func Get(symbol string) (*Data, error) {
|
||||
return nil, fmt.Errorf("获取3分钟K线失败: %v", err)
|
||||
}
|
||||
|
||||
// Data staleness detection: Prevent DOGEUSDT-style price freeze issues
|
||||
if isStaleData(klines3m, symbol) {
|
||||
log.Printf("⚠️ WARNING: %s detected stale data (consecutive price freeze), skipping symbol", symbol)
|
||||
return nil, fmt.Errorf("%s data is stale, possible cache failure", symbol)
|
||||
}
|
||||
|
||||
// 获取4小时K线数据 (最近10个)
|
||||
klines4h, err = WSMonitorCli.GetCurrentKlines(symbol, "4h") // 多获取用于计算指标
|
||||
if err != nil {
|
||||
@@ -541,3 +548,47 @@ func parseFloat(v interface{}) (float64, error) {
|
||||
return 0, fmt.Errorf("unsupported type: %T", v)
|
||||
}
|
||||
}
|
||||
|
||||
// isStaleData detects stale data (consecutive price freeze)
|
||||
// Fix DOGEUSDT-style issue: consecutive N periods with completely unchanged prices indicate data source anomaly
|
||||
func isStaleData(klines []Kline, symbol string) bool {
|
||||
if len(klines) < 5 {
|
||||
return false // Insufficient data to determine
|
||||
}
|
||||
|
||||
// Detection threshold: 5 consecutive 3-minute periods with unchanged price (15 minutes without fluctuation)
|
||||
const stalePriceThreshold = 5
|
||||
const priceTolerancePct = 0.0001 // 0.01% fluctuation tolerance (avoid false positives)
|
||||
|
||||
// Take the last stalePriceThreshold K-lines
|
||||
recentKlines := klines[len(klines)-stalePriceThreshold:]
|
||||
firstPrice := recentKlines[0].Close
|
||||
|
||||
// Check if all prices are within tolerance
|
||||
for i := 1; i < len(recentKlines); i++ {
|
||||
priceDiff := math.Abs(recentKlines[i].Close-firstPrice) / firstPrice
|
||||
if priceDiff > priceTolerancePct {
|
||||
return false // Price fluctuation exists, data is normal
|
||||
}
|
||||
}
|
||||
|
||||
// Additional check: MACD and volume
|
||||
// If price is unchanged but MACD/volume shows normal fluctuation, it might be a real market situation (extremely low volatility)
|
||||
// Check if volume is also 0 (data completely frozen)
|
||||
allVolumeZero := true
|
||||
for _, k := range recentKlines {
|
||||
if k.Volume > 0 {
|
||||
allVolumeZero = false
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if allVolumeZero {
|
||||
log.Printf("⚠️ %s stale data confirmed: price freeze + zero volume", symbol)
|
||||
return true
|
||||
}
|
||||
|
||||
// Price frozen but has volume: might be extremely low volatility market, allow but log warning
|
||||
log.Printf("⚠️ %s detected extreme price stability (no fluctuation for %d consecutive periods), but volume is normal", symbol, stalePriceThreshold)
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -131,19 +131,19 @@ func TestCalculateIntradaySeries_VolumeValues(t *testing.T) {
|
||||
// TestCalculateIntradaySeries_ATR14 测试 ATR14 计算
|
||||
func TestCalculateIntradaySeries_ATR14(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
klineCount int
|
||||
expectZero bool
|
||||
name string
|
||||
klineCount int
|
||||
expectZero bool
|
||||
expectNonZero bool
|
||||
}{
|
||||
{
|
||||
name: "足够数据 - 20个K线",
|
||||
klineCount: 20,
|
||||
name: "足够数据 - 20个K线",
|
||||
klineCount: 20,
|
||||
expectNonZero: true,
|
||||
},
|
||||
{
|
||||
name: "刚好15个K线(ATR14需要至少15个)",
|
||||
klineCount: 15,
|
||||
name: "刚好15个K线(ATR14需要至少15个)",
|
||||
klineCount: 15,
|
||||
expectNonZero: true,
|
||||
},
|
||||
{
|
||||
@@ -253,11 +253,11 @@ func TestCalculateATR(t *testing.T) {
|
||||
func TestCalculateATR_TrueRange(t *testing.T) {
|
||||
// 创建一个简单的测试用例,手动计算期望的 ATR
|
||||
klines := []Kline{
|
||||
{High: 50.0, Low: 48.0, Close: 49.0}, // TR = 2.0
|
||||
{High: 51.0, Low: 49.0, Close: 50.0}, // TR = max(2.0, 2.0, 1.0) = 2.0
|
||||
{High: 52.0, Low: 50.0, Close: 51.0}, // TR = max(2.0, 2.0, 1.0) = 2.0
|
||||
{High: 53.0, Low: 51.0, Close: 52.0}, // TR = 2.0
|
||||
{High: 54.0, Low: 52.0, Close: 53.0}, // TR = 2.0
|
||||
{High: 50.0, Low: 48.0, Close: 49.0}, // TR = 2.0
|
||||
{High: 51.0, Low: 49.0, Close: 50.0}, // TR = max(2.0, 2.0, 1.0) = 2.0
|
||||
{High: 52.0, Low: 50.0, Close: 51.0}, // TR = max(2.0, 2.0, 1.0) = 2.0
|
||||
{High: 53.0, Low: 51.0, Close: 52.0}, // TR = 2.0
|
||||
{High: 54.0, Low: 52.0, Close: 53.0}, // TR = 2.0
|
||||
}
|
||||
|
||||
atr := calculateATR(klines, 3)
|
||||
@@ -347,3 +347,156 @@ func TestCalculateIntradaySeries_VolumePrecision(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestIsStaleData_NormalData tests that normal fluctuating data returns false
|
||||
func TestIsStaleData_NormalData(t *testing.T) {
|
||||
klines := []Kline{
|
||||
{Close: 100.0, Volume: 1000},
|
||||
{Close: 100.5, Volume: 1200},
|
||||
{Close: 99.8, Volume: 900},
|
||||
{Close: 100.2, Volume: 1100},
|
||||
{Close: 100.1, Volume: 950},
|
||||
}
|
||||
|
||||
result := isStaleData(klines, "BTCUSDT")
|
||||
|
||||
if result {
|
||||
t.Error("Expected false for normal fluctuating data, got true")
|
||||
}
|
||||
}
|
||||
|
||||
// TestIsStaleData_PriceFreezeWithZeroVolume tests that frozen price + zero volume returns true
|
||||
func TestIsStaleData_PriceFreezeWithZeroVolume(t *testing.T) {
|
||||
klines := []Kline{
|
||||
{Close: 100.0, Volume: 0},
|
||||
{Close: 100.0, Volume: 0},
|
||||
{Close: 100.0, Volume: 0},
|
||||
{Close: 100.0, Volume: 0},
|
||||
{Close: 100.0, Volume: 0},
|
||||
}
|
||||
|
||||
result := isStaleData(klines, "DOGEUSDT")
|
||||
|
||||
if !result {
|
||||
t.Error("Expected true for frozen price + zero volume, got false")
|
||||
}
|
||||
}
|
||||
|
||||
// TestIsStaleData_PriceFreezeWithVolume tests that frozen price but normal volume returns false
|
||||
func TestIsStaleData_PriceFreezeWithVolume(t *testing.T) {
|
||||
klines := []Kline{
|
||||
{Close: 100.0, Volume: 1000},
|
||||
{Close: 100.0, Volume: 1200},
|
||||
{Close: 100.0, Volume: 900},
|
||||
{Close: 100.0, Volume: 1100},
|
||||
{Close: 100.0, Volume: 950},
|
||||
}
|
||||
|
||||
result := isStaleData(klines, "STABLECOIN")
|
||||
|
||||
if result {
|
||||
t.Error("Expected false for frozen price but normal volume (low volatility market), got true")
|
||||
}
|
||||
}
|
||||
|
||||
// TestIsStaleData_InsufficientData tests that insufficient data (<5 klines) returns false
|
||||
func TestIsStaleData_InsufficientData(t *testing.T) {
|
||||
klines := []Kline{
|
||||
{Close: 100.0, Volume: 0},
|
||||
{Close: 100.0, Volume: 0},
|
||||
{Close: 100.0, Volume: 0},
|
||||
}
|
||||
|
||||
result := isStaleData(klines, "BTCUSDT")
|
||||
|
||||
if result {
|
||||
t.Error("Expected false for insufficient data (<5 klines), got true")
|
||||
}
|
||||
}
|
||||
|
||||
// TestIsStaleData_ExactlyFiveKlines tests edge case with exactly 5 klines
|
||||
func TestIsStaleData_ExactlyFiveKlines(t *testing.T) {
|
||||
// Stale case: exactly 5 frozen klines with zero volume
|
||||
staleKlines := []Kline{
|
||||
{Close: 100.0, Volume: 0},
|
||||
{Close: 100.0, Volume: 0},
|
||||
{Close: 100.0, Volume: 0},
|
||||
{Close: 100.0, Volume: 0},
|
||||
{Close: 100.0, Volume: 0},
|
||||
}
|
||||
|
||||
result := isStaleData(staleKlines, "TESTUSDT")
|
||||
if !result {
|
||||
t.Error("Expected true for exactly 5 frozen klines with zero volume, got false")
|
||||
}
|
||||
|
||||
// Normal case: exactly 5 klines with fluctuation
|
||||
normalKlines := []Kline{
|
||||
{Close: 100.0, Volume: 1000},
|
||||
{Close: 100.1, Volume: 1100},
|
||||
{Close: 99.9, Volume: 900},
|
||||
{Close: 100.0, Volume: 1000},
|
||||
{Close: 100.05, Volume: 950},
|
||||
}
|
||||
|
||||
result = isStaleData(normalKlines, "TESTUSDT")
|
||||
if result {
|
||||
t.Error("Expected false for exactly 5 normal klines, got true")
|
||||
}
|
||||
}
|
||||
|
||||
// TestIsStaleData_WithinTolerance tests price changes within tolerance (0.01%)
|
||||
func TestIsStaleData_WithinTolerance(t *testing.T) {
|
||||
// Price changes within 0.01% tolerance should be treated as frozen
|
||||
basePrice := 10000.0
|
||||
tolerance := 0.0001 // 0.01%
|
||||
smallChange := basePrice * tolerance * 0.5 // Half of tolerance
|
||||
|
||||
klines := []Kline{
|
||||
{Close: basePrice, Volume: 1000},
|
||||
{Close: basePrice + smallChange, Volume: 1000},
|
||||
{Close: basePrice - smallChange, Volume: 1000},
|
||||
{Close: basePrice, Volume: 1000},
|
||||
{Close: basePrice + smallChange, Volume: 1000},
|
||||
}
|
||||
|
||||
result := isStaleData(klines, "BTCUSDT")
|
||||
|
||||
// Should return false because there's normal volume despite tiny price changes
|
||||
if result {
|
||||
t.Error("Expected false for price within tolerance but with volume, got true")
|
||||
}
|
||||
}
|
||||
|
||||
// TestIsStaleData_MixedScenario tests realistic scenario with some history before freeze
|
||||
func TestIsStaleData_MixedScenario(t *testing.T) {
|
||||
// Simulate: normal trading → suddenly freezes
|
||||
klines := []Kline{
|
||||
{Close: 100.0, Volume: 1000}, // Normal
|
||||
{Close: 100.5, Volume: 1200}, // Normal
|
||||
{Close: 100.2, Volume: 1100}, // Normal
|
||||
{Close: 50.0, Volume: 0}, // Freeze starts
|
||||
{Close: 50.0, Volume: 0}, // Frozen
|
||||
{Close: 50.0, Volume: 0}, // Frozen
|
||||
{Close: 50.0, Volume: 0}, // Frozen
|
||||
{Close: 50.0, Volume: 0}, // Frozen (last 5 are all frozen)
|
||||
}
|
||||
|
||||
result := isStaleData(klines, "DOGEUSDT")
|
||||
|
||||
// Should detect stale data based on last 5 klines
|
||||
if !result {
|
||||
t.Error("Expected true for frozen last 5 klines with zero volume, got false")
|
||||
}
|
||||
}
|
||||
|
||||
// TestIsStaleData_EmptyKlines tests edge case with empty slice
|
||||
func TestIsStaleData_EmptyKlines(t *testing.T) {
|
||||
klines := []Kline{}
|
||||
|
||||
result := isStaleData(klines, "BTCUSDT")
|
||||
|
||||
if result {
|
||||
t.Error("Expected false for empty klines, got true")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user