diff --git a/provider/coinank/coinank_api/kline.go b/provider/coinank/coinank_api/kline.go new file mode 100644 index 00000000..b17e1938 --- /dev/null +++ b/provider/coinank/coinank_api/kline.go @@ -0,0 +1,79 @@ +package coinank_api + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "nofx/provider/coinank" + "nofx/provider/coinank/coinank_enum" + "strconv" + "time" +) + +const MainApiUrl = "https://api.coinank.com" + +// Kline open free kline from coinank +func Kline(ctx context.Context, symbol string, exchange coinank_enum.Exchange, ts int64, side coinank_enum.Side, size int, + interval coinank_enum.Interval) ([]coinank.KlineResult, error) { + paramsMap := make(map[string]string, 6) + paramsMap["symbol"] = symbol + paramsMap["exchange"] = string(exchange) + paramsMap["side"] = string(side) + paramsMap["size"] = strconv.Itoa(size) + paramsMap["ts"] = strconv.FormatInt(ts, 10) + paramsMap["interval"] = string(interval) + resp, err := get(ctx, "/api/kline/list/open", paramsMap) + if err != nil { + return nil, err + } + var result coinank.CoinankResponse[[][]float64] + err = json.Unmarshal([]byte(resp), &result) + if err != nil { + return nil, err + } + if !result.Success { + return nil, coinank.HttpError + } + klines := make([]coinank.KlineResult, len(result.Data)) + for i, k := range result.Data { + klines[i].StartTime = int64(k[0] + 0.001) + klines[i].EndTime = int64(k[1] + 0.001) + klines[i].Open = k[2] + klines[i].Close = k[3] + klines[i].High = k[4] + klines[i].Low = k[5] + klines[i].Volume = k[6] + klines[i].Quantity = k[7] + klines[i].Count = k[8] + } + return klines, nil +} + +func get(ctx context.Context, path string, paramsMap map[string]string) (string, error) { + data := url.Values{} + for key, value := range paramsMap { + data.Add(key, value) + } + fullURL := fmt.Sprintf("%s%s?%s", MainApiUrl, path, data.Encode()) + request, err := http.NewRequestWithContext(ctx, "GET", fullURL, nil) + if err != nil { + return "", err + } + resp, err := client.Do(request) + if err != nil { + return "", err + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return "", err + } + return string(body), nil +} + +var client = &http.Client{ + Timeout: 30 * time.Second, +} diff --git a/provider/coinank/coinank_api/kline_test.go b/provider/coinank/coinank_api/kline_test.go new file mode 100644 index 00000000..07c66e40 --- /dev/null +++ b/provider/coinank/coinank_api/kline_test.go @@ -0,0 +1,21 @@ +package coinank_api + +import ( + "context" + "encoding/json" + "nofx/provider/coinank/coinank_enum" + "testing" + "time" +) + +func TestKline(t *testing.T) { + resp, err := Kline(context.TODO(), "BTCUSDT", coinank_enum.Binance, time.Now().UnixMilli(), coinank_enum.To, 10, coinank_enum.Hour1) + if err != nil { + t.Error(err) + } + res, err := json.Marshal(resp) + if err != nil { + t.Error(err) + } + t.Logf("%s", res) +} diff --git a/provider/coinank/coinank_api/kline_ws.go b/provider/coinank/coinank_api/kline_ws.go new file mode 100644 index 00000000..7702cd10 --- /dev/null +++ b/provider/coinank/coinank_api/kline_ws.go @@ -0,0 +1,203 @@ +package coinank_api + +import ( + "context" + "encoding/json" + "nofx/provider/coinank" + "nofx/provider/coinank/coinank_enum" + "strconv" + "strings" + + "golang.org/x/net/websocket" +) + +const MainWsUrl = "wss://ws.coinank.com/ws" + +type KlineWs struct { + conn *websocket.Conn + KlineCh <-chan *WsResult[coinank.KlineResult] + TickersCh <-chan *WsResult[KlineTickers] +} + +// WsConn connect ws , read data from KlineCh and TickersCh +func WsConn(ctx context.Context, needKline bool, needTicker bool) (*KlineWs, error) { + conn, ch, err := ws(ctx) + if err != nil { + return nil, err + } + klineCh, tickersCh := handleResponse(ch, needKline, needTicker) + ws := &KlineWs{ + conn: conn, + KlineCh: klineCh, + TickersCh: tickersCh, + } + return ws, nil +} + +// Subscribe subscribe kline +func (ws *KlineWs) Subscribe(symbol string, exchange coinank_enum.Exchange, interval coinank_enum.Interval) error { + var args = "kline@" + symbol + "@" + string(exchange) + "@" + string(interval) + info := SubscribeInfo{ + Op: "subscribe", + Args: args, + } + json, err := json.Marshal(info) + if err != nil { + return err + } + err = websocket.Message.Send(ws.conn, json) + if err != nil { + return err + } + return nil +} + +// UnSubscribe unsubscribe kline +func (ws *KlineWs) UnSubscribe(symbol string, exchange coinank_enum.Exchange, interval coinank_enum.Interval) error { + var args = "kline@" + symbol + "@" + string(exchange) + "@" + string(interval) + info := SubscribeInfo{ + Op: "unsubscribe", + Args: args, + } + json, err := json.Marshal(info) + if err != nil { + return err + } + err = websocket.Message.Send(ws.conn, json) + if err != nil { + return err + } + return nil +} + +// Close websocket +func (ws *KlineWs) Close() error { + return ws.conn.Close() +} + +func ws(ctx context.Context) (*websocket.Conn, <-chan string, error) { + config, err := websocket.NewConfig(MainWsUrl, "http://localhost") + if err != nil { + return nil, nil, err + } + conn, err := config.DialContext(ctx) + if err != nil { + return nil, nil, err + } + ch := make(chan string, 1024) + go read(conn, ch) + return conn, ch, nil +} + +func read(conn *websocket.Conn, ch chan string) { + defer conn.Close() + defer close(ch) + for { + var msg string + err := websocket.Message.Receive(conn, &msg) + if err != nil { + return + } + ch <- msg + } +} + +func handleResponse(ch <-chan string, needKline bool, needTicker bool) (<-chan *WsResult[coinank.KlineResult], <-chan *WsResult[KlineTickers]) { + klineCh := make(chan *WsResult[coinank.KlineResult], 1024) + tickersCh := make(chan *WsResult[KlineTickers], 1024) + go func() { + if needKline { + defer close(klineCh) + } else { + close(klineCh) + } + if needTicker { + defer close(tickersCh) + } else { + close(tickersCh) + } + for msg := range ch { + if needKline && strings.HasPrefix(msg, "{\"op\":\"push\",\"success\":true,\"args\":\"kline") { + var result WsResult[[]any] + err := json.Unmarshal([]byte(msg), &result) + if err == nil && result.Success { + kline := coinank.KlineResult{} + k := result.Data + kline.StartTime = toInt64(k[0]) + kline.EndTime = toInt64(k[1]) + kline.Open = toFloat64(k[2]) + kline.Close = toFloat64(k[3]) + kline.High = toFloat64(k[4]) + kline.Low = toFloat64(k[5]) + kline.Volume = toFloat64(k[6]) + kline.Quantity = toFloat64(k[7]) + kline.Count = toFloat64(k[8]) + var resp WsResult[coinank.KlineResult] + resp.Success = result.Success + resp.Data = kline + resp.Args = result.Args + resp.Op = result.Op + klineCh <- &resp + } + } else if needTicker && strings.HasPrefix(msg, "{\"op\":\"push\",\"success\":true,\"args\":\"tickers") { + var result WsResult[KlineTickers] + err := json.Unmarshal([]byte(msg), &result) + if err == nil && result.Success { + tickersCh <- &result + } + } + } + }() + return klineCh, tickersCh +} + +func toInt64(v any) int64 { + f := toFloat64(v) + return int64(f) +} + +func toFloat64(v any) float64 { + if f, ok := v.(float64); ok { + return f + } + if f, ok := v.(string); ok { + s, err := strconv.ParseFloat(f, 64) + if err != nil { + return 0 + } + return s + } + return 0 +} + +type SubscribeInfo struct { + Op string `json:"op"` + Args string `json:"args"` +} + +type KlineTickers struct { + OiCcy string `json:"oiCcy"` + OiVol string `json:"oiVol"` + Symbol string `json:"symbol"` + ExchangeName string `json:"exchangeName"` + PriceChange24H string `json:"priceChange24h"` + Low24H string `json:"low24h"` + High24H string `json:"high24h"` + VolCcy24H string `json:"volCcy24h"` + LastPrice string `json:"lastPrice"` + Vol24H string `json:"vol24h"` + Turnover24H string `json:"turnover24h"` + OiUSD string `json:"oiUSD"` + FundingRate string `json:"fundingRate"` + LastOiVol string `json:"lastOiVol"` + MarkPrice string `json:"markPrice"` + BasisRate string `json:"basisRate"` + Basis string `json:"basis"` +} + +type WsResult[T any] struct { + Op string `json:"op"` + Success bool `json:"success"` + Args string `json:"args"` + Data T `json:"data"` +} diff --git a/provider/coinank/coinank_api/klinw_ws_test.go b/provider/coinank/coinank_api/klinw_ws_test.go new file mode 100644 index 00000000..50749180 --- /dev/null +++ b/provider/coinank/coinank_api/klinw_ws_test.go @@ -0,0 +1,60 @@ +package coinank_api + +import ( + "context" + "encoding/json" + "fmt" + "nofx/provider/coinank/coinank_enum" + "testing" + "time" +) + +func TestKlineWs(t *testing.T) { + ctx := context.TODO() + ws, err := WsConn(ctx, true, true) + if err != nil { + t.Fatal(err) + } + go func() { + for tickers := range ws.TickersCh { + msg, err := json.Marshal(tickers) + if err != nil { + fmt.Println("json err:", err) + } + fmt.Println(string(msg)) + } + fmt.Println("tickersCh closed") + }() + go func() { + for kline := range ws.KlineCh { + msg, err := json.Marshal(kline) + if err != nil { + fmt.Println("json err:", err) + } + fmt.Println(string(msg)) + } + fmt.Println("kline closed") + }() + err = ws.Subscribe("BTCUSDT", coinank_enum.Binance, coinank_enum.Minute1) + if err != nil { + t.Fatal(err) + } + fmt.Println("sub success") + time.Sleep(10 * time.Second) + err = ws.UnSubscribe("BTCUSDT", coinank_enum.Binance, coinank_enum.Minute1) + if err != nil { + t.Fatal(err) + } + fmt.Println("unsub success") + time.Sleep(10 * time.Second) + err = ws.Subscribe("BTCUSDT", coinank_enum.Binance, coinank_enum.Hour1) + if err != nil { + t.Fatal(err) + } + fmt.Println("resub success") + time.Sleep(10 * time.Second) + ws.Close() + fmt.Println("cancel success") + time.Sleep(10 * time.Second) + fmt.Println("all success") +} diff --git a/provider/coinank/coinank_enum/side.go b/provider/coinank/coinank_enum/side.go new file mode 100644 index 00000000..99edb01e --- /dev/null +++ b/provider/coinank/coinank_enum/side.go @@ -0,0 +1,8 @@ +package coinank_enum + +type Side string + +const ( + To Side = "to" //search backward from the time ts + From Side = "from" //search forward from time ts +)