我目前在discord.py bot和另一个线程之间遇到了一个问题,我需要创建一个钩子来调用类中的函数,比如webhook,但是在两个线程之间,我的代码的一个简单版本是:
import serial
import threading
class bot: # this class is constantly running other functions, can't get halted by 'usb.readline()' or the connection to the server will be closed
async def on_button_pressed(self, button_pressed): # the goal is to make this function run every time the side_thread detects a button press
if button_pressed == 0:
await channel.connect() # if button 0 is pressed, bot connects to a voice channel
elif button_pressed == 1:
await voice.disconnect() # if button 1 is pressed, bot disconnects from voice channel
def side_thread(): # reads the information an microcontroller send to pc
usb = serial.Serial('com4', baudrate=115200)
while True:
uncoded_data = usb.readline() # program waits a response from usb here
data_str = str(uncoded_data[0:len(uncoded_data)].decode('utf-8'))
button_pressed = int(data_str[:1]) # button_pressed can be 0, 1, 2, 3 or 4 depending on what button was pressed
print(f'button {button_pressed} was pressed')
# the problem is here, idk how to create a hook to call on_button_pressed and pass the arguments since it is in another thread,
# already tried putting asyncio.run(bot.on_button_pressed(button_pressed)) here but it doesn't work since it isn't the main thread running the function
t1 = threading.Thread(target=side_thread)
t1.start()
bot = bot()
我试着在side\u线程中放置on\u button\u pressed函数,但是bot没有连接,可能是因为异步函数不是线程安全的。
我的目标是,每当我按下一个按钮时,它会自动运行on\u button\u pressed函数作为主线程,而无需停止它,也无需不断检查变量(同时停止代码或使按钮极为延迟)
为了更好地解释,为了模拟按下按钮,您可以用以下替代side\u线程:
import random
import time
def side_thread(): # reads the information an microcontroller send to pc
while True:
button_pressed = random.randint(0, 4)
time.sleep(5) # simulates a random button press every 5 seconds
print(f'button {button_pressed} was pressed')
我希望每次有一个按钮按下它都会运行bot.on\u button\u pressed作为主线程。要检查这一点,可以在async func上打印线程的名称,它应该与init方法中的名称相同:
class bot: # can't get halted by a while loop or a lock
def __init__(self):
print(threading.current_thread().name) # prints 'MainThread'
async def on_button_pressed(self, button_pressed): # goal is to make this function run every time the side_thread detects a button press
print(threading.current_thread().name) # it should also print the same as the __init__, if it prints differently it isn't working
例如,如果我加上 asyncio.run(bot.on_button_pressed(button_pressed))
到side_线程的末尾,bot类上的两个打印将不同,因此无法工作
暂无答案!
目前还没有任何答案,快来回答吧!