python 如何建立到 KuCoin 交易所的 WebSocket 连接

dxpyg8gm  于 2022-12-21  发布在  Python
关注(0)|答案(2)|浏览(102)

我怎样才能建立到 KuCoin 交易所的 WebSocket 连接?我正在尝试以下代码:

import os
import time, hashlib, requests, base64, hmac, json, websocket, uuid    
# Placeholder connect token substituted into the URL below.
token = "my token"
# Current Unix time in milliseconds, used as the connectId query parameter.
now = str(int(time.time() * 1000))
SOCKET = f'wss://push1-v2.kucoin.net/endpoint?token={token}&connectId={now}'
# Subscription request that on_open sends as JSON once the socket is open.
params = {
    "type": "subscribe",
    "topic": "/market/ticker:BTC-USDT",
    "response": True,
}

def on_open(ws):
    """Announce the open connection and send the subscription request.

    The module-level ``params`` dict is serialised to JSON and written to
    ``ws``.
    """
    print('Opened Connection')
    subscribe_request = json.dumps(params)
    ws.send(subscribe_request)

def on_close(ws, close_status_code=None, close_msg=None):
    """Report that the WebSocket connection has been closed.

    websocket-client 1.x invokes this callback with three positional
    arguments (the app, the close status code and the close message).  The
    two extra parameters default to ``None`` so the one-argument call style
    of older releases keeps working as well.

    Args:
        ws: The WebSocketApp whose connection closed.
        close_status_code: Status code passed by the library, if any.
        close_msg: Close message passed by the library, if any.
    """
    print('Closed Connection')

def on_ping(ws, message):
    """Callback registered as ``on_ping``; intentionally does nothing."""
    return None

def on_pong(ws, message):
    """Callback registered as ``on_pong``; intentionally does nothing."""
    return None

def on_message(ws, message):
    """Print each received message to stdout unchanged.

    Args:
        ws: The WebSocketApp that delivered the message (unused).
        message: The payload handed over by the library.
    """
    print(message)

def on_error(ws, err):
    """Print the error handed to the ``on_error`` callback to stdout.

    Args:
        ws: The WebSocketApp that reported the error (unused).
        err: The error object or message to display.
    """
    print("Got a an error: ", err)

if __name__ == "__main__":
    # Register every callback defined above on a single WebSocketApp.
    ws = websocket.WebSocketApp(
        SOCKET,
        on_open=on_open,
        on_close=on_close,
        on_message=on_message,
        on_error=on_error,
        on_ping=on_ping,
        on_pong=on_pong,
    )
    # Run the client loop with a 10 s ping interval and a 5 s ping timeout.
    ws.run_forever(ping_interval=10, ping_timeout=5)

但我得到的错误信息是:[Errno 11001] getaddrinfo failed。可能是我使用了错误的端点?我应该使用哪个端点?

laik7k3q

laik7k3q1#

import requests
import json
import hmac
import hashlib
import base64
from urllib.parse import urlencode
import time
import websocket,threading


