postgresql 使用psycopg3创建异步连接

ddrv8njm  于 2023-01-13  发布在  PostgreSQL
关注(0)|答案(1)|浏览(379)

我正在尝试使用psycopg3创建一个异步连接。我使用的是没有异步的psycopg2,需要转移到异步数据库函数。文档没有给予太多信息。
这就是我在psycopg2上用的。效果很好。

con = psycopg2.connect(host="HOSTNAME", port="PORT", database=("DATABASE", user="USER", password="PASSWORD")
cursor = con.cursor()

然后,当我需要运行查询时,我只需使用

cursor.execute(query, params)
cursor.fetchall() # or con.commit() depending on insert or select statement.

现在我将转向异步函数,我已经尝试了以下方法

con = await psycopg.AsyncConnection.connect(host="HOSTNAME", port="PORT", database="DATABASE", user="USER", password="PASSWORD")
cursor = await con.cursor()

但是我得到了一个错误,我不能在函数之外使用wait。
医生让我这么做的

async with await psycopg.AsyncConnection.connect() as aconn:
    async with aconn.cursor() as cur:
        await cur.execute(...)

那么我需要在每个函数中写这个吗,我想用它来读写记录?
我的代码中有几个例子正在使用psycopg2

async def check_guild(guild_id):
    cursor.execute("SELECT guild_id, guild_name, su_id FROM guild WHERE guild_id = %s", [guild_id])
    guild = cursor.fetchone()
    return guild

async def config_raffle(guild_id, channel_id, channel_name, channel_cat_id, token, token_id, default_address, su_id, fee):
    try:
        cursor.execute("""INSERT INTO raffle_config (guild_id, channel_id, channel_name, channel_cat_id, token, default_token, default_address, su_id, fee) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (guild_id) DO UPDATE SET channel_id = EXCLUDED.channel_id, channel_name = EXCLUDED.channel_name, channel_cat_id = EXCLUDED.channel_cat_id, token = EXCLUDED.token,
        default_token = EXCLUDED.default_token, default_address = EXCLUDED.default_address, su_id = EXCLUDED.su_id, fee = EXCLUDED.fee""",
                       (guild_id, channel_id, channel_name, channel_cat_id, token, token_id, default_address, su_id, fee))
        con.commit()
    except:
        logging.exception("Exception", exc_info=True)
        con.rollback()
        print("Error: 25")
    return True

所以我想也许我更好的选择是使用AsyncConnectionPool。我有一个db.py文件设置如下:

import psycopg_pool
import os
import dotenv

dotenv.load_dotenv()

conninfo = f'host={os.getenv("HOSTNAME")} port={os.getenv("PORT")} dbname={os.getenv("DATABASE")} user={os.getenv("USER")} password={os.getenv("PASSWORD")}'
pool = psycopg_pool.AsyncConnectionPool(conninfo=conninfo, open=False)

async def open_pool():
    await pool.open()

当我的程序运行on_ready函数时,我打开了这个池。我用这种方法创建了新的表,但是当我试图检索记录时,我得到了这个错误。

discord.ext.commands.errors.CommandInvokeError: Command raised an exception: AttributeError: 'AsyncConnection' object has no attribute 'fetchone'
g2ieeal7

g2ieeal71#

最后是这样解决的:

import psycopg_pool
import os
import dotenv

dotenv.load_dotenv()

conninfo = f'host={os.getenv("HOSTNAME")} port={os.getenv("PORT")} dbname={os.getenv("DATABASE")} user={os.getenv("USER")} password={os.getenv("PASSWORD")}'
pool = psycopg_pool.AsyncConnectionPool(conninfo=conninfo, open=False)

async def open_pool():
    await pool.open()
    await pool.wait()
    print("Connection Pool Opened")

async def select_fetchall(query, args):
    async with pool.connection() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute(query, args)
            results = await cursor.fetchall()
            return results

async def write(query, args):
    async with pool.connection() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute(query, args)
            if 'RETURNING' in query:
                results = await cursor.fetchone()
                return results
            else:
                return

然后,当需要读取或写入数据库时,我只需调用这些函数并传递查询和参数。

相关问题