WebSocket多流Python

juud5qan  于 2022-11-11  发布在  Python
关注(0)|答案(1)|浏览(160)

我的问题是:我正在尝试使用WebSocket获取Binance上三对加密的价格
我的问题是,我很难管理这样一个事实,即有多个流(来自三对),而只有一个流,它工作得非常好。
下面是我的代码:

import websocket, json, pprint, talib, numpy

cc = ["btcusdt","adausdt","solusdt"]
interval = "1m"

for i in cc:
    socket = f'wss://stream.binance.com:9443/ws/{i}@kline_{interval}'

    def on_message(ws, message):
        json_message = json.loads(message)
        candle = json_message['k']
        close = candle["c"]
        print(i,': ',close)

    def on_close(ws):
        print("Connection Closed")

    wsapp = websocket.WebSocketApp(socket, on_message=on_message, on_close=on_close)
    wsapp.run_forever()

这是我每一秒得到的结果:

btcusdt :  47009.01000000
btcusdt :  47009.02000000
btcusdt :  47004.00000000
...

然而,这不是我想要的,它只考虑了第一对。我正在寻找这样的结果:

btcusdt : #price
adausdt : #price
solusdt : #price
btcusdt : #price
adausdt : #price
solusdt : #price
...

我知道我的问题来自于wsapp.run_forever(),因为这样做它只取第一对,而不循环到其他对。但我不知道如何管理它,如果有人有管理这个问题的解决方案,我会很高兴。
非常感谢您的光临

yhqotfr8

yhqotfr81#

我不能通过你的代码得到你想要的,但是我做了一个你需要的,如果它有帮助的话,看看它。再见。

import json
import websocket
import sqlalchemy
import pandas as pd

# websocket.enableTrace(True) #DEBUG

engine = sqlalchemy.create_engine('sqlite:///CESTA_MOEDAS_raw.db') #Chamada SQL
pair_coins = 'ethusdt@kline_1m/btcusdt@kline_1m/bnbusdt@kline_1m/ethbtc@kline_1m'

def on_open(ws):
    print("open")

def on_message(ws, message):
        json_message = json.loads(message)
        candle = json_message['data']['k']
        df = pd.DataFrame([candle])

        #print(df.keys()) # DEBUG: 
        #print(df.values()) # DEBUG:

        df = df.loc[:, ['t', 'T', 's', 'i', 'f', 'L', 'o', 'c', 'h', 'l', 'v', 'n', 'x', 'q', 'V', 'Q', 'B']]  
        df.columns = ['start_time', 'close_time', 'Symbol', 'Interval', ' First_trade_ID', 'Last_trade_ID', 'Open_price', 'Close_price', 'High_price', 'Low_price', 'Base_asset_volume', 'Number_of_trades', 'Candle_close_price', 'Quote_asset_volume', 'Taker_buy_base_asset_volume', 'Taker_buy_quote_asset_volume', 'Ignore']  
        df = df.loc[:, ['close_time', 'Symbol', 'Close_price']]
        df.Close_price = df.Close_price.astype(float)  
        df.close_time = pd.to_datetime(df.close_time, unit='ms')  
        print(df) 

        #Salvando os dados no DbSQL
        frame = df
        frame.to_sql('CESTA_MOEDAS', engine, if_exists='append', index=False)

def on_close(ws, close_status_code, close_msg):
    print("closed")

SOCK = f"wss://stream.binance.com:9443/stream?streams={pair_coins}"
ws = websocket.WebSocketApp(SOCK, on_open=on_open, on_close=on_close, on_message=on_message)

ws.run_forever()

相关问题