当我从一个线程打开WebSocket连接时,甚至没有从WebSocket中检索latest_price_info,WebSocket不会接收任何消息,它只接收第一条消息,不会接收任何其他消息。下面是我运行的代码:
import threading
import time
from binance_websocket import BinanceWebSocket
lock = threading.Lock()
def manage_websocket(binance_ws):
binance_ws.ws.run_forever()
if __name__ == "__main__":
symbol = "BTCUSDT"
interval = "1m"
binance_ws = BinanceWebSocket(symbol, interval)
websocket_thread = threading.Thread(target=manage_websocket, args=(binance_ws, ))
websocket_thread.start()
print("thread started")
while True:
print("while in icine girdi")
time.sleep(10)
下面是WebSocket代码:
import websocket
import json
import time
import threading
class BinanceWebSocket:
def __init__(self, symbol, interval):
self.symbol = symbol
self.interval = interval
self.ws_url = f"wss://stream.binance.com:9443/ws/{self.symbol}@kline_{self.interval}"
self.ws = websocket.WebSocketApp(self.ws_url, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close)
self.ws.on_open = self.on_open
self.latest_price_info = None
self.lock = threading.Lock() # Initialize the lock
def set_latest_price_info(self, price):
with self.lock:
self.latest_price_info = price
def get_latest_price_info(self):
with self.lock:
return self.latest_price_info
def on_message(self, ws, message):
print("The message is: ", message)
data = json.loads(message)
'''
kline = data['k']
close = kline['c']
self.set_latest_price_info(close) # Use the instance method to set the latest price
'''
print("Latest current price updated: ", self.get_latest_price_info())
def on_error(self, ws, error):
print(f"Error: {error}")
def on_close(self, ws, close_status_code, close_msg):
print("WebSocket connection closed")
self.reconnect()
def on_open(self, ws):
print("WebSocket connection opened")
subscription_payload = {
"method": "SUBSCRIBE",
"params": [
f"{self.symbol}@kline_{self.interval}"
],
"id": 1
}
self.ws.send(json.dumps(subscription_payload))
#Binance disconnects websocket connections every 24h, therefore reconnecting when disconnected
def reconnect(self):
while True:
try:
print("Reconnecting...")
time.sleep(5) # Delay before reconnecting
self.ws = websocket.WebSocketApp(self.ws_url, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close)
self.ws.on_open = self.on_open
self.ws.run_forever()
except Exception as e:
print(f"Reconnection failed: {e}")
例如,当我在没有线程的情况下打开WebSocket连接时,一切正常,如果我运行以下代码,BinanceWebSocket
的self.latest_price_info
属性将按预期进行更新,因此在主线程中打开WebSocket连接:
from binance_websocket import BinanceWebSocket
import time
symbol = "btcusdt"
interval = "1m" # You can adjust the interval here
ws = BinanceWebSocket(symbol, interval)
ws.ws.run_forever()
try:
while True:
lastPriceFrom_ws = ws.latest_price_info
if lastPriceFrom_ws is not None:
print("Latest Current Price:", lastPriceFrom_ws)
time.sleep(1) # Adjust the sleep time if needed
except KeyboardInterrupt:
print("WebSocket connection stopped.")
最初,我试图从WebSocket对象中检索self.latest_price_info
,我认为一定是出现了死锁情况,即主线程试图读取self.latest_price_info
,而另一个运行内部WebSocket连接的线程试图更新它,因此我开始在WebSocket类中使用锁,但后来我意识到问题是在线程中运行WebSocket连接。我甚至尝试将另一个线程加入到主线程中,但它不起作用,当我运行程序时,WebSocket只是等待,不接收任何消息。它不返回任何错误或以任何其他方式运行。SImply等待消息,但不接收任何消息。我的主要目标是使用WebSocket不断从Binance交易所获取价格数据。所以我需要的是让WebSocket不断更新它的属性latest_price_info
,我想读取值。此外,如果我打开WebSocket连接,它会阻止主线程的执行,这就是为什么我通过在线程中打开WebSocket连接来寻求解决方案。
2条答案
按热度按时间olqngx591#
为了有效地实现你的目标,你不应该使用线程。使用asyncio代替(或者你可以尝试多处理)。线程实际上是一个接一个地运行进程。所以,如果你算上smth,你不能在同一时刻写数据。
zed5wv102#
问题不在于在线程中运行WebSocket连接,而在于符号。Binance WebSocket订阅需要非大写符号,如“btcusdt”而不是“BTcusdt”,这是我在发布问题时遗漏的一个细节。因此,如果使用非大写符号,两种实现都可以工作。