feat: implement coinank free kline api and kline websocket (#1281)

- implement coinank free kline api in coinank_api.Kline
- implement coinank free kline ws in coinank_api.KlineWs. if needKline is true, kline data read from KlineCh. if needTicker is true, tickers data  read from TickersCh.
This commit is contained in:
wqqqqqq
2025-12-27 01:54:44 +08:00
committed by GitHub
parent e591ed8226
commit 24cd329f3d
5 changed files with 371 additions and 0 deletions

View File

@@ -0,0 +1,79 @@
package coinank_api
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"nofx/provider/coinank"
"nofx/provider/coinank/coinank_enum"
"strconv"
"time"
)
const MainApiUrl = "https://api.coinank.com"
// Kline open free kline from coinank
func Kline(ctx context.Context, symbol string, exchange coinank_enum.Exchange, ts int64, side coinank_enum.Side, size int,
interval coinank_enum.Interval) ([]coinank.KlineResult, error) {
paramsMap := make(map[string]string, 6)
paramsMap["symbol"] = symbol
paramsMap["exchange"] = string(exchange)
paramsMap["side"] = string(side)
paramsMap["size"] = strconv.Itoa(size)
paramsMap["ts"] = strconv.FormatInt(ts, 10)
paramsMap["interval"] = string(interval)
resp, err := get(ctx, "/api/kline/list/open", paramsMap)
if err != nil {
return nil, err
}
var result coinank.CoinankResponse[[][]float64]
err = json.Unmarshal([]byte(resp), &result)
if err != nil {
return nil, err
}
if !result.Success {
return nil, coinank.HttpError
}
klines := make([]coinank.KlineResult, len(result.Data))
for i, k := range result.Data {
klines[i].StartTime = int64(k[0] + 0.001)
klines[i].EndTime = int64(k[1] + 0.001)
klines[i].Open = k[2]
klines[i].Close = k[3]
klines[i].High = k[4]
klines[i].Low = k[5]
klines[i].Volume = k[6]
klines[i].Quantity = k[7]
klines[i].Count = k[8]
}
return klines, nil
}
func get(ctx context.Context, path string, paramsMap map[string]string) (string, error) {
data := url.Values{}
for key, value := range paramsMap {
data.Add(key, value)
}
fullURL := fmt.Sprintf("%s%s?%s", MainApiUrl, path, data.Encode())
request, err := http.NewRequestWithContext(ctx, "GET", fullURL, nil)
if err != nil {
return "", err
}
resp, err := client.Do(request)
if err != nil {
return "", err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
return string(body), nil
}
var client = &http.Client{
Timeout: 30 * time.Second,
}

View File

@@ -0,0 +1,21 @@
package coinank_api
import (
"context"
"encoding/json"
"nofx/provider/coinank/coinank_enum"
"testing"
"time"
)
func TestKline(t *testing.T) {
resp, err := Kline(context.TODO(), "BTCUSDT", coinank_enum.Binance, time.Now().UnixMilli(), coinank_enum.To, 10, coinank_enum.Hour1)
if err != nil {
t.Error(err)
}
res, err := json.Marshal(resp)
if err != nil {
t.Error(err)
}
t.Logf("%s", res)
}

View File

@@ -0,0 +1,203 @@
package coinank_api
import (
"context"
"encoding/json"
"nofx/provider/coinank"
"nofx/provider/coinank/coinank_enum"
"strconv"
"strings"
"golang.org/x/net/websocket"
)
const MainWsUrl = "wss://ws.coinank.com/ws"
type KlineWs struct {
conn *websocket.Conn
KlineCh <-chan *WsResult[coinank.KlineResult]
TickersCh <-chan *WsResult[KlineTickers]
}
// WsConn connect ws , read data from KlineCh and TickersCh
func WsConn(ctx context.Context, needKline bool, needTicker bool) (*KlineWs, error) {
conn, ch, err := ws(ctx)
if err != nil {
return nil, err
}
klineCh, tickersCh := handleResponse(ch, needKline, needTicker)
ws := &KlineWs{
conn: conn,
KlineCh: klineCh,
TickersCh: tickersCh,
}
return ws, nil
}
// Subscribe subscribe kline
func (ws *KlineWs) Subscribe(symbol string, exchange coinank_enum.Exchange, interval coinank_enum.Interval) error {
var args = "kline@" + symbol + "@" + string(exchange) + "@" + string(interval)
info := SubscribeInfo{
Op: "subscribe",
Args: args,
}
json, err := json.Marshal(info)
if err != nil {
return err
}
err = websocket.Message.Send(ws.conn, json)
if err != nil {
return err
}
return nil
}
// UnSubscribe unsubscribe kline
func (ws *KlineWs) UnSubscribe(symbol string, exchange coinank_enum.Exchange, interval coinank_enum.Interval) error {
var args = "kline@" + symbol + "@" + string(exchange) + "@" + string(interval)
info := SubscribeInfo{
Op: "unsubscribe",
Args: args,
}
json, err := json.Marshal(info)
if err != nil {
return err
}
err = websocket.Message.Send(ws.conn, json)
if err != nil {
return err
}
return nil
}
// Close websocket
func (ws *KlineWs) Close() error {
return ws.conn.Close()
}
func ws(ctx context.Context) (*websocket.Conn, <-chan string, error) {
config, err := websocket.NewConfig(MainWsUrl, "http://localhost")
if err != nil {
return nil, nil, err
}
conn, err := config.DialContext(ctx)
if err != nil {
return nil, nil, err
}
ch := make(chan string, 1024)
go read(conn, ch)
return conn, ch, nil
}
func read(conn *websocket.Conn, ch chan string) {
defer conn.Close()
defer close(ch)
for {
var msg string
err := websocket.Message.Receive(conn, &msg)
if err != nil {
return
}
ch <- msg
}
}
func handleResponse(ch <-chan string, needKline bool, needTicker bool) (<-chan *WsResult[coinank.KlineResult], <-chan *WsResult[KlineTickers]) {
klineCh := make(chan *WsResult[coinank.KlineResult], 1024)
tickersCh := make(chan *WsResult[KlineTickers], 1024)
go func() {
if needKline {
defer close(klineCh)
} else {
close(klineCh)
}
if needTicker {
defer close(tickersCh)
} else {
close(tickersCh)
}
for msg := range ch {
if needKline && strings.HasPrefix(msg, "{\"op\":\"push\",\"success\":true,\"args\":\"kline") {
var result WsResult[[]any]
err := json.Unmarshal([]byte(msg), &result)
if err == nil && result.Success {
kline := coinank.KlineResult{}
k := result.Data
kline.StartTime = toInt64(k[0])
kline.EndTime = toInt64(k[1])
kline.Open = toFloat64(k[2])
kline.Close = toFloat64(k[3])
kline.High = toFloat64(k[4])
kline.Low = toFloat64(k[5])
kline.Volume = toFloat64(k[6])
kline.Quantity = toFloat64(k[7])
kline.Count = toFloat64(k[8])
var resp WsResult[coinank.KlineResult]
resp.Success = result.Success
resp.Data = kline
resp.Args = result.Args
resp.Op = result.Op
klineCh <- &resp
}
} else if needTicker && strings.HasPrefix(msg, "{\"op\":\"push\",\"success\":true,\"args\":\"tickers") {
var result WsResult[KlineTickers]
err := json.Unmarshal([]byte(msg), &result)
if err == nil && result.Success {
tickersCh <- &result
}
}
}
}()
return klineCh, tickersCh
}
func toInt64(v any) int64 {
f := toFloat64(v)
return int64(f)
}
func toFloat64(v any) float64 {
if f, ok := v.(float64); ok {
return f
}
if f, ok := v.(string); ok {
s, err := strconv.ParseFloat(f, 64)
if err != nil {
return 0
}
return s
}
return 0
}
type SubscribeInfo struct {
Op string `json:"op"`
Args string `json:"args"`
}
type KlineTickers struct {
OiCcy string `json:"oiCcy"`
OiVol string `json:"oiVol"`
Symbol string `json:"symbol"`
ExchangeName string `json:"exchangeName"`
PriceChange24H string `json:"priceChange24h"`
Low24H string `json:"low24h"`
High24H string `json:"high24h"`
VolCcy24H string `json:"volCcy24h"`
LastPrice string `json:"lastPrice"`
Vol24H string `json:"vol24h"`
Turnover24H string `json:"turnover24h"`
OiUSD string `json:"oiUSD"`
FundingRate string `json:"fundingRate"`
LastOiVol string `json:"lastOiVol"`
MarkPrice string `json:"markPrice"`
BasisRate string `json:"basisRate"`
Basis string `json:"basis"`
}
type WsResult[T any] struct {
Op string `json:"op"`
Success bool `json:"success"`
Args string `json:"args"`
Data T `json:"data"`
}

View File

@@ -0,0 +1,60 @@
package coinank_api
import (
"context"
"encoding/json"
"fmt"
"nofx/provider/coinank/coinank_enum"
"testing"
"time"
)
func TestKlineWs(t *testing.T) {
ctx := context.TODO()
ws, err := WsConn(ctx, true, true)
if err != nil {
t.Fatal(err)
}
go func() {
for tickers := range ws.TickersCh {
msg, err := json.Marshal(tickers)
if err != nil {
fmt.Println("json err:", err)
}
fmt.Println(string(msg))
}
fmt.Println("tickersCh closed")
}()
go func() {
for kline := range ws.KlineCh {
msg, err := json.Marshal(kline)
if err != nil {
fmt.Println("json err:", err)
}
fmt.Println(string(msg))
}
fmt.Println("kline closed")
}()
err = ws.Subscribe("BTCUSDT", coinank_enum.Binance, coinank_enum.Minute1)
if err != nil {
t.Fatal(err)
}
fmt.Println("sub success")
time.Sleep(10 * time.Second)
err = ws.UnSubscribe("BTCUSDT", coinank_enum.Binance, coinank_enum.Minute1)
if err != nil {
t.Fatal(err)
}
fmt.Println("unsub success")
time.Sleep(10 * time.Second)
err = ws.Subscribe("BTCUSDT", coinank_enum.Binance, coinank_enum.Hour1)
if err != nil {
t.Fatal(err)
}
fmt.Println("resub success")
time.Sleep(10 * time.Second)
ws.Close()
fmt.Println("cancel success")
time.Sleep(10 * time.Second)
fmt.Println("all success")
}

View File

@@ -0,0 +1,8 @@
package coinank_enum
type Side string
const (
To Side = "to" //search backward from the time ts
From Side = "from" //search forward from time ts
)