试图在replit创建一个机器人;
管理面试,可以:
1.在第一次开始,创建一个字典中的所有成员在组中,以防止作弊的用户谁改变他们的用户名,设置一个字段为以前的采访与默认值0.在下一次开始,参考此字典来统计民意调查的React.连接到谷歌表
1.开始时,询问面试创建者以下选项:
投票文本,投票时限,发送个人资料的时限,最低以前的采访。
1.根据输入创建投票,提供投票的文本和时间限制,具有以下选项:是、否、导师投票检查。
1.在时间限制后,从“是”响应中,选择一个随机响应者,该响应者的先前访谈次数等于或小于所提供的输入,从已创建的字典中进行比较。如果没有响应者匹配,则随机选择。
1.标记选定的用户在发送配置文件的时间限制内发送“配置文件简报”,发送文件后,向投票创建者和回答是机器人,一旦回答是,该用户以前的采访加1。
1.如果所选用户未回复“是”,请返回步骤4。如果第二个所选用户也未回复,请填写:所选受访者未发送个人资料,面试取消。
`
import logging
import random
try:
from telegram.ext import Update, Updater, CommandHandler, CallbackContext , PollHandler
except ImportError:
import os
os.system("pip install python-telegram-bot --upgrade")
import gspread
from oauth2client.service_account import ServiceAccountCredentials
# Enable logging
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
# Load Google Sheets credentials
scope = ["https://spreadsheets.google.com/feeds", "https://www.googleapis.com/auth/drive"]
creds = ServiceAccountCredentials.from_json_keyfile_name("*************.json", scope)
client = gspread.authorize(creds)
# Open the Google Sheet by title
sheet_title = "*****"
sheet = client.open(sheet_title).sheet1
# Dictionary to store members and their previous interviews count
members_dict = {}
# Load data from Google Sheet into members_dict at the beginning of your script
def load_from_sheet():
global members_dict
members_dict = {}
data = sheet.get_all_records()
for row in data:
user_id = row["User ID"]
prev_interviews = row["Previous Interviews"]
members_dict[user_id] = prev_interviews
# Function to update the Google Sheet with the members_dict
def update_sheet():
# Get existing data from the sheet
existing_data = sheet.get_all_records()
# Update data in the existing_data list
for user_id, prev_interviews in members_dict.items():
# Check if the user_id is already in the sheet
user_row = next((row for row in existing_data if row["User ID"] == user_id), None)
if user_row:
# Update the existing row
user_row["Previous Interviews"] = prev_interviews
else:
# Add a new row
existing_data.append({"User ID": user_id, "Previous Interviews": prev_interviews})
# Clear existing data in the sheet
sheet.clear()
# Write header
header = ["User ID", "Previous Interviews"]
sheet.append_row(header)
# Write modified data
for row in existing_data:
sheet.append_row([row["User ID"], row["Previous Interviews"]])
# Callback function for handling the /start command
def start(update: Update, context: CallbackContext) -> None:
update.message.reply_text("Welcome to the interview bot! Please provide the following options:\n"
"/setoptions <poll_text> <poll_time_limit> <profile_time_limit> <min_previous_interviews>")
# Callback function for handling the /setoptions command
def set_options(update: Update, context: CallbackContext) -> None:
args = context.args
if len(args) != 4:
update.message.reply_text("Usage: /setoptions <poll_text> <poll_time_limit> <profile_time_limit> <min_previous_interviews>")
return
poll_text, poll_time_limit, profile_time_limit, min_previous_interviews = args
context.user_data['poll_text'] = poll_text
context.user_data['poll_time_limit'] = int(poll_time_limit) * 60 # Convert minutes to seconds
context.user_data['profile_time_limit'] = int(profile_time_limit) * 60 # Convert minutes to seconds
context.user_data['min_previous_interviews'] = int(min_previous_interviews)
update.message.reply_text("Options set successfully!")
# Callback function for handling the /interview command
def interview(update: Update, context: CallbackContext) -> None:
if 'poll_text' not in context.user_data:
update.message.reply_text("Please set options using /setoptions command first.")
return
# Create a poll
poll_message = update.message.reply_poll(
context.user_data['poll_text'],
options=["Yes", "No", "Mentor Poll Check"],
type=PollHandler.ANSWER_OPTIONS,
is_anonymous=False,
allows_multiple_answers=False,
open_period=context.user_data['poll_time_limit']
)
# Get the poll ID
poll_id = poll_message.poll.id
# Set a timer to check the poll results
context.job_queue.run_once(check_poll_results, context.user_data['poll_time_limit'], context=poll_id)
# Display time limit
update.message.reply_text(f"Poll created with time limit: {context.user_data['poll_time_limit'] / 60} minutes.")
# Callback function to check poll results
def check_poll_results(context: CallbackContext) -> None:
poll_id = context.job.context
poll_results = context.bot.stop_poll(chat_id=context.job.context, message_id=poll_id)
# Process poll results
process_poll_results(context.bot, context.user_data, poll_results)
# Function to process poll results
def process_poll_results(bot, user_data, poll_results):
yes_responses = poll_results['options'][0]['voter_count']
if yes_responses == 0:
update.message.reply_text("No one responded 'Yes' to the poll. The interview is cancelled.")
return
# Filter members with previous interviews less than or equal to the specified limit
eligible_members = [user_id for user_id, prev_interviews in members_dict.items() if prev_interviews <= user_data['min_previous_interviews']]
if not eligible_members:
selected_user_id = random.choice(list(members_dict.keys()))
else:
selected_user_id = random.choice(eligible_members)
selected_user = bot.get_chat_member(chat_id=context.chat_data['chat_id'], user_id=selected_user_id).user
# Notify the selected user to send a profile
bot.send_message(selected_user_id, f"You've been selected for an interview! Please send your profile brief within {user_data['profile_time_limit'] / 60} minutes. Reply with 'yes' once done.")
# Set a timer to check if the user replies 'yes' in time
context.job_queue.run_once(check_profile_submission, user_data['profile_time_limit'], context=(selected_user_id, user_data))
# Callback function to check profile submission
def check_profile_submission(context: CallbackContext) -> None:
selected_user_id, user_data = context.job.context
if members_dict[selected_user_id] == 0:
# Notify the poll creator about the selected user's failure to reply 'yes'
context.bot.send_message(context.chat_data['chat_id'], f"Selected user @{selected_user_id} did not reply 'yes'.")
# Retry the process with another user
process_poll_results(context.bot, user_data, context.chat_data)
else:
# Notify the poll creator about the successful profile submission
context.bot.send_message(context.chat_data['chat_id'], f"Profile received from @{selected_user_id}.")
# Increment the previous interviews count for the selected user
members_dict[selected_user_id] += 1
# Update Google Sheet
update_sheet()
# Callback function for handling the /stop command
def stop(update: Update, context: CallbackContext) -> None:
update.message.reply_text("Bot is stopping...")
context.job_queue.stop()
context.bot.stop()
# Main function to start the bot
def main() -> None:
# Set your Telegram Bot Token
updater = Updater("**********")
# Get the dispatcher to register handlers
dp = updater.dispatcher
# Register command handlers
dp.add_handler(CommandHandler("start", start))
dp.add_handler(CommandHandler("setoptions", set_options))
dp.add_handler(CommandHandler("interview", interview))
dp.add_handler(CommandHandler("stop", stop)) # Added /stop command handler
# Start the Bot
updater.start_polling()
# Run the bot until you send a signal to stop it
updater.idle()
if __name__ == '__main__':
main()
字符串
获取错误,
第一个月
//之前尝试更改导入语句时出现错误,但无法理解错误。
2条答案
按热度按时间rjjhvcjd1#
在start函数中有一个名为Update的参数,这与telegram.ext模块中的Update类冲突。
更改此:
字符串
对此:
型
您需要更新函数定义中的参数名称以及函数中的任何引用。对于遇到类似问题的其他函数定义重复此操作。
yc0p9oo02#
看看你的街区:
字符串
在ImportError的情况下,这些变量将不会被定义为
Update, Updater, CommandHandler, CallbackContext , PollHandler
。看起来这是发生了,一些ImportError发生了,但你没有充分处理它,留下变量未定义。
在我看来,你根本不应该尝试处理ImportError,因为你需要
Updater
类。否则,你可以按照Michael Butscher的评论,不计算类型提示。此外,从脚本内部安装软件包是不好的做法。您可以在except块的末尾添加
from telegram.ext import Update, Updater, CommandHandler, CallbackContext , PollHandler
以尝试第二次导入。TL;DR:提供一个带有适当软件包版本的需求文件,并假设所有内容都已提供,不要尝试动态安装。