python-3.x 如何在www.example.com中循环任务discord.py

8tntrjer  于 2022-12-05  发布在  Python
关注(0)|答案(4)|浏览(211)

我正在尝试制作我自己的小discord机器人,它可以从Twitch获取信息,但我对如何让机器人循环和检查条件感到困惑。
我希望机器人每隔几秒钟循环一段代码,检查指定的抽搐通道是否处于活动状态。

代码

import discord
from discord.ext import commands, tasks
from twitch import TwitchClient
from pprint import pformat

client = TwitchClient(client_id='<twitch token>')

bot = commands.Bot(command_prefix='$')

@bot.event
async def on_ready():
    print('We have logged in as {0.user}'.format(bot))

@bot.command()
async def info(ctx, username):
    response = await ctx.send("Querying twitch database...")
    try:
        users = client.users.translate_usernames_to_ids(username)
        for user in users:
            print(user.id)
            userid = user.id
        twitchinfo = client.users.get_by_id(userid)
        status = client.streams.get_stream_by_user(userid)
        if status == None:
            print("Not live")
            livestat = twitchinfo.display_name + "is not live"
        else:
            livestat = twitchinfo.display_name + " is " + status.stream_type
        responsemsg = pformat(twitchinfo) + "\n" + livestat
        await response.edit(content=responsemsg)
    except:
        await response.edit(content="Invalid username")

bot.run("<discord token>")

例如,我希望机器人每10秒运行一次以下代码:

status = client.streams.get_stream_by_user(<channel id>)
if status == None:
     print("Not live")
     livestat = twitchinfo.display_name + "is not live"
else:
     livestat = twitchinfo.display_name + " is " + status.stream_type

我试过使用@tasks.loop(seconds=10)来尝试每10秒重复一次自定义async def,但似乎不起作用。
有什么想法吗?

igetnqfo

igetnqfo1#

discord.py的较新版本不支持client.command()
为了达到同样的效果,我使用了以下代码片段
第一个

wwwo4jvm

wwwo4jvm2#

可以这样做:

async def my_task(ctx, username):
    while True:
        # do something
        await asyncio.sleep(10)

@client.command()
async def info(ctx, username):
    client.loop.create_task(my_task(ctx, username))

参考资料:

  • asyncio.create_task()
  • asyncio.sleep()
gblwokeq

gblwokeq3#

这是实现后台任务的最适当的方法。

from discord.ext import commands, tasks

bot = commands.Bot(...)

@bot.listen()
async def on_ready():
    task_loop.start() # important to start the loop

@tasks.loop(seconds=10)
async def task_loop():
    ... # this code will be executed every 10 seconds after the bot is ready

有关详细信息,请检查this

z9ju0rcb

z9ju0rcb4#

我也很纠结这个问题。我遇到的问题是网上的例子都不完整。下面是我想出的一个使用@tasks.loop(seconds=10)的例子。

import discord
import os
from discord.ext import tasks
from dotenv import load_dotenv

intents = discord.Intents.all()
client = discord.Client(command_prefix="!", intents=intents)

load_dotenv()
token = os.getenv("DISCORD_TOKEN")
CHANNEL_ID = 1234

@client.event
async def on_ready():
    print(f"We have logged in as {client.user}")
    myloop.start()

@client.event
async def on_message(message):
    if message.author == client.user: 
        return

    if message.content.startswith("hi"):
        await message.channel.send("Hello!")

@tasks.loop(seconds=10)
async def myloop():
    channel = client.get_channel(CHANNEL_ID)
    await channel.send("Message")

client.run(token)

相关问题