def get_token(api_key, api_secret, api_passphrase,
              base_uri='https://api-futures.kucoin.com', timeout=10):
    """Request a private WebSocket connect token from the KuCoin REST API.

    Sends a signed ``POST /api/v1/bullet-private`` request and returns the
    decoded JSON body, which is also printed to stdout.

    Args:
        api_key: API key sent in the ``KC-API-KEY`` header.
        api_secret: API secret; used as the HMAC-SHA256 key for both the
            request signature and the passphrase header.
        api_passphrase: API passphrase; sent HMAC-signed, as
            ``KC-API-KEY-VERSION`` ``'2'`` is declared.
        base_uri: REST host to call.  Defaults to the futures host; pass
            another KuCoin REST host (e.g. ``'https://api.kucoin.com'`` for
            spot) to obtain a token for that API.
        timeout: Seconds to wait for the HTTP request before giving up, so a
            stalled connection cannot block the caller forever.

    Returns:
        The decoded JSON response.  The caller in this file reads
        ``['data']['token']`` and ``['data']['instanceServers']`` from it.

    Raises:
        requests.exceptions.RequestException: If the request fails or
            times out.
        ValueError: If ``Response.json()`` cannot decode the body.
    """
    # Single source of truth: the signed path must match the requested path.
    endpoint = '/api/v1/bullet-private'

    def get_headers(method, endpoint):
        # The same millisecond timestamp goes into the signature and header.
        now = int(time.time() * 1000)
        str_to_sign = str(now) + method + endpoint
        signature = base64.b64encode(hmac.new(api_secret.encode(), str_to_sign.encode(), hashlib.sha256).digest()).decode()
        passphrase = base64.b64encode(hmac.new(api_secret.encode(), api_passphrase.encode(), hashlib.sha256).digest()).decode()
        return {'KC-API-KEY': api_key,
                'KC-API-KEY-VERSION': '2',
                'KC-API-PASSPHRASE': passphrase,
                'KC-API-SIGN': signature,
                'KC-API-TIMESTAMP': str(now)}

    req = requests.post(f"{base_uri.rstrip('/')}{endpoint}",
                        headers=get_headers("POST", endpoint),
                        timeout=timeout)
    # Decode once and reuse the result for both the debug print and return.
    payload = req.json()
    print(payload)
    return payload


def RunKucoinSocket(account):
    """Open a KuCoin private WebSocket for ``account`` and print its traffic.

    ``account`` is a mapping supplying the ``'apikey'``, ``'apisecret'`` and
    ``'passphrase'`` entries that are passed to ``get_token``.  The first
    entry of the returned ``instanceServers`` list provides the endpoint.
    The call runs the client loop via ``run_forever`` before returning.
    """

    def handle_open(ws):
        # Subscribe to the private trade-orders topic once connected.
        print('Opened Connection')
        subscription = {
            "type": "subscribe",
            "topic": "/contractMarket/tradeOrders",
            "response": True,
            "privateChannel": True,
        }
        ws.send(json.dumps(subscription))

    def handle_close(ws, closemsg, closeerror):
        print('Closed Connection')

    def handle_ping(ws, message):
        return None

    def handle_pong(ws, message):
        return None

    def handle_message(ws, message):
        print(message)

    def handle_error(ws, err):
        print("Got a an error: ", err)

    # Millisecond timestamp, taken before the token request, as connectId.
    connect_id = str(int(time.time() * 1000))
    bullet = get_token(
        account['apikey'],
        account['apisecret'],
        account['passphrase'],
    )['data']
    instance = bullet['instanceServers'][0]
    url = f"{instance['endpoint']}?token={bullet['token']}&connectId={connect_id}"
    app = websocket.WebSocketApp(
        url,
        on_message=handle_message,
        on_error=handle_error,
        on_close=handle_close,
        on_open=handle_open,
        on_ping=handle_ping,
        on_pong=handle_pong,
    )
    app.run_forever(ping_interval=15, ping_timeout=10)
if __name__ == "__main__":
    # Placeholder account record; RunKucoinSocket reads only the 'apikey',
    # 'apisecret' and 'passphrase' entries.
    account = {
        "username": "Nate",
        "exchange": "Kucoin",
        "apikey": 'KEY',
        "apisecret": "SECRET",
        "passphrase": "password",
    }
    RunKucoinSocket(account)
    # NOTE(review): RunKucoinSocket calls run_forever directly, so this sleep
    # presumably only starts after the socket loop ends - possibly a leftover
    # from a threaded variant; confirm whether it is still needed.
    time.sleep(1000)

希望这对你有帮助——代码的结构有些奇怪,因为我是在大规模场景下使用它的,但它应该能实现你想要的功能。

yc0p9oo0

yc0p9oo02#

只需将端点主机名(push1-v2.kucoin.net)更改为(ws-api.kucoin.com)即可。

相关问